"""Tests for CHAPPS policy module"""
import pytest
import logging
import redis
import time
import spf
from unittest.mock import Mock
from chapps.policy import (
EmailPolicy,
GreylistingPolicy,
OutboundQuotaPolicy,
SenderDomainAuthPolicy,
TIME_FORMAT,
)
from chapps.config import CHAPPSConfig
from chapps.tests.test_config.conftest import (
chapps_mock_cfg_path,
chapps_mock_env,
chapps_mock_config,
chapps_mock_config_file,
chapps_sentinel_cfg_path,
chapps_sentinel_env,
chapps_sentinel_config,
chapps_sentinel_config_file,
)
from chapps.tests.test_policy.conftest import _auto_ppr_param_list, idfn
from chapps.models import Quota
from inspect import isclass
seconds_per_day = 3600 * 24
pytestmark = pytest.mark.order(3)
SKIP_SENTINEL = True
[docs]class Test_EmailPolicy:
"""Tests of the base policy class"""
[docs] def test_rediskey(self):
"""
GIVEN a prefix and some arguments
WHEN we ask EmailPolicy for a redis key
THEN it should glue everything together with colons
"""
prefix = "pre"
args = ["foo", "bar"]
redis_key = EmailPolicy.rediskey(prefix, *args)
assert redis_key == "pre:foo:bar"
[docs] def test_fmtkey(self):
"""
GIVEN some arguments
WHEN we ask EmailPolicy to format a key
THEN it should use the prefix 'grl', and glue all together with colons
"""
args = ["foo", "bar"]
redis_key = EmailPolicy._fmtkey(*args)
assert redis_key == "chapps:foo:bar"
[docs] def test_approval_not_implemented(self, allowable_ppr):
"""
GIVEN an instance of EmailPolicy or a subclass
WHEN the superclass/abstract version of approve_policy_request is called
THEN a NotImplementedError should be raised
"""
policy = EmailPolicy()
with pytest.raises(NotImplementedError):
assert policy.approve_policy_request(allowable_ppr)
[docs] def test_connect_to_redis(self):
"""
GIVEN a new EmailPolicy
WHEN asked for a Redis handle
THEN return a Redis handle
"""
policy = EmailPolicy()
assert policy.redis.ping()
[docs] @pytest.mark.skipif(SKIP_SENTINEL, reason="Skipping Sentinel tests")
def test_connect_to_sentinel(
self, chapps_sentinel_env, chapps_sentinel_config_file
):
"""
GIVEN Sentinel servers are listed in the config file
WHEN asked for a Redis handle
THEN use Sentinel server and dataset information to get a read-write Redis handle
"""
sentinel_config = CHAPPSConfig.get_config()
policy = EmailPolicy(cfg=sentinel_config)
assert policy.redis.ping()
assert policy.sentinel is not None
[docs]class Test_PostfixActions:
"""Tests for abstract Postfix action superclass"""
[docs] def test_okay(self, postfix_actions):
"""Returns OK"""
result = postfix_actions.okay()
assert result == "OK"
[docs] def test_okay_accepts_useless_message(self, postfix_actions):
"""Returns OK even if a message is supplied"""
arbitrary_message = "This message will be ignored."
result = postfix_actions.okay(arbitrary_message)
assert result == "OK"
[docs] def test_dunno(self, postfix_actions):
"""Returns DUNNO"""
result = postfix_actions.dunno()
assert result == "DUNNO"
[docs] def test_dunno_accepts_useless_message(self, postfix_actions):
"""Returns DUNNO even if a message is supplied"""
arbitrary_message = "This message will be ignored."
result = postfix_actions.dunno(arbitrary_message)
assert result == "DUNNO"
[docs] def test_action_for_raises_not_implemented_error(self, postfix_actions):
"""The method action_for() is abstract and not implemented by PostfixActions"""
with pytest.raises(NotImplementedError):
assert postfix_actions.action_for("foo")
[docs]class Test_PostfixOQPActions:
"""Testing outbound quota actions for Postfix"""
[docs] def test_pass_yields_okay(self, oqp_actions):
"""Pass returns OK"""
### When the calling routine gets True from the policy, it will call .pass()
### and when it gets false, it will call .fail()
### It may be that the acceptance message, even for outbound quota, should still be DUNNO
result = oqp_actions.passing()
assert result == "OK" or result == "DUNNO"
[docs] def test_fail_yields_rejection(self, oqp_actions):
"""Fail returns a string starting with '554 Rejected' or 'REJECT'"""
### Postfix will insert enhanced status code 5.7.1 which is the code
### we want anyhow. There seems to be no code for gateways to use
### to signal that the user's outbound quota has been reached.
result = oqp_actions.fail()
assert result[0:7] == "REJECT " or result[0:12] == "554 Rejected"
[docs]class Test_PostfixGRLActions:
"""Testing greylisting actions for Postfix"""
[docs] def test_pass_yields_dunno(self, grl_actions):
"""Pass returns DUNNO"""
result = grl_actions.passing()
assert result == "DUNNO"
[docs] def test_fail_yields_rejection(self, grl_actions):
"""Fail returns a string starting with DEFER_IF_PERMIT"""
result = grl_actions.fail()
assert result[0:16] == "DEFER_IF_PERMIT "
[docs]class Test_GreylistingPolicy_Base:
"""Tests of the greylisting policy module"""
[docs] def test_fmtkey(self):
"""
GIVEN some arguments
WHEN we ask EmailPolicy to format a key
THEN it should use the prefix 'grl', and glue all together with colons
"""
args = ["foo", "bar"]
redis_key = GreylistingPolicy._fmtkey(*args)
assert redis_key == "grl:foo:bar"
[docs] def test_tuple_key(self, caplog, allowable_inbound_ppr):
"""
GIVEN a PostfixPolicyRequest object populated with valid data
WHEN we as for a tuple key
THEN a string in the form of grl:<ip>:<sender>:<recipient> should be returned
"""
caplog.set_level(logging.DEBUG)
policy = GreylistingPolicy()
ppr = allowable_inbound_ppr
tuple_key_result = policy.tuple_key(ppr)
assert (
tuple_key_result
== f"{GreylistingPolicy.redis_key_prefix}:{ppr.client_address}:{ppr.sender}:{ppr.recipient}"
)
[docs] def test_client_key(self, caplog, allowable_inbound_ppr):
"""
GIVEN a PostfixPolicyRequest object populated with valid data
WHEN we ask for a client key
THEN a string in the form of grl:<ip> should be returned
"""
caplog.set_level(logging.DEBUG)
policy = GreylistingPolicy()
ppr = allowable_inbound_ppr
client_key_result = policy.client_key(ppr)
assert (
client_key_result
== f"{GreylistingPolicy.redis_key_prefix}:{ppr.client_address}"
)
[docs] def test_config_overrides_properly_forwarded(
self, caplog, testing_policy_grl
):
### sanity check
### this really tests functionality of the superclass, but it is much more complicated to test it there
assert (
testing_policy_grl.config.get_block(
"GreylistingPolicy"
).rejection_message
== testing_policy_grl.config.policy_grl.rejection_message
)
assert (
testing_policy_grl.params.rejection_message
== testing_policy_grl.config.policy_grl.rejection_message
)
[docs] def test_approve_policy_request(
self, caplog, monkeypatch, allowable_inbound_ppr
):
"""
GIVEN a positive policy evaluation
WHEN approve_policy_request is called
THEN it should return True
"""
caplog.set_level(logging.DEBUG)
policy = GreylistingPolicy()
ppr = allowable_inbound_ppr
instance = ppr.instance
monkeypatch.setattr(policy, "_approve_policy_request", lambda x: True)
assert policy.approve_policy_request(ppr) == True
[docs] def test_policy_request_instance_cache(
self, caplog, monkeypatch, allowable_inbound_ppr
):
"""
GIVEN a positive policy evaluation
WHEN approve_policy_request is called
THEN it should return True
"""
caplog.set_level(logging.DEBUG)
policy = GreylistingPolicy()
ppr = allowable_inbound_ppr
instance = ppr.instance
monkeypatch.setattr(policy, "_approve_policy_request", lambda x: True)
_ = policy.approve_policy_request(ppr)
assert policy.instance_cache[ppr.instance] == True
[docs]class Test_GreylistingPolicyEvaluation:
[docs] def test_first_encounter_false(
self,
caplog,
monkeypatch,
allowable_inbound_ppr,
testing_policy_grl,
populated_database_fixture,
):
"""
GIVEN a new tuple
WHEN executed
THEN return False
"""
caplog.set_level(logging.DEBUG)
policy = testing_policy_grl
### simulate a new encounter with a new client
monkeypatch.setattr(
policy, "_get_control_data", lambda x: (None, None, None)
)
response = policy._approve_policy_request(allowable_inbound_ppr)
assert not response # .passing is False
[docs] def test_pass_if_option_false(
self,
caplog,
monkeypatch,
allowable_inbound_ppr,
testing_policy_grl,
populated_database_fixture,
):
"""
GIVEN that the option is set to False
WHEN examining a policy request
THEN issue a passing response since we are not enforcing this policy
"""
caplog.set_level(logging.DEBUG)
policy = testing_policy_grl
monkeypatch.setattr(
policy, "_get_control_data", lambda x: (0, None, None)
)
response = policy._approve_policy_request(allowable_inbound_ppr)
assert response == "DUNNO" or response() == "DUNNO"
[docs] def test_pass_if_whitelisted(
self,
caplog,
monkeypatch,
helo_ppr_factory,
testing_policy_grl,
populated_database_fixture,
):
"""
:GIVEN: that the PPR's client (HELO) is whitelisted
:WHEN: evaluating an approval request
:THEN: issue a passing response
"""
caplog.set_level(logging.DEBUG)
policy = testing_policy_grl
ppr = helo_ppr_factory("mail.chapps.io", "10.10.10.10")
with monkeypatch.context() as m:
m.setattr(
policy.config,
"helo_whitelist",
{"mail.chapps.io": "10.10.10.10"},
)
response = policy._approve_policy_request(ppr)
assert response == "DUNNO" or response() == "DUNNO"
[docs] def test_first_encounter_updates_tuple(
self, caplog, monkeypatch, allowable_inbound_ppr, testing_policy_grl
):
"""
GIVEN a new tuple
WHEN executed
THEN update the tuple
"""
caplog.set_level(logging.DEBUG)
policy = testing_policy_grl
### simulate a new encounter with a new client
monkeypatch.setattr(
policy, "_get_control_data", lambda x: (None, None, None)
)
mock_update = Mock()
monkeypatch.setattr(policy, "_update_tuple", mock_update)
response = policy._approve_policy_request(allowable_inbound_ppr)
assert mock_update.called
[docs] def test_recognized_tuple_passes(
self, caplog, monkeypatch, allowable_inbound_ppr, testing_policy_grl
):
"""
GIVEN a recognized tuple - a timestamp
WHEN the tuple was seen more than min_delay seconds ago
THEN return True
"""
caplog.set_level(logging.DEBUG)
policy = testing_policy_grl
### simulate a timestamp we saw about 15 min ago
monkeypatch.setattr(
policy,
"_get_control_data",
lambda x: (1, time.time() - (60 * 15), None),
)
response = policy._approve_policy_request(allowable_inbound_ppr)
assert response
[docs] def test_recognized_tuple_updates_client_tally(
self, caplog, monkeypatch, allowable_inbound_ppr, testing_policy_grl
):
"""
GIVEN a recognized tuple - a timestamp
WHEN the tuple was seen more than min_delay seconds ago
THEN return True
"""
caplog.set_level(logging.DEBUG)
policy = testing_policy_grl
### simulate a timestamp we saw about 15 min ago
monkeypatch.setattr(
policy,
"_get_control_data",
lambda x: (1, time.time() - (60 * 15), None),
)
mock_update = Mock()
monkeypatch.setattr(policy, "_update_client_tally", mock_update)
response = policy._approve_policy_request(allowable_inbound_ppr)
assert mock_update.called
[docs] def test_sufficient_client_tally_permits_sending_for_unrecognized_tuple(
self,
caplog,
monkeypatch,
allowable_inbound_ppr,
mock_client_tally,
testing_policy_grl,
populate_redis_grl,
populated_database_fixture,
):
"""
GIVEN a new tuple from a client with a large enough tally
WHEN executed
THEN return True
"""
caplog.set_level(logging.DEBUG)
ppr = allowable_inbound_ppr
policy = testing_policy_grl
with monkeypatch.context() as m:
m.setattr(ppr, "sender", "someschmo@chapps.io")
m.setattr(ppr, "recipient", "someone@chapps.io")
populate_redis_grl(
policy.tuple_key(ppr),
{
policy.client_key(ppr): mock_client_tally(
policy.allow_after
)
},
)
# # this hotwires the data-getter, which circumvents testing of
# # the Redis-access routines which are part of the policy
# m.setattr(
# policy,
# "_get_control_data",
# lambda x: (1, None, policy.allow_after),
# )
response = policy._approve_policy_request(ppr)
assert response
[docs] def test_client_tally_updated_when_unrecognized_tuple_passes(
self,
caplog,
monkeypatch,
allowable_inbound_ppr,
mock_client_tally,
testing_policy_grl,
populate_redis_grl,
):
"""
GIVEN a new tuple from a client with a large enough tally
WHEN executed
THEN update the tally
"""
caplog.set_level(logging.DEBUG)
ppr = allowable_inbound_ppr
policy = testing_policy_grl
with monkeypatch.context() as m:
m.setattr(ppr, "sender", "someschmo@chapps.io")
populate_redis_grl(
policy.tuple_key(ppr),
{
policy.client_key(ppr): mock_client_tally(
policy.allow_after
)
},
)
tally = policy.redis.zrange(policy.client_key(ppr), 0, -1)
assert len(tally) == policy.allow_after
_ = policy._approve_policy_request(ppr)
tally = policy.redis.zrange(policy.client_key(ppr), 0, -1)
assert tally[-1].decode("utf-8") == ppr.instance
assert len(tally) == policy.allow_after + 1
[docs] def test_retry_too_soon_fails(
self, caplog, monkeypatch, allowable_inbound_ppr, testing_policy_grl
):
"""
GIVEN a recognized tuple
WHEN the tuple was seen too recently (less than min_delay seconds)
THEN return False
"""
caplog.set_level(logging.DEBUG)
policy = testing_policy_grl
### simulate a timestamp we saw about 15 min ago
monkeypatch.setattr(
policy, "_get_control_data", lambda x: (1, time.time(), None)
)
response = policy._approve_policy_request(allowable_inbound_ppr)
assert not response
[docs] def test_retry_too_soon_updates_tuple(
self, caplog, monkeypatch, allowable_inbound_ppr, testing_policy_grl
):
"""
GIVEN a recognized tuple
WHEN the tuple was seen too recently (less than min_delay seconds)
THEN return False
"""
caplog.set_level(logging.DEBUG)
policy = testing_policy_grl
### simulate a timestamp we saw about 15 min ago
monkeypatch.setattr(
policy, "_get_control_data", lambda x: (1, time.time(), None)
)
mock_update = Mock()
monkeypatch.setattr(policy, "_update_tuple", mock_update)
response = policy._approve_policy_request(allowable_inbound_ppr)
assert mock_update.called
[docs]class Test_GreylistingPolicy_update_control_data:
"""Testing control update routes _update_tuple and _update_client_tally"""
[docs] def test_update_tuple(
self, caplog, clear_redis_grl, allowable_inbound_ppr
):
"""
GIVEN a ppr
WHEN _update_tuple executed
THEN set a key with a particular structure to a current timestamp
"""
caplog.set_level(logging.DEBUG)
ppr = allowable_inbound_ppr
policy = GreylistingPolicy()
t = time.time()
policy._update_tuple(ppr)
t_stored = policy.redis.get(policy.tuple_key(ppr))
assert t_stored is not None
t_stored = float(t_stored)
assert t_stored > t and t_stored < time.time()
[docs] def test_update_client_tally(
self, caplog, clear_redis_grl, allowable_inbound_ppr
):
"""
GIVEN a ppr
WHEN _update_client_tally is executed
THEN add the tuple (map) instance -> timestamp to the client key
"""
caplog.set_level(logging.DEBUG)
ppr = allowable_inbound_ppr
policy = GreylistingPolicy()
policy._update_client_tally(ppr)
client_tally = policy.redis.zrange(policy.client_key(ppr), 0, -1)
assert client_tally[0].decode("utf-8") == ppr.instance
[docs] def test_skip_client_tally_update_if_allow_after_is_zero(
self, caplog, monkeypatch, clear_redis_grl, allowable_inbound_ppr
):
"""
GIVEN that allow_after is set to zero (we are not keeping a success tally)
WHEN _update_client_tally is executed
THEN immediately return
"""
caplog.set_level(logging.DEBUG)
ppr = allowable_inbound_ppr
policy = GreylistingPolicy(auto_allow_after=0)
mock_redis_pipeline = Mock()
with monkeypatch.context() as m:
m.setattr(policy.redis, "pipeline", mock_redis_pipeline)
_ = policy._update_client_tally(ppr)
assert not mock_redis_pipeline.called
[docs]class Test_GreylistingPolicy_get_control_data:
"""This method interfaces with Redis"""
[docs] def test_no_keys_yet_exist(
self, caplog, clear_redis_grl, allowable_inbound_ppr
):
"""
GIVEN a new tuple
WHEN control data is requested
THEN the returned tuple should have None as its first element
"""
caplog.set_level(logging.DEBUG)
policy = GreylistingPolicy()
ppr = allowable_inbound_ppr
response = policy._get_control_data(ppr)
assert response[0] is None
[docs] def test_tuple_is_recognized(
self, caplog, clear_redis_grl, allowable_inbound_ppr
):
"""
GIVEN a recognized tuple
WHEN control data is requested
THEN the tuple should have a timestamp (float) as its 2nd argument
"""
caplog.set_level(logging.DEBUG)
policy = GreylistingPolicy()
ppr = allowable_inbound_ppr
policy._update_tuple(ppr)
response = policy._get_control_data(ppr)
assert type(response[1]) == float
assert response[1] < time.time()
[docs] def test_allow_after_is_zero(
self, caplog, monkeypatch, clear_redis_grl, allowable_inbound_ppr
):
"""
GIVEN a recognized tuple, and allow_after set to 0
WHEN control data is requested
THEN the third member of the tuple should always be None
"""
caplog.set_level(logging.DEBUG)
policy = GreylistingPolicy()
ppr = allowable_inbound_ppr
policy._update_tuple(ppr)
monkeypatch.setattr(policy, "allow_after", 0)
response = policy._get_control_data(ppr)
assert response[2] == None
[docs] def test_new_tuple_client_tally_present(
self,
caplog,
monkeypatch,
mock_client_tally,
populate_redis_grl,
allowable_inbound_ppr,
unique_instance,
):
"""
GIVEN an unrecognized tuple, but existing client tally
WHEN executed
THEN the count should be returned as the third member of the tuple
"""
caplog.set_level(logging.DEBUG)
policy = GreylistingPolicy()
ppr = allowable_inbound_ppr
tally = mock_client_tally(policy.allow_after - 2)
tuple_key = ""
with monkeypatch.context() as m:
m.setattr(ppr, "sender", "someschmo@chapps.io")
tuple_key = policy.tuple_key(ppr)
client_key = policy.client_key(ppr)
populate_redis_grl(tuple_key, {client_key: tally})
response = policy._get_control_data(ppr)
assert response[1] is None
assert response[2] == len(tally)
[docs] def test_tuple_recognized_and_tally_exists(
self,
caplog,
mock_client_tally,
allowable_inbound_ppr,
populate_redis_grl,
unique_instance,
):
"""
:GIVEN: a recognized tuple, and an existing tally
:WHEN: executed
:THEN: return the option flag,
a timestamp,
and the client's tally in that order
"""
caplog.set_level(logging.DEBUG)
policy = GreylistingPolicy()
ppr = allowable_inbound_ppr
tally = mock_client_tally(policy.allow_after - 2)
tuple_key = policy.tuple_key(ppr)
client_key = policy.client_key(ppr)
populate_redis_grl(tuple_key, {client_key: tally})
response = policy._get_control_data(ppr)
assert type(response[1]) == float
assert response[1] < time.time()
assert response[2] == len(tally)
[docs]class Test_OutboundQuotaPolicy:
"""Tests of the outbound quota policy module"""
[docs] def test_oqp_fmtkey(self):
"""
GIVEN: email (user), and parameter name
WHEN: oqp is asked for a Redis key
THEN: oqp._fmtkey(user, param) should return a string like 'oqp:<user>:<param>'
"""
policy = OutboundQuotaPolicy()
redis_key = policy._fmtkey("ccullen@easydns.com", "attempts")
assert redis_key == "oqp:ccullen@easydns.com:attempts"
[docs] def test_approve_policy_request(
self, caplog, allowable_ppr, well_spaced_attempts, populate_redis
):
"""
Verify that underquota users' emails are approved.
"""
caplog.set_level(logging.DEBUG)
populate_redis(allowable_ppr.user, 100, well_spaced_attempts(80))
policy = OutboundQuotaPolicy()
assert policy.approve_policy_request(allowable_ppr)
[docs] def test_approve_last_remaining_quota(
self, caplog, allowable_ppr, well_spaced_attempts, populate_redis
):
"""
Verify that the last bit of a quota can be used.
"""
caplog.set_level(logging.DEBUG)
populate_redis(allowable_ppr.user, 100, well_spaced_attempts(99))
policy = OutboundQuotaPolicy()
assert policy.approve_policy_request(allowable_ppr)
[docs] def test_approve_last_with_multiple_recipients(
self, caplog, groupsend_ppr, well_spaced_attempts, populate_redis
):
"""
Verify that multiple recipients adding up to the last available messages will be approved.
"""
caplog.set_level(logging.DEBUG)
recipient_count = len(groupsend_ppr.recipient.split(","))
populate_redis(
groupsend_ppr.user,
100,
well_spaced_attempts(100 - recipient_count),
)
policy = OutboundQuotaPolicy()
assert policy.approve_policy_request(groupsend_ppr)
[docs] def test_approve_underquota_within_margin(
self,
caplog,
multisend_ppr_factory,
well_spaced_attempts,
populate_redis,
):
"""
Verify that when a multi-recipient email takes an underquota account overquota, it will be
approved if the amount overquota is within the established margin.
"""
caplog.set_level(logging.DEBUG)
# create a PPR reflecting 8 recipients
groupsend_ppr = multisend_ppr_factory("underquota@chapps.io", 8)
# for the sender, set a limit of 100, a margin of 10, and 95 well-spaced send attempts
populate_redis(groupsend_ppr.user, 100, well_spaced_attempts(95), 10)
policy = OutboundQuotaPolicy()
# even though this would be 3 emails overquota, it should be allowed anyway, by the margin
assert policy.approve_policy_request(groupsend_ppr)
assert any("OK" in rec.message for rec in caplog.records)
[docs] def test_deny_policy_request(
self, overquota_ppr, well_spaced_attempts, populate_redis
):
"""
Verify that overquota users are rejected.
"""
populate_redis(overquota_ppr.user, 100, well_spaced_attempts(150))
policy = OutboundQuotaPolicy()
assert not policy.approve_policy_request(overquota_ppr)
[docs] def test_deny_when_too_many_recipients(
self, multisend_ppr_factory, well_spaced_attempts, populate_redis
):
""":GIVEN: a multi-recipient PPR
:WHEN: the recipient list would go over the quota
:THEN: the attempt is denied.
This has the side-effect of meaning that rejected multi-recipient
attempts add their recipient count to the attempt history, meaning that
the account will be fully over-quota once this occurs.
"""
groupsend_ppr = multisend_ppr_factory("overquota@chapps.io", 20)
populate_redis(groupsend_ppr.user, 100, well_spaced_attempts(95), 10)
policy = OutboundQuotaPolicy()
assert not policy.approve_policy_request(groupsend_ppr)
[docs] def test_deny_overquota_within_margin(
self, caplog, groupsend_ppr, well_spaced_attempts, populate_redis
):
"""
Verify that an account which is just over-quota will not have a new email which is within
the margin approved. This is to ensure that the quota is real, and not just enlarged by
the margin.
"""
caplog.set_level(logging.DEBUG)
populate_redis(groupsend_ppr.user, 100, well_spaced_attempts(101), 10)
policy = OutboundQuotaPolicy()
assert not policy.approve_policy_request(groupsend_ppr)
assert any(
"too many attempts" in rec.message for rec in caplog.records
)
[docs] @pytest.mark.xfail # this feature is on hold at present
def test_deny_rapid_attempts(
self, allowable_ppr, rapid_attempts, populate_redis
):
"""
Verify that attempts which come too fast will be rejected.
"""
populate_redis(allowable_ppr.user, 200, rapid_attempts(20))
policy = OutboundQuotaPolicy()
assert not policy.approve_policy_request(allowable_ppr)
[docs] def test_return_cached_instance_approval(
self,
allowable_ppr,
well_spaced_double_attempts,
populate_redis,
caplog,
):
"""
GIVEN we have already seen a particular instance before,
WHEN we are asked to approve or deny it
THEN we will return the cached value of the instance (which really only matters on approval)
"""
populate_redis(
allowable_ppr.user, 200, well_spaced_double_attempts(100)
)
policy = OutboundQuotaPolicy()
policy.instance_cache[allowable_ppr.instance] = False
### need a patched policy which has the instance in its instance cache
response = policy.approve_policy_request(allowable_ppr)
for m in caplog.messages:
print(m)
assert response == False
[docs] def test_approve_policy_request_for_uncached(
self,
uncached_allowable_ppr,
well_spaced_attempts,
populated_database_fixture,
testing_policy,
):
"""
Verify that existing-but-uncached users' emails are approved.
(Uncached implies they have sent no emails w/i the rolling interval.)
"""
assert testing_policy.approve_policy_request(uncached_allowable_ppr)
[docs] def test_deny_policy_request_for_undefined(
self, undefined_ppr, populated_database_fixture, testing_policy
):
"""
Verify that undefined users are rejected.
"""
assert not testing_policy.approve_policy_request(undefined_ppr)
[docs] def test_current_quota(
self,
sda_allowable_ppr,
populate_redis,
well_spaced_attempts,
populated_database_fixture,
testing_policy,
):
ppr = sda_allowable_ppr
attempts = well_spaced_attempts(100)
populate_redis(ppr.user, 240, attempts)
remaining, remarks = testing_policy.current_quota(
ppr.user, Quota(id=1, name="10eph", quota=240)
)
assert remaining == 140
last_try = time.strftime(TIME_FORMAT, time.gmtime(attempts[-1]))
assert f"Last send attempt was at {last_try}" in remarks
auto_ppr_param_list = _auto_ppr_param_list(
senders=[
"ccullen@easydns.com",
"mautic+bounce_622b80da90870@easydns.com",
"weirdo@twodomains.ca@weird.com",
"bareword",
]
)
[docs]class Test_SenderDomainAuthPolicy:
[docs] def test_sda_fmtkey(self):
policy = SenderDomainAuthPolicy()
redis_key = policy._fmtkey("ccullen@easydns.com", "chapps.io")
assert redis_key == "sda:ccullen@easydns.com:chapps.io"
[docs] @pytest.mark.parametrize(
"auto_ppr, expected_result", auto_ppr_param_list, ids=idfn
)
def test_get_sender_domain(self, auto_ppr, expected_result):
policy = SenderDomainAuthPolicy()
if isclass(expected_result) and issubclass(expected_result, Exception):
with pytest.raises(expected_result):
assert policy._get_sender_domain(auto_ppr)
else:
result = policy._get_sender_domain(auto_ppr)
assert result == expected_result
[docs] def test_sender_domain_key(self, allowable_ppr):
policy = SenderDomainAuthPolicy()
redis_key = policy.sender_domain_key(allowable_ppr)
assert (
redis_key
== f"sda:{allowable_ppr.sasl_username}:{allowable_ppr.sender.split('@')[1]}"
)
[docs] def test_authorized_user(self, sda_allowable_ppr, testing_policy_sda):
result = testing_policy_sda.approve_policy_request(sda_allowable_ppr)
assert result == True
[docs] def test_whole_email_auth(self, sda_auth_email_ppr, testing_policy_sda):
result = testing_policy_sda.approve_policy_request(sda_auth_email_ppr)
assert result == True
[docs] def test_whole_email_unauth(
self, sda_unauth_email_ppr, testing_policy_sda
):
result = testing_policy_sda.approve_policy_request(
sda_unauth_email_ppr
)
assert result == False
### note that no clearing of Redis is going on
[docs] def test_cached_authed_user(
self, monkeypatch, sda_allowable_ppr, testing_policy_sda
):
mock_acq_pol_data = Mock(result=False)
with monkeypatch.context() as m:
m.setattr(
testing_policy_sda, "acquire_policy_for", mock_acq_pol_data
)
result = testing_policy_sda.approve_policy_request(
sda_allowable_ppr
)
mock_acq_pol_data.assert_not_called()
assert result == True
[docs] def test_cached_whole_email(
self, monkeypatch, sda_auth_email_ppr, testing_policy_sda
):
mock_acq_pol_data = Mock(result=True)
with monkeypatch.context() as m:
m.setattr(
testing_policy_sda, "acquire_policy_for", mock_acq_pol_data
)
result = testing_policy_sda.approve_policy_request(
sda_auth_email_ppr
)
mock_acq_pol_data.assert_not_called()
assert result == True
[docs] def test_unauthorized_user(
self, sda_unauth_ppr, testing_policy_sda, clear_redis_sda
):
result = testing_policy_sda.approve_policy_request(sda_unauth_ppr)
assert result == False
### For completeness we will test both
[docs] def test_cached_unauth_user(
self, monkeypatch, sda_unauth_ppr, testing_policy_sda
):
mock_acq_pol_data = Mock(result=True)
with monkeypatch.context() as m:
m.setattr(
testing_policy_sda, "acquire_policy_for", mock_acq_pol_data
)
result = testing_policy_sda.approve_policy_request(sda_unauth_ppr)
mock_acq_pol_data.assert_not_called()
[docs]class Test_InboundPolicy:
[docs] def test_whitelist(self, testing_policy_inbound, helo_ppr_factory):
"""
:GIVEN: a helo whitelist and an inbound policy
:WHEN: the PPR reflects a connection from the whitelisted server
:THEN: the policy's _whitelisted() method should return True
"""
policy = testing_policy_inbound
ppr = helo_ppr_factory("mail.chapps.io", "10.10.10.10")
assert ppr.helo_match({"mail.chapps.io": "10.10.10.10"})
[docs] def test_not_whitelisted(self, testing_policy_inbound, helo_ppr_factory):
"""
:GIVEN: a helo whitelist and an inbound policy
:WHEN: the PPR reflects a connection from an unlisted server
:THEN: the policy's _whitelisted() method should return False
"""
policy = testing_policy_inbound
ppr = helo_ppr_factory("spammity.spam.com", "1.2.3.4")
assert not ppr.helo_match({"mail.chapps.io": "10.10.10.10"})