Source code for chapps.tests.test_policy.conftest

"""Fixtures for testing of CHAPPS policy module"""
import pytest
from pytest import fixture
from unittest.mock import Mock
from chapps.signals import TooManyAtsException, NotAnEmailAddressException
from chapps.tests.test_config.conftest import (
    chapps_mock_config,
    chapps_mock_env,
    chapps_mock_cfg_path,
    chapps_mock_config_file,
)
from chapps.tests.test_util.conftest import (
    postfix_policy_request_message,
    postfix_policy_request_payload,
    _postfix_policy_request_message,
)
from chapps.tests.test_adapter.conftest import (
    base_adapter_fixture,
    finalizing_pcadapter,
    database_fixture,
    populated_database_fixture,
    populated_database_fixture_with_extras,
)
from chapps.tests.conftest import (
    _unique_instance,
    _mock_client_tally,
    _redis_handle,
    _clear_redis,
    _populate_redis_grl,
)
import redis
import time
import random
import string
from chapps.config import CHAPPSConfig
from chapps.policy import (
    InboundPolicy,
    OutboundQuotaPolicy,
    GreylistingPolicy,
    SenderDomainAuthPolicy,
    PostfixActions,
    PostfixGRLActions,
    PostfixOQPActions,
    PostfixPassfailActions,
)
from chapps.spf_policy import SPFEnforcementPolicy, PostfixSPFActions
from chapps.util import PostfixPolicyRequest
from chapps.inbound import InboundPPR
from chapps.outbound import OutboundPPR
from chapps.signals import NullSenderException

seconds_per_day = 3600 * 24


[docs]def testing_policy_factory(policy_type): newconfig = CHAPPSConfig() return policy_type(newconfig)
[docs]@fixture def testing_policy(chapps_mock_env, chapps_mock_config_file): return testing_policy_factory(OutboundQuotaPolicy)
[docs]@fixture def testing_policy_sda(chapps_mock_env, chapps_mock_config_file): return testing_policy_factory(SenderDomainAuthPolicy)
[docs]@fixture def null_sender_policy_sda( chapps_mock_env, chapps_mock_config_file, monkeypatch ): newconfig = CHAPPSConfig() policy = SenderDomainAuthPolicy(newconfig) apr = Mock(name="approve_policy_request", side_effect=NullSenderException) monkeypatch.setattr(policy, "approve_policy_request", apr) return policy
[docs]@fixture def testing_policy_inbound(chapps_mock_env, chapps_mock_config_file): return testing_policy_factory(InboundPolicy)
[docs]@fixture def testing_policy_grl(chapps_mock_env, chapps_mock_config_file): return testing_policy_factory(GreylistingPolicy)
[docs]@fixture def testing_policy_spf(chapps_mock_env, chapps_mock_config_file): return testing_policy_factory(SPFEnforcementPolicy)
[docs]@fixture def allowable_ppr(postfix_policy_request_message): return OutboundPPR(postfix_policy_request_message("underquota@chapps.io"))
[docs]@fixture def allowable_inbound_ppr(postfix_policy_request_message): return InboundPPR( postfix_policy_request_message( "underquota@chapps.io", ["somebody@chapps.io"] ) )
[docs]@fixture def sda_allowable_ppr(postfix_policy_request_message): return OutboundPPR( postfix_policy_request_message( "caleb@chapps.io", None, sasl_username="ccullen@easydns.com", ccert_subject="", instance="sda_allowable_ppr", ) )
[docs]@fixture def sda_auth_email_ppr(postfix_policy_request_message): return OutboundPPR( postfix_policy_request_message( "caleb@chapps.com", None, sasl_username="ccullen@easydns.com", ccert_subject="", instance="sda_whole_email_ppr", ) )
[docs]@fixture def sda_unauth_email_ppr(postfix_policy_request_message): return OutboundPPR( postfix_policy_request_message( "ccullen@chapps.com", None, sasl_username="ccullen@easydns.com", ccert_subject="", instance="sda_unauth_email_ppr", ) )
[docs]@fixture def sda_unauth_ppr(postfix_policy_request_message): return OutboundPPR( postfix_policy_request_message( "ccullen@easydns.com", None, sasl_username="somebody@chapps.io", ccert_subject="", instance="sda_unauth_ppr", ) )
[docs]@fixture def groupsend_ppr(postfix_policy_request_message): return OutboundPPR( postfix_policy_request_message( "underquota@chapps.io", [ "one@recipient.com", "two@recipient.com", "three@recipient.com", "four@recipient.com", "five@recipient.com", ], ) )
[docs]@fixture def helo_ppr_factory(postfix_policy_request_message): def _ppr_factory(helo_name, source_ip): return InboundPPR( postfix_policy_request_message( None, None, None, helo_name=helo_name, client_address=source_ip, ) ) return _ppr_factory
[docs]@fixture def multisend_ppr_factory(random_recipient, postfix_policy_request_message): def _ppr_factory(sender, recipient_count): return OutboundPPR( postfix_policy_request_message( sender, [random_recipient for _ in range(recipient_count)] ) ) return _ppr_factory
[docs]@fixture def random_recipient(): return ( "".join(random.choice(string.ascii_letters) for _ in range(8)) + "@recipient.com" )
[docs]@fixture def overquota_ppr(postfix_policy_request_message): return OutboundPPR(postfix_policy_request_message("overquota@chapps.io"))
[docs]@fixture def undefined_ppr(postfix_policy_request_message): return OutboundPPR(postfix_policy_request_message("nonexistent@chapps.io"))
[docs]@fixture def uncached_allowable_ppr(postfix_policy_request_message): return OutboundPPR(postfix_policy_request_message("ccullen@easydns.com "))
[docs]@fixture def well_spaced_attempts(): seconds_per_day = float(3600 * 24) def _wsa(number): delta = int(seconds_per_day / float(number - 1)) t0 = time.time() - seconds_per_day return [ t0 + float(t) + (float(delta) * random.random()) for t in range(0, int(seconds_per_day), delta) ] return _wsa
[docs]@fixture def well_spaced_double_attempts(well_spaced_attempts): wsa = well_spaced_attempts def _wsda(number): array = wsa(number - 1) array.append(time.time()) return array return _wsda
[docs]@fixture def rapid_attempts(): def _ra(number): t0 = int(time.time()) - number return [t0 + t for t in range(0, number)] return _ra
[docs]@fixture def populate_redis(clear_redis): fmtkey = OutboundQuotaPolicy._fmtkey def _popredis(email, limit, timestamps=[], margin=0): rh = _redis_handle() with rh.pipeline() as pipe: pipe.delete( fmtkey(email, "limit"), fmtkey(email, "attempts"), fmtkey(email, "margin"), ) pipe.set(fmtkey(email, "limit"), limit) pipe.set(fmtkey(email, "margin"), margin) pipe.zadd(fmtkey(email, "attempts"), {t: t for t in timestamps}) pipe.execute() yield _popredis rh = _redis_handle() keys = rh.keys("oqp:*") if len(keys) > 0: rh.delete(*keys)
[docs]@fixture def populate_redis_multi(clear_redis): fmtkey = OutboundQuotaPolicy._fmtkey def _popredis(email, limit, timestamps=[], margin=0): rh = _redis_handle() with rh.pipeline() as pipe: pipe.delete( fmtkey(email, "limit"), fmtkey(email, "attempts"), fmtkey(email, "margin"), ) pipe.set(fmtkey(email, "limit"), limit) pipe.set(fmtkey(email, "margin"), margin) pipe.zadd( fmtkey(email, "attempts"), {str(t) + ":00001": t for t in timestamps}, ) pipe.execute() yield _popredis rh = _redis_handle() keys = rh.keys("oqp:*") if len(keys) > 0: rh.delete(*keys)
[docs]@fixture def clear_redis(): return _clear_redis("oqp ")
[docs]@fixture def clear_redis_grl(): return _clear_redis("grl")
[docs]@fixture def clear_redis_sda(): return _clear_redis("sda")
[docs]@fixture def populate_redis_grl(clear_redis_grl): yield _populate_redis_grl clear_redis_grl()
[docs]@fixture def unique_instance(): return _unique_instance()
[docs]@fixture def mock_client_tally(unique_instance): return _mock_client_tally(unique_instance)
def _mock_spf_query(result, message): mock = Mock(name="spf_query") mock.check = Mock(name="check", return_value=(result, None, message)) mock.get_header = Mock(name="get_header", return_value="SPF prepend") return mock
[docs]def mock_spf_query(result, message): return Mock(name="query", return_value=_mock_spf_query(result, message))
[docs]def mock_spf_queries(*tuples): results = [_mock_spf_query(res, msg) for res, msg in tuples] return Mock(name="query", side_effect=results)
[docs]@fixture def passing_spf_query(): return mock_spf_query("pass", "CHAPPS passing SPF message")
[docs]@fixture def failing_spf_query(): return mock_spf_query("fail", "CHAPPS failing SPF message")
[docs]@fixture def temperror_spf_query(): return mock_spf_query("temperror", "CHAPPS temperror SPF message")
[docs]@fixture def permerror_spf_query(): return mock_spf_query("permerror", "CHAPPS permerror SPF message")
[docs]@fixture def none_spf_query(): return mock_spf_query("none", "")
[docs]@fixture def neutral_spf_query(): return mock_spf_query("neutral", "CHAPPS neutral SPF message")
[docs]@fixture def softfail_spf_query(): return mock_spf_query("softfail", "CHAPPS softfail SPF message")
[docs]@fixture(scope="function") def auto_spf_query(request): return mock_spf_queries(*(request.param))
[docs]@fixture def no_helo_passing_mf(): return mock_spf_queries( ("none", ""), ("pass", "CHAPPS passing SPF message") )
[docs]@fixture def failing_helo_passing_mf(): return mock_spf_queries( ("fail", "CHAPPS failing SPF message"), ("pass", "CHAPPS passing SPF message"), )
[docs]@fixture def passing_helo_failing_mf(): return mock_spf_queries( ("pass", "CHAPPS passing SPF message"), ("fail", "CHAPPS failing SPF message"), )
[docs]@fixture def no_helo_softfail_mf(): return mock_spf_queries( ("none", ""), ("softfail", "CHAPPS softfail SPF message") )
### Definitions for parameterization of SPF testing
[docs]def idfn(val): if type(val) == tuple: return f"{val[0][0]}-{val[1][0]}" if type(val) == PostfixPolicyRequest or type(val) == OutboundPPR: return f"{val.sender}"
def _spf_results(): return dict( passing=("pass", "CHAPPS passing SPF message"), fail=("fail", "CHAPPS failing SPF message"), softfail=("softfail", "CHAPPS softfail SPF message"), permerror=("permerror", "CHAPPS permerror SPF message"), temperror=("temperror", "CHAPPS temperror SPF message"), neutral=("neutral", "CHAPPS neutral SPF message"), none=("none", ""), ) def _spf_actions(): return dict( passing="PREPEND Received-SPF: SPF prepend", fail="550 5.7.1 SPF check failed: CHAPPS failing SPF message", softfail="DEFER_IF_PERMIT Service temporarily stupid CHAPPS softfail SPF message", none="DEFER_IF_PERMIT Service temporarily stupid due to SPF enforcement policy", neutral="DEFER_IF_PERMIT Service temporarily stupid CHAPPS neutral SPF message", permerror="550 5.5.2 SPF record(s) are malformed: CHAPPS permerror SPF message", temperror="451 4.4.3 SPF record(s) temporarily unavailable: CHAPPS temperror SPF message", ) def _spf_plus_greylist_actions(): return dict( passing="DEFER_IF_PERMIT Service temporarily stupid", fail="550 5.7.1 SPF check failed: CHAPPS failing SPF message", softfail="DEFER_IF_PERMIT Service temporarily stupid CHAPPS softfail SPF message", none="DEFER_IF_PERMIT Service temporarily stupid due to SPF enforcement policy", neutral="DEFER_IF_PERMIT Service temporarily stupid CHAPPS neutral SPF message", permerror="550 5.5.2 SPF record(s) are malformed: CHAPPS permerror SPF message", temperror="451 4.4.3 SPF record(s) temporarily unavailable: CHAPPS temperror SPF message", ) def __auto_queries(helo_list, actions_f, results_f=_spf_results): result = [] spf_actions = actions_f() spf_results = results_f() [ # list comps are faster than for loops result.extend( [ ( (first, second), spf_actions[ inner_key if outer_key not in helo_list else outer_key ], ) for inner_key, second in spf_results.items() ] ) for outer_key, first in spf_results.items() ] return result def _auto_query_param_list(helo_list=["fail"]): """Constructs a map for parameterized testing via pytest The map is a list of tuples. Each tuple's first element is a tuple of (helo_result, mf_result). The second element is the action indicated by the `spf_actions` dict. Right now `spf_actions` (not quite a fixture) is created from constants, which are the same as the mock configuration. If the mock config changes, some of these tests may start to fail. """ return __auto_queries(helo_list, _spf_actions) def _auto_query_param_list_spf_plus_greylist(helo_list=["fail"]): return __auto_queries(helo_list, _spf_plus_greylist_actions) def _auto_ppr_param_list(*, senders=["ccullen@easydns.com"]): """ Return tuples of sender and expected result, generally the sender domain """ def count_ats(s): """Count the @s in a string""" return 0 if len(s) == 0 else len([c for c in s if c == "@"]) ui = _unique_instance("deafbeef") # returns a callable def ppr_for(s): pprm = _postfix_policy_request_message() # returns a callable return PostfixPolicyRequest(pprm(s, instance=ui())) params = [] for s in senders: ats = count_ats(s) if ats == 1: params.append((ppr_for(s), s[s.index("@") + 1 :])) elif ats > 1: params.append((ppr_for(s), TooManyAtsException)) else: params.append((ppr_for(s), NotAnEmailAddressException)) return params # ## Actions stuff
[docs]@fixture def postfix_actions(): return PostfixActions()
[docs]@fixture def oqp_actions(): return PostfixOQPActions()
[docs]@fixture def grl_actions(): return PostfixGRLActions()
[docs]@fixture def spf_actions(testing_policy_spf): return PostfixSPFActions(testing_policy_spf)
[docs]@fixture def spf_reason(): return "mock SPF explanation"