# Source code for chapps.tests.test_switchboard.test_switchboard

"""tests for chapps.switchboard"""

import pytest
from unittest.mock import Mock
import logging
from unittest.mock import patch
from chapps.switchboard import (
    RequestHandler,
    OutboundQuotaHandler,
    GreylistingHandler,
    SenderDomainAuthHandler,
    OutboundMultipolicyHandler,
)
from chapps.policy import OutboundQuotaPolicy
from chapps.signals import CallableExhausted
from chapps.outbound import OutboundPPR
from chapps.tests.test_adapter.conftest import (
    base_adapter_fixture,
    finalizing_pcadapter,
    database_fixture,
    populated_database_fixture,
)
from chapps.tests.test_policy.conftest import (
    clear_redis,
    clear_redis_grl,
    clear_redis_sda,
    testing_policy_sda,
    null_sender_policy_sda,
)

pytestmark = pytest.mark.order(-2)


@pytest.mark.asyncio
class Test_OutboundQuotaHandler:
    """Tests for the OQP switchboard"""

    async def test_exception_handling(
        self,
        caplog,
        clear_redis,
        testing_policy,
        mock_reader_ok,
        mock_exc_raising_writer,
        populated_database_fixture,
    ):
        """
        Verify that if an exception is raised, it will be handled by logging
        """
        caplog.set_level(logging.DEBUG)
        handle_policy_request = OutboundQuotaHandler(
            testing_policy
        ).async_policy_handler()
        # Unlike the other tests in this class, the call is not wrapped in
        # pytest.raises: the coroutine is expected to return normally and
        # leave a log record behind.
        await handle_policy_request(mock_reader_ok, mock_exc_raising_writer)
        assert any(
            "Exception raised trying to send" in rec.message
            for rec in caplog.records
        )

    async def test_handle_policy_request(
        self,
        clear_redis,
        testing_policy,
        mock_reader_ok,
        mock_writer,
        populated_database_fixture,
    ):
        """
        Verify that when a permissible request is received, it gets an OK.
        """
        handle_policy_request = OutboundQuotaHandler(
            testing_policy
        ).async_policy_handler()
        with pytest.raises(CallableExhausted):
            await handle_policy_request(mock_reader_ok, mock_writer)
        # Checked after the raises-block: nothing following the raising
        # await inside it would ever execute.
        mock_reader_ok.readuntil.assert_called_with(b"\n\n")
        mock_writer.write.assert_called_with(b"action=DUNNO\n\n")

    async def test_handle_policy_rejection(
        self, caplog, testing_policy, mock_reader_rej, mock_writer
    ):
        """
        Verify that over-quota senders' requests are denied
        """
        caplog.set_level(logging.DEBUG)
        handle_policy_request = OutboundQuotaHandler(
            testing_policy
        ).async_policy_handler()
        with pytest.raises(CallableExhausted):
            await handle_policy_request(mock_reader_rej, mock_writer)
        assert mock_reader_rej.readuntil.call_args.args[0] == b"\n\n"
        # Only the action prefix and the terminator are checked, so either a
        # 554 or a REJECT response passes regardless of its message text.
        response = mock_writer.write.call_args.args[0]
        assert response.startswith((b"action=554 ", b"action=REJECT "))
        assert response.endswith(b"\n\n")

    async def test_default_policy(self, mock_reader_ok, mock_writer):
        """
        Verify that when no policy is supplied, a default policy is
        instantiated
        """
        handler = OutboundQuotaHandler()
        assert handler.policy is not None
        assert isinstance(handler.policy, OutboundQuotaPolicy)


@pytest.mark.asyncio
class Test_GreylistingHandler:
    """Tests of the Greylisting switchboard"""

    async def test_handle_new_tuple(
        self,
        clear_redis_grl,
        testing_policy_grl,
        grl_reader_recognized,
        populated_database_fixture,
        mock_writer,
    ):
        """
        GIVEN an email attempt from a new tuple
        WHEN the client isn't auto-allowed
        THEN defer the email
        """
        # The fixture value is invoked explicitly here (the other tests in
        # this class only request it) -- presumably to wipe greylisting
        # state so the tuple counts as unseen; confirm against the fixture.
        clear_redis_grl()
        greylister = GreylistingHandler(testing_policy_grl)
        serve = greylister.async_policy_handler()
        with pytest.raises(CallableExhausted):
            await serve(grl_reader_recognized, mock_writer)
        grl_reader_recognized.readuntil.assert_called_with(b"\n\n")
        mock_writer.write.assert_called_with(
            b"action=DEFER_IF_PERMIT Service temporarily stupid\n\n"
        )

    async def test_handle_retry_too_fast(
        self,
        clear_redis_grl,
        testing_policy_grl,
        grl_reader_too_fast,
        mock_writer,
    ):
        """
        GIVEN two back-to-back attempts with the same tuple
        WHEN the two attempts are too close together
        THEN defer the email
        """
        greylister = GreylistingHandler(testing_policy_grl)
        serve = greylister.async_policy_handler()
        with pytest.raises(CallableExhausted):
            await serve(grl_reader_too_fast, mock_writer)
        grl_reader_too_fast.readuntil.assert_called_with(b"\n\n")
        mock_writer.write.assert_called_with(
            b"action=DEFER_IF_PERMIT Service temporarily stupid\n\n"
        )

    async def test_handle_recognized_tuple(
        self,
        clear_redis_grl,
        testing_policy_grl,
        grl_reader_recognized,
        mock_writer,
    ):
        """
        GIVEN an email delivery attempt
        WHEN the tuple is recognized
        THEN return DUNNO to allow other filters to block it;
             it will be accepted by default
        """
        greylister = GreylistingHandler(testing_policy_grl)
        serve = greylister.async_policy_handler()
        with pytest.raises(CallableExhausted):
            await serve(grl_reader_recognized, mock_writer)
        grl_reader_recognized.readuntil.assert_called_with(b"\n\n")
        mock_writer.write.assert_called_with(b"action=DUNNO\n\n")

    async def test_handle_allowed_client(
        self,
        clear_redis_grl,
        testing_policy_grl,
        grl_reader_with_tally,
        mock_writer,
    ):
        """
        GIVEN an email delivery attempt
        WHEN the client is recognized as a reliable sender
        THEN return DUNNO to allow other filters to block it;
             it will be accepted by default
        """
        greylister = GreylistingHandler(testing_policy_grl)
        serve = greylister.async_policy_handler()
        with pytest.raises(CallableExhausted):
            await serve(grl_reader_with_tally, mock_writer)
        grl_reader_with_tally.readuntil.assert_called_with(b"\n\n")
        mock_writer.write.assert_called_with(b"action=DUNNO\n\n")


@pytest.mark.asyncio
class Test_SenderDomainAuthHandler:
    """Tests of the SenderDomainAuth handler"""

    async def test_handle_authorized_user(
        self,
        clear_redis_sda,
        testing_policy_sda,
        mock_reader_sda_auth,
        mock_writer,
    ):
        """
        GIVEN an email attempt from an authorized user
        WHEN asked for a response
        THEN return the acceptance message
        """
        sda_handler = SenderDomainAuthHandler(testing_policy_sda)
        serve = sda_handler.async_policy_handler()
        with pytest.raises(CallableExhausted):
            await serve(mock_reader_sda_auth, mock_writer)
        mock_reader_sda_auth.readuntil.assert_called_with(b"\n\n")
        mock_writer.write.assert_called_with(b"action=DUNNO\n\n")

    async def test_handle_unauth_user(
        self,
        clear_redis_sda,
        testing_policy_sda,
        mock_reader_sda_unauth,
        mock_writer,
    ):
        """
        GIVEN an email attempt from an UNauthorized user
        WHEN asked for a response
        THEN return the rejection message
        """
        sda_handler = SenderDomainAuthHandler(testing_policy_sda)
        serve = sda_handler.async_policy_handler()
        with pytest.raises(CallableExhausted):
            await serve(mock_reader_sda_unauth, mock_writer)
        mock_reader_sda_unauth.readuntil.assert_called_with(b"\n\n")
        mock_writer.write.assert_called_with(
            b"action=REJECT Rejected - not allowed to send mail from this domain\n\n"
        )


@pytest.mark.asyncio
class Test_OutboundMultipolicyHandler:
    """Tests of the outbound multipolicy handler"""

    async def test_authed_user_gets_quota_check(
        self,
        monkeypatch,
        clear_redis,
        clear_redis_sda,
        testing_policy,
        testing_policy_sda,
        mock_reader_sda_auth,
        mock_writer,
        populated_database_fixture,
        database_fixture,
        finalizing_pcadapter,
        base_adapter_fixture,
    ):
        """
        Verify that a successful SDA test cascades to a quota test
        """
        with monkeypatch.context() as m:
            mock_apr = Mock(return_value=True)
            m.setattr(testing_policy, "approve_policy_request", mock_apr)
            handler = OutboundMultipolicyHandler(
                [testing_policy_sda, testing_policy]
            )
            handle_policy_request = handler.async_policy_handler()
            # Only the await sits inside pytest.raises, so an unexpected
            # CallableExhausted during setup cannot satisfy the check.
            with pytest.raises(CallableExhausted):
                await handle_policy_request(mock_reader_sda_auth, mock_writer)
        # Asserted after the handler has finished; a statement placed after
        # the raising await inside the raises-block would never execute.
        mock_apr.assert_called_once()

    async def test_missing_user_key(
        self,
        clear_redis,
        clear_redis_sda,
        null_user_oq_policy,
        null_user_sda_policy,
        mock_reader_factory,
        mock_writer,
        populated_database_fixture,
    ):
        """
        :GIVEN: that we require a user-key
        :WHEN: a PPR is evaluated which has no value for the user-key
        :THEN: the email is rejected and the `no_user_key_response` is
               returned
        """
        OutboundPPR.clear_memoized_routines()
        no_sasl_reader = mock_reader_factory(
            sasl_username="", ccert_subject=""
        )
        handler = OutboundMultipolicyHandler(
            [null_user_sda_policy, null_user_oq_policy]
        )
        handle_policy_request = handler.async_policy_handler()
        with pytest.raises(CallableExhausted):
            await handle_policy_request(no_sasl_reader, mock_writer)
        # meaning the AuthenticationFailureException was handled
        mock_writer.write.assert_called_with(
            bytes(
                "action="
                + handler.config.chapps.no_user_key_response
                + "\n\n",
                handler.config.chapps.payload_encoding,
            )
        )

    async def test_handle_null_sender(
        self,
        null_sender_policy_sda,
        testing_policy,
        mock_reader_sda_auth,
        mock_writer,
    ):
        """
        Verify that exactly one response is written when the null-sender
        SDA policy heads the policy list
        """
        handler = OutboundMultipolicyHandler(
            [null_sender_policy_sda, testing_policy]
        )
        handle_policy_request = handler.async_policy_handler()
        with pytest.raises(CallableExhausted):
            await handle_policy_request(mock_reader_sda_auth, mock_writer)
        # if it was called, we handled the exception raised by
        # null_sender_policy_sda
        mock_writer.write.assert_called_once()

    async def test_reject_null_sender(
        self,
        null_sender_policy_sda,
        testing_policy,
        mock_reader_sda_auth,
        mock_writer,
    ):
        """
        Verify that the response written for the null-sender SDA policy is
        the SDA rejection message
        """
        handler = OutboundMultipolicyHandler(
            [null_sender_policy_sda, testing_policy]
        )
        handle_policy_request = handler.async_policy_handler()
        with pytest.raises(CallableExhausted):
            await handle_policy_request(mock_reader_sda_auth, mock_writer)
        # the SDA rejection message b/c using SDA policy
        mock_writer.write.assert_called_with(
            b"action=REJECT Rejected - not allowed to send mail from this domain\n\n"
        )

    async def test_unauthed_user_gets_rejected(
        self,
        monkeypatch,
        clear_redis,
        clear_redis_sda,
        testing_policy,
        testing_policy_sda,
        mock_reader_sda_unauth,
        mock_writer,
        populated_database_fixture,
        database_fixture,
        finalizing_pcadapter,
        base_adapter_fixture,
    ):
        """
        Verify that unsuccessful SDA tests prevent quota testing, and
        immediately reject the email
        """
        with monkeypatch.context() as m:
            mock_apr = Mock(return_value=True)
            m.setattr(testing_policy, "approve_policy_request", mock_apr)
            handler = OutboundMultipolicyHandler(
                [testing_policy_sda, testing_policy]
            )
            handle_policy_request = handler.async_policy_handler()
            # Only the await sits inside pytest.raises, so an unexpected
            # CallableExhausted during setup cannot satisfy the check.
            with pytest.raises(CallableExhausted):
                await handle_policy_request(
                    mock_reader_sda_unauth, mock_writer
                )
        # Asserted after the handler has finished; a statement placed after
        # the raising await inside the raises-block would never execute.
        mock_apr.assert_not_called()