"""
Communication handlers
----------------------
This module encapsulates the particular logic of:
1. receiving data payloads from Postfix, and then
2. sending back an appropriately-formatted response once the policy manager
has had a chance to weigh in on the payload contents
Classes defined here exist mainly to be factories which return the main-loop
closure for :mod:`asyncio`.
TODO: :class:`.RequestHandler` should be a subclass of
`.CascadingPolicyHandler` which simply only ever has one policy within it.
This is to avoid maintaining two nearly-identical code-paths. Running only one
policy is obviously a special case of running many.
"""
# from chapps.config import config # the global instance of the config object
from chapps.policy import (
EmailPolicy,
OutboundQuotaPolicy,
GreylistingPolicy,
SenderDomainAuthPolicy,
)
from chapps.util import PostfixPolicyRequest
from chapps.inbound import InboundPPR
from chapps.outbound import OutboundPPR
from chapps.signals import (
CHAPPSException,
CallableExhausted,
NullSenderException,
AuthenticationFailureException,
)
from functools import cached_property
from typing import Type, Optional, List
import logging
import asyncio
try:
from chapps.spf_policy import SPFEnforcementPolicy
except Exception:
HAVE_SPF = False
pass
else:
HAVE_SPF = True
logger = logging.getLogger(__name__) # pragma: no cover
[docs]class CascadingPolicyHandler:
"""Second-generation handler class which cascades multiple yes/no policies
This class started out nearly identical to :class:`.RequestHandler`, but as
the software has moved on, so has this handler, which is the main one in
general use.
This handler accepts a list of policy manager instances, all of which
should produce True/False results. The handler applies each policy to each
request, and passes those which pass both, or returns the message from the
policy which failed. Once a policy has failed, no further policies are
consulted.
Instance attributes:
:policies: a list of :class:`~chapps.policy.EmailPolicy` objects
:pprclass: the class of :class:`~PostfixPolicyRequest` to instantiate
from the Postfix request payload
:config: a reference to the :class:`~chapps.config.CHAPPSConfig` stored
on the first policy in the list
:listen_address: the IP address to bind to; see :meth:`.listen_address`
:listen_port: the port to listen on; see :meth:`.listen_port`
"""
[docs] def __init__(
self,
policies: Optional[List[EmailPolicy]] = [],
*,
pprclass: PostfixPolicyRequest = PostfixPolicyRequest,
):
self.policies = policies
self.pprclass = pprclass
if not self.policies:
raise ValueError("A list of policy objects must be provided.")
self.config = self.policies[
0
].config # all copies of the config are the same
logger.debug(
"Grabbing config from file "
+ self.config.chapps.config_file
+ " via "
+ self.policies[0].__class__.__name__
)
@cached_property
def listen_address(self):
return next(
(getattr(p.params, "listen_address", None) for p in self.policies),
None,
)
@cached_property
def listen_port(self):
return next(
(getattr(p.params, "listen_port", None) for p in self.policies),
None,
)
# an asynchronous policy handler which cascades through all the policies;
# fails stop execution
[docs] def async_policy_handler(self):
"""Coroutine factory
:returns: a coroutine which handles requests according to the policies,
in order
"""
pprclass = self.pprclass
policies = self.policies
encoding = self.config.chapps.payload_encoding
no_user_key_response = self.config.chapps.no_user_key_response
logger.debug(
f"Cascading policy handler requested for "
f"{[ type(p) for p in policies ]} using PPR "
f"class {pprclass.__name__}."
)
async def handle_policy_request(reader, writer):
"""Handles reading and writing the streams around policy approval messages, and manages the cascade"""
while True:
try:
policy_payload = await reader.readuntil(b"\n\n")
except ConnectionResetError:
logger.debug(
"Postfix said goodbye. Terminating this thread."
)
return
except asyncio.IncompleteReadError as e:
logger.debug(
"Postfix hung up before a read could be completed. Terminating this thread."
)
return
except CallableExhausted as e:
raise e
except Exception:
if reader.at_eof():
logger.debug(
"Postfix said goodbye oddly. Terminating this thread."
)
return
else:
logger.exception("UNEXPECTED ")
continue
logger.debug(
f"Payload received: {policy_payload.decode( 'utf-8' )}"
)
policy_data = pprclass(
policy_payload.decode(encoding).split("\n")
)
approval = True
for policy in policies:
try:
if policy.approve_policy_request(policy_data):
resp = (
"action="
+ policy.params.acceptance_message
+ "\n\n"
)
logger.info(
f"{type(policy).__name__} PASS {policy_data}"
)
else:
resp = (
"action="
+ policy.params.rejection_message
+ "\n\n"
)
approval = False
logger.info(
f"{type(policy).__name__} FAIL {policy_data}"
)
except NullSenderException:
if policy.params.null_sender_ok:
resp = (
"action="
+ policy.params.acceptance_message
+ "\n\n"
)
logger.info(
f"{type(policy).__name__} PASS NS {policy_data}"
)
else:
resp = (
"action="
+ policy.params.rejection_message
+ "\n\n"
)
approval = False
logger.info(
f"{type(policy).__name__} FAIL NS {policy_data}"
)
except AuthenticationFailureException:
resp = "action=" + no_user_key_response + "\n\n"
approval = False
logger.info(
f"{type(policy).__name__} FAIL NA {policy_data}"
)
except CHAPPSException:
logger.exception("During policy evaluation:")
if not approval:
break
try:
writer.write(resp.encode())
except asyncio.CancelledError:
pass
except Exception:
logger.exception(
f"Exception raised trying to send {resp.strip()}"
)
return
return handle_policy_request
[docs]class RequestHandler(CascadingPolicyHandler):
"""Refactored intermediate base class for wrapping policy managers in an event loop
This class has been reimplemented as a subclass of
:class:`.CascadingPolicyHandler`, as a special case which has only one
policy to handle.
Instance attributes:
:policy: the :class:`~chapps.policy.EmailPolicy` manager instance
:config: a reference to the :class:`~chapps.config.CHAPPSConfig` stored
on the policy instance.
:pprclass: a reference to the particular kind of
:class:`~chapps.util.PostfixPolicyRequest` to instantiate
"""
[docs] def __init__(
self,
policy: EmailPolicy,
*,
pprclass: Type[PostfixPolicyRequest] = PostfixPolicyRequest,
):
"""Setup a Postfix policy request handler
:param chapps.policy.EmailPolicy policy: an instance of a policy
manager (a subclass of :class:`~chapps.policy.EmailPolicy`)
:param Type[PostfixPolicyRequest] pprclass: the subclass of :class:`~chapps.util.PostfixPolicyRequest` to instantiate from the Postfix payloads; defaults to :class:`~chapps.util.PostfixPolicyRequest`
.. note::
Unlike other class families within CHAPPS, the handlers in this
module do not accept config-override arguments. They obtain their
references to the config from their attached policy managers.
"""
super().__init__([policy], pprclass=pprclass)
@cached_property
def listen_address(self):
return self.policy.params.listen_address
@cached_property
def listen_port(self):
return self.policy.params.listen_port
@cached_property
def policy(self):
return self.policies[0]
[docs]class OutboundMultipolicyHandler(CascadingPolicyHandler):
"""Convenience subclass for combining outbound P/F policies
Could be thought of as a concrete subclass of
:class:`~.CascadingPolicyHandler`, but meant more as a convenience.
"""
[docs] def __init__(self, policies=[], *, pprclass=OutboundPPR):
"""Setup an OutboundMultipolicyHandler
:param List[EmailPolicy] policies: a list of policy manager instances
:param Type[PostfixPolicyRequest] pprclass: kind of
:class:`~chapps.util.PostfixPolicyRequest` to instantiate from
Postfix request payloads; defaults to
:class:`~chapps.outbound.OutboundPPR`
If none are provided, default-configured instances of
:class:`~chapps.policy.SenderDomainAuthPolicy` and
:class:`~chapps.policy.OutboundQuotaPolicy` are used, in that order.
"""
# note that we default to OutboundPPR here
policies = policies or [
SenderDomainAuthPolicy(),
OutboundQuotaPolicy(),
] # create a list of relevant outbound y/n policies
super().__init__(policies, pprclass=pprclass)
[docs]class OutboundQuotaHandler(RequestHandler):
"""Convenience class for wrapping :class:`~chapps.policy.OutboundQuotaPolicy`"""
[docs] def __init__(self, policy: OutboundQuotaPolicy = None):
"""Setup an OutboundQuotaHandler
:param chapps.policy.OutboundQuotaPolicy policy: an instance of
:class:`~chapps.policy.OutboundQuotaPolicy`
"""
p = policy or OutboundQuotaPolicy()
super().__init__(p, pprclass=OutboundPPR)
[docs]class GreylistingHandler(RequestHandler):
"""Convenience class for wrapping :class:`~chapps.policy.GreylistingPolicy`"""
[docs] def __init__(self, policy: GreylistingPolicy = None):
"""Setup a GreylistingHandler
:param chapps.policy.GreylistingPolicy policy: an instance of
:class:`~chapps.policy.GreylistingPolicy`
"""
p = policy or GreylistingPolicy()
super().__init__(p, pprclass=InboundPPR)
[docs]class SenderDomainAuthHandler(RequestHandler):
"""Convenience class for wrapping :class:`~chapps.policy.SenderDomainAuthPolicy`"""
[docs] def __init__(self, policy: SenderDomainAuthPolicy = None):
"""Setup a SenderDomainAuthHandler
:param chapps.policy.SenderDomainAuthPolicy policy: an instance of
:class:`~chapps.policy.SenderDomainAuthPolicy`
"""
p = policy or SenderDomainAuthPolicy()
super().__init__(p, pprclass=OutboundPPR)
[docs]class CascadingMultiresultPolicyHandler(CascadingPolicyHandler):
"""Cascading handler for policies which produce more than two results
Some policies, such as the SPF enforcement policy, need to be able to
generate responses with more nuance than simply pass/fail, and so there is
a need for a handler which can deal with policies which return strings
(Postfix directives), or possibly a custom class to encapsulate also some
idea of pass/fail, in order to know whether to abort the policy-evaluation
loop early.
"""
[docs] def async_policy_handler(self):
"""Returns a coroutine which handles results according to the configuration
The policy being enforced is stored in the SPF-related TXT record
on the sender's domain. The local configuration of this policy
amounts to instructions about responses to different outcomes of
the SPF check, along with what IP address and port to listen on.
This policy handler is different from others in that, because it
does not expect a PASS/FAIL response, it simply wraps the return
value of
:func:`~chapps.spf_policy.SPFEnforcementPolicy.approve_policy_request()`
in a Postfix response packet, and sends it. Rather than refer to
pre-configured acceptance and rejection messages, it expects the
approval routine to send a string which can be interpreted by
Postfix as a command.
TODO: In order to be able to cascade *through* this kind of policy,
it is going to have to return a first-class object which can be
annotated as being a pass or a fail, so that a cascading handler
can decide whether to continue. That object's `__str__()`
method will need to return the Postfix command.
.. todo::
In order to support more than one MTA (tho no such support is
planned), the action-translation layer might be refactored out of
the policy itself, to be applied here, in order to switch between
different types.
"""
### This override version for SPF enforcement does not assume a yes-or-no response pattern
logger.debug(
"Policy handler requested for"
f" {[type(p).__name__ for p in self.policies]!r}."
)
pprclass = self.pprclass
policies = self.policies
encoding = self.config.chapps.payload_encoding
logger.debug(
"Cascading Multipolicy handler requested for "
f"{[type(p) for p in policies]} using PPR "
f"class {pprclass.__name__}."
)
async def handle_policy_request(reader, writer):
"""Handles reading and writing the streams around policy approval messages"""
while True:
try:
policy_payload = await reader.readuntil(b"\n\n")
except ConnectionResetError:
logger.debug(
"Postfix said goodbye. Terminating this thread."
)
return
except asyncio.IncompleteReadError as e:
logger.debug(
"Postfix hung up before a read could be completed."
" Terminating this thread."
)
return
except CallableExhausted as e:
raise e
except Exception:
if reader.at_eof():
logger.debug(
"Postfix said goodbye oddly."
" Terminating this thread."
)
return
else:
logger.exception("UNEXPECTED ")
continue
logger.debug(
f"Payload received: {policy_payload.decode(encoding)}"
)
policy_data = pprclass(
policy_payload.decode(encoding).split("\n")
)
actions = ["DUNNO"]
# TODO:
# track all responses, and then extract non-DUNNO ones if any
for policy in policies:
try:
action = policy.approve_policy_request(policy_data)
actions.append(action)
logger.info(
f"{type(policy).__name__} "
# + ("PASS" if action else "FAIL")
+ f" {action!r} {policy_data}"
)
except CHAPPSException:
logger.exception("During policy evaluation:")
if not action:
break
if action: # i.e. the mail will be forwarded
non_dunno = [a for a in actions if a != "DUNNO"]
action = non_dunno[-1] if non_dunno else "DUNNO"
try:
writer.write((f"action={action}\n\n").encode(encoding))
except asyncio.CancelledError: # pragma: no cover
pass
except Exception:
logger.exception(
f"Exception raised trying to send {action}"
)
return
return handle_policy_request
[docs]class MultiresultPolicyHandler(CascadingMultiresultPolicyHandler):
"""A subclass for handling just one policy"""
[docs] def __init__(
self,
policy: EmailPolicy,
*,
pprclass: Optional[PostfixPolicyRequest] = PostfixPolicyRequest,
):
super().__init__([policy], pprclass=pprclass)
if HAVE_SPF:
[docs] class SPFEnforcementHandler(MultiresultPolicyHandler):
"""Special handler class for :class:`~chapps.spf_policy.SPFEnforcementPolicy`
This one came along last and forced a reconsideration of how all this
worked, because it produces more than two possible states as output.
The plan is to retrofit all the older policies so that they also can
use an action-translation layer, but that will also require some
adjustment of the cascading handler.
.. note::
This class will not be defined if the relevant SPF libraries could
not be loaded. They may be installed via `pip` using the extras
mechanism: ``pip install chapps[SPF]``
"""
[docs] def __init__(
self,
policy: Optional[SPFEnforcementPolicy] = None,
pprclass: Optional[PostfixPolicyRequest] = InboundPPR,
):
"""Set up an SPFEnforcementHandler
:param chapps.spf_policy.SPFEnforcementPolicy policy: an instance
of :class:`~chapps.spf_policy.SPFEnforcementPolicy`
"""
p = policy or SPFEnforcementPolicy()
super().__init__(p, pprclass=pprclass)
@cached_property
def policy(self):
return self.policies[0]
[docs] class InboundMultipolicyHandler(CascadingMultiresultPolicyHandler):
"""Implements SPF and Greylisting simultaneously
This is a template for an inbound multipolicy handler, which by default
checks SPF and also performs Greylisting, each of which depends upon
options which the email account administrator may set about whether
either of SPF or Greylisting or both should be applied to inbound mail
for the particular domain.
"""
[docs] def __init__(
self,
policies: Optional[List[EmailPolicy]] = None,
*,
pprclass: Optional[PostfixPolicyRequest] = InboundPPR,
):
"""Create an inbound policy handler for SPF + Greylisting"""
policies = policies or [
SPFEnforcementPolicy(),
GreylistingPolicy(),
]
super().__init__(policies, pprclass=pprclass)