"""
Policy managers
---------------
All email policy managers inherit from :class:`~.EmailPolicy`, which provides a
fair amount of base functionality useful to its subclasses. So far, all but
the :class:`~.SPFEnforcementPolicy` are contained here. That one has
extra dependencies which are thus kept isolated. Find it in
:mod:`.spf_policy`.
"""
import datetime
import functools
import logging
import time
from collections import deque
from contextlib import contextmanager
from typing import List, Dict, Union, Optional, Tuple

import redis
from expiring_dict import ExpiringDict

from chapps.config import CHAPPSConfig, env
from chapps.signals import NullSenderException, NoRecipientsException
from chapps.models import Quota, SDAStatus, PolicyResponse
from chapps.util import PostfixPolicyRequest
from chapps.outbound import OutboundPPR
from chapps.inbound import InboundPPR
if env.get("CHAPPS_DB_MODULE", None) == "mysql":
from chapps.adapter import (
MariaDBQuotaAdapter as OBQAdapter, # outbound quota
MariaDBSenderDomainAuthAdapter as SDAAdapter,
MariaDBInboundFlagsAdapter as IBFAdapter,
)
else:
from chapps.sqla_adapter import (
SQLAQuotaAdapter as OBQAdapter,
SQLASenderDomainAuthAdapter as SDAAdapter,
SQLAInboundFlagsAdapter as IBFAdapter,
)
# shorthand for the parameterized decorator that wraps policy action closures
policy_response = PolicyResponse.policy_response  # a parameterized decorator
logger = logging.getLogger(__name__)
seconds_per_day = 3600 * 24  # default TTL / enforcement interval: 24 hours
SENTINEL_TIMEOUT = 0.1  # seconds; socket timeout for Redis Sentinel operations
TIME_FORMAT = "%d %b %Y %H:%M:%S %z"  # human-readable timestamp format
# There are a number of commented debug statements in this module
# This is for convenience, because in production these routines need
# to be as performant as possible, but these messages are often very
# helpful for diagnosing problems during testing and debugging
class EmailPolicy:
    """Abstract policy manager

    Subclasses must:

      * set class attribute `redis_key_prefix` to a unique value.
      * define an instance method called :meth:`~.approve_policy_request`.

    This abstract superclass provides a standard framework for constructing
    Redis keys, since the main purpose of the policy manager is to make
    decisions about email by consulting Redis.

    Instance attributes:

      :config: :class:`chapps.config.CHAPPSConfig` either passed in
        during initialization or inherited from the environment

      :params: :class:`chapps.util.AttrDict` corresponding to the config
        for the policy manager

      :sentinel: a :class:`redis.Sentinel` handle, or `None` if using
        only Redis

      :redis: a :class:`redis.Redis` handle
    """

    redis_key_prefix = "chapps"
    """A placeholder value, since this class is abstract

    Subclasses should set this to a unique prefix identifying the policy
    manager.  For examples, see the included subclasses.
    """

    @staticmethod
    def rediskey(prefix: str, *args) -> str:
        """Format a string to serve as a Redis key for arbitrary data

        :param str prefix: a prefix unique to the policy manager (subclass)
        :param List[str] args: strings used to construct the rest of the key

        In CHAPPS, each policy has its own prefix.  What other data the
        policy uses to construct the key is not relevant to any other
        entities, though it must be sent as a string.

        This routine simply joins all the tokens with colon (`:`)
        characters, so it is not recommended to use colons as part of the
        key-components (although it should 'just work').
        """
        return f"{prefix}:{':'.join(args)}"

    @classmethod
    def _fmtkey(cls, *args) -> str:
        """Convenience classmethod for Redis key construction

        :param List[str] args: a list of key components

        Subclasses may use this method, which automatically discovers the
        prefix from the class attribute `redis_key_prefix`.

        :meta public:
        """
        return cls.rediskey(cls.redis_key_prefix, *args)

    def __init__(self, cfg: Optional["CHAPPSConfig"] = None):
        """Sets up a new policy manager

        :param chapps.config.CHAPPSConfig cfg: optional :class:`CHAPPSConfig`
          object for config override

        Store the config and get the params for the specific policy class,
        which are in a config block named for the class.  Using that config,
        set the policy manager up with a Redis handle, and an instance cache
        (from :class:`expiring_dict.ExpiringDict`).
        """
        self.config = cfg or CHAPPSConfig.get_config()
        # params live in a config block named after the concrete subclass
        self.params = self.config.get_block(self.__class__.__name__)
        self._adapter = None
        self.sentinel = None
        self.redis = self._redis()  # pass True to get read-only
        self.instance_cache = ExpiringDict(3)  # entries expire after 3 seconds

    @contextmanager
    def _adapter_handle(self):
        """Context manager for obtaining a database handle

        In order to acquire policy configuration data, the policy manager
        must be able to reach the RDBMS or other policy-config data store.
        One of the policy manager's principle jobs is to obtain this data
        from the database and stuff it into Redis for future reference.

        Adapter configuration, in terms of how to access the database, is
        obtained from the config object.

        .. todo::

          It is clear now that the adapter classes should also accept an
          optional config argument, and then use it for default values, so
          that this routine need not enumerate all the options.

        :meta public:
        """
        if not self._adapter:
            self._adapter = self.adapter_class(cfg=self.config)
        try:
            yield self._adapter
        finally:
            # close any open connection and drop the adapter so the next
            # use gets a fresh one
            if getattr(self._adapter, "conn", None):
                self._adapter.conn.close()
            self._adapter = None

    @contextmanager
    def _control_data_storage_context(self):
        """Context manager for storing control data in Redis

        This is the most basic of routines, provided for policies which
        store only one item in Redis, perhaps an option flag for instance.
        The context manager yields a callable which expects up to three
        arguments:

        .. code::

          dsc(token, setting, expire=seconds_per_day)

        :token: is a token unique to the resource along with the setting.
            The `redis_key_prefix` is automatically prepended to the token.
            If the token is otherwise compound, it should be compounded
            beforehand and presented as a string.

        :setting: is the value to be stored in Redis

        :expire: the Redis entry's TTL in seconds; defaults to 24hr

        This pattern is used throughout the policy managers to handle
        making Redis settings in pipelines.  Some policy managers declare
        overrides for this method, in order to automate creation of
        compound keys or to store more than one piece of data at once.
        Where a policy has a per-domain enforcement flag, that flag is
        generally being stored on a Redis key formed by tacking the
        `redis_key_prefix` onto the front of the domain name.

        In practice, capture the yielded callable and invoke it to queue
        data for storage:

        .. code::

          resource_settings_map = {d: True for d in domains}
          with policy._control_data_storage_context() as dsc:
              for domain, option in resource_settings_map.items():
                  dsc(domain, option)

        If there are a number of settings to create, make sure to place
        the loop within the context so that all the settings will be
        submitted as part of the same pipeline.
        """
        pipe = self.redis.pipeline()
        fmtkey = self._fmtkey

        def _dsc(token, setting, expire=seconds_per_day):
            pipe.set(fmtkey(token), setting, ex=expire)

        try:
            yield _dsc
        finally:
            # the pipeline executes even if the body raised, flushing
            # whatever settings were queued before the error
            pipe.execute()
            pipe.reset()

    def _redis(self, read_only: bool = False):
        """Get a Redis handle, possibly from Sentinel

        :param bool read_only: if Sentinel is in use, get a read-only handle

        If you're not using Sentinel, the `read_only` parameter is
        meaningless.
        """
        try:
            if self.config.redis.sentinel_servers and not self.sentinel:
                self.sentinel = redis.Sentinel(
                    [
                        s.split(":")
                        for s in self.config.redis.sentinel_servers.split(" ")
                    ],
                    socket_timeout=SENTINEL_TIMEOUT,
                )
            if self.sentinel:
                if read_only:
                    rh = self.sentinel.slave_for(
                        self.config.redis.sentinel_dataset,
                        socket_timeout=SENTINEL_TIMEOUT,
                    )
                else:
                    rh = self.sentinel.master_for(
                        self.config.redis.sentinel_dataset,
                        socket_timeout=SENTINEL_TIMEOUT,
                    )
                return rh
        except AttributeError:
            # config has no sentinel settings; fall through to plain Redis
            pass
        return redis.Redis(
            host=self.config.redis.server, port=self.config.redis.port
        )

    def approve_policy_request(
        self, ppr: "PostfixPolicyRequest", **opts
    ) -> Union[str, bool]:
        """Determine a policy outcome based on the PPR provided

        This routine may return a boolean PASS/FAIL response, or it may for
        some policy classes return a string, which represents the policy
        outcome and is suitable for sending to Postfix.

        The result of the policy approval is cached based on the instance
        value provided by Postfix.  The memoization is done here in the
        superclass in order to avoid duplication of memoization code.
        """
        response = self.instance_cache.get(ppr.instance, None)
        if response is None:
            response = self._approve_policy_request(ppr, **opts)
            self.instance_cache[ppr.instance] = response
        return response

    def _approve_policy_request(
        self, ppr: "PostfixPolicyRequest", **opts
    ) -> Union[str, bool]:
        """Placeholder method which must be implemented by subclasses."""
        raise NotImplementedError(
            "Subclasses of EmailPolicy must implement this function."
        )
class PostfixActions:
    """Superclass for Postfix action adapters"""

    @staticmethod
    def dunno(*args, **kwargs):
        """Return the Postfix directive `DUNNO`"""
        return "DUNNO"

    @staticmethod
    def okay(*args, **kwargs):
        """Return the Postfix directive `OK`"""
        return "OK"

    ok = okay
    """`ok()` is an alias for `okay()`"""

    @staticmethod
    def defer_if_permit(msg, *args, **kwargs):
        """
        Return the Postfix `DEFER_IF_PERMIT` directive with the provided
        message
        """
        return f"DEFER_IF_PERMIT {msg}"

    @staticmethod
    def reject(msg, *args, **kwargs):
        """
        Return the Postfix `REJECT` directive along with the provided message
        """
        return f"REJECT {msg}"

    @staticmethod
    def prepend(*args, **kwargs):
        """
        Return the Postfix `PREPEND` directive.

        Include the header to prepend as keyword-argument `prepend`.

        :raises ValueError: if the header is missing or implausibly short
        """
        new_header = kwargs.get("prepend", None)
        if new_header is None or len(new_header) < 5:
            raise ValueError(
                "Prepended header expected to be at least 5 chars in length."
            )
        return f"PREPEND {new_header}"

    def __init__(self, cfg=None):
        """
        Optionally supply a :py:class:`chapps.config.CHAPPSConfig` instance
        as the first argument.
        """
        self.config = cfg or CHAPPSConfig.get_config()
        self.params = self.config  # later this is overridden, in subclasses

    def _get_closure_for(
        self, decision: str, *, passing: Optional[bool] = None
    ):
        """Set up the prescribed closure for generating SMTP action directives

        :param str decision: name of the policy decision; the config block
          is expected to carry a same-named key describing the action
        :param bool passing: override for whether the action counts as a
          pass; inferred from the action function when omitted
        :raises ValueError: if the config has no entry for `decision`
        :raises NotImplementedError: if the configured action method is
          defined neither here nor on the subclass
        """
        action_config = getattr(self.params, decision, None)
        if not action_config:
            raise ValueError(
                f"Action config for {self.__class__.__name__} does not contain a key named {decision}"
            )
        action_tokens = action_config.split(" ")
        action = action_tokens[0]
        try:
            int(action)  # a numeric first token means a literal directive
        except ValueError:
            # first token was a string, and therefore refers to a method;
            # look for a predefined or memoized version first
            af = getattr(self, action, None)
            if af:
                return af
            # no local version found; find a function reference
            action_func = getattr(PostfixActions, action, None)
            if not action_func:
                action_func = getattr(self.__class__, action, None)
            if not action_func:
                raise NotImplementedError(
                    f"Action {action} is not implemented by PostfixActions"
                    f" or by {self.__class__.__name__}"
                )
        else:
            # construct a closure from the configured message string
            action_func = (
                lambda reason, ppr, *args, **kwargs: action_config.format(
                    reason=reason
                )
            )
        # rejections and deferrals are the only non-passing outcomes
        passing = (
            (action_func in [self.reject, self.defer_if_permit])
            if passing is None
            else passing
        )
        action_func = policy_response(passing, action)(action_func)
        # memoize the action function for quicker reference next time
        setattr(self, action, action_func)
        return action_func

    def _get_message_for(self, decision, config_name=None):
        """Grab a status message for a decision from the config

        :param str decision: name of the policy decision
        :param str config_name: optional alternate config key to consult
        :raises ValueError: if no matching config key exists
        """
        msg_key = config_name or decision
        msg = getattr(self, msg_key, None)
        if not msg:
            raise ValueError(
                f"There is no key {msg_key} in the config for {self.__class__.__name__} or its policy"
            )
        return msg

    def _mangle_action(self, action):
        """
        Policy decisions which are also reserved words need to be altered.

        Currently, this routine handles only the action 'pass'.
        """
        if action == "pass":
            return "passing"
        return action

    def action_for(self, *args, **kwargs):
        """Abstract method which must be implemented in subclasses.

        This method is intended to map responses from a policy
        manager onto Postfix directives.

        Not all policy managers return only yes/no answers.  Some, like
        :py:class:`chapps.spf_policy.SPFEnforcementPolicy`, return a handful
        of different possible values, and so there must be a mechanism for
        allowing sites to determine what happens when each of those
        different outcomes occurs.
        """
        raise NotImplementedError(
            f"Subclasses of {self.__class__.__name__} must define the method"
            " action_for() for themselves, to map policy module responses"
            " (decisions) onto Postfix action directives."
        )
class PostfixPassfailActions(PostfixActions):
    """Postfix Actions adapter for PASS/FAIL policy responses.

    Many policies return `True` if the email should be accepted/forwarded
    and return `False` if the email should be rejected/dropped.  This class
    encapsulates the common case, and includes some logic to extract
    precise instructions from the config.
    """

    def __init__(self, cfg=None):
        """Optionally supply a :py:class:`chapps.config.CHAPPSConfig`."""
        super().__init__(cfg)

    def _get_closure_for(
        self,
        decision,
        *,
        passing: Optional[bool] = None,
        msg_key: Optional[str] = None,
    ):
        """Create a closure for formatting these messages, store it on
        self.<decision>, and also return it

        :param str decision: name of the policy decision
        :param bool passing: override for whether the action counts as a
          pass; inferred from the directive when omitted
        :param str msg_key: alternate config key holding the message
        :raises ValueError: if the config key is missing
        :raises NotImplementedError: for unsupported Postfix directives
        """
        msg_key = msg_key or decision
        msg = getattr(self.params, msg_key, None)
        if not msg:
            raise ValueError(
                f"The key '{msg_key}' is not defined in the config for"
                f" {self.__class__.__name__} or its policy"
            )
        msg_tokens = msg.split(" ")
        msg_text = ""
        if msg_tokens[0] == "OK":
            func = PostfixActions.okay
        elif msg_tokens[0] == "DUNNO":
            func = PostfixActions.dunno
        elif msg_tokens[0] == "DEFER_IF_PERMIT":
            func = PostfixActions.defer_if_permit
            msg_text = " ".join(msg_tokens[1:])
        elif msg_tokens[0] == "REJECT" or msg_tokens[0] == "554":
            func = PostfixActions.reject
            msg_text = " ".join(msg_tokens[1:])
        else:
            raise NotImplementedError(
                "Pass-fail closure creation for Postfix directive"
                f" {msg_tokens[0]} is not yet available."
            )
        action = policy_response(
            (
                passing
                if passing is not None
                else (func != PostfixActions.reject)
            ),
            decision,
        )(self.__prepend_action_with_message(func, msg_text))
        setattr(self, decision, action)
        return action

    def __prepend_action_with_message(self, func, prepend_msg_text):
        """Wrap an action func in order to prepend an additional message"""
        # avoiding use of nonlocal required if definition is embedded inline
        # in calling procedure
        def action(message="", *args, **kwargs):
            msg_text = prepend_msg_text
            if len(message) > 0:
                msg_text = " ".join([msg_text, message])
            return func(msg_text, *args, **kwargs)

        return action

    def action_for(self, pf_result):
        """Return an action closure for a pass/fail policy

        Evaluates its argument `pf_result` as a boolean and returns the
        action closure for 'passing' if True, otherwise the action closure
        for 'fail'.  To provide backwards-compatibility with older versions,
        and to allow for more descriptive configuration elements, the
        actions may be attached to keys named `acceptance_message` or
        `rejection_message` instead of `passing` and `fail` respectively.
        This is only true of policies with action factories inheriting from
        :py:class:`.PostfixPassfailActions`
        """
        if pf_result:  # True / pass
            action_name = "passing"
        else:  # False / fail
            action_name = "fail"
        return getattr(self, action_name, None)

    def __getattr__(self, attrname, *args, **kwargs):
        """Allow use of old config elements with long descriptive names.

        Underscore-prefixed lookups (including dunder probes made by
        `copy`, `pickle`, `hasattr`, etc.) raise AttributeError as the
        attribute protocol requires; previously every miss raised
        NotImplementedError, which broke those protocols.
        """
        if attrname.startswith("_"):
            raise AttributeError(attrname)
        attrname = self._mangle_action(attrname)
        if attrname == "passing":
            msg_key = "acceptance_message"
        elif attrname == "fail":
            msg_key = "rejection_message"
        else:
            raise NotImplementedError(
                f"Pass-fail actions do not include {attrname}"
            )
        return self._get_closure_for(
            attrname, msg_key=msg_key, passing=(attrname == "passing")
        )
class PostfixOQPActions(PostfixPassfailActions):
    """Postfix action translator for :py:class:`chapps.policy.OutboundQuotaPolicy`"""

    def __init__(self, cfg=None):
        """Optionally provide a :py:class:`chapps.config.CHAPPSConfig` instance.

        The only specialization performed here is aiming `self.params` at
        the :py:class:`chapps.policy.OutboundQuotaPolicy` config block.
        """
        super().__init__(cfg)
        self.params = self.config.policy_oqp
class PostfixGRLActions(PostfixPassfailActions):
    """Postfix action translator for :py:class:`chapps.policy.GreylistingPolicy`"""

    def __init__(self, cfg=None):
        """Optionally provide a :py:class:`chapps.config.CHAPPSConfig` instance.

        The only specialization performed here is aiming `self.params` at
        the :py:class:`chapps.policy.GreylistingPolicy` config block.
        """
        super().__init__(cfg)
        self.params = self.config.policy_grl
class InboundPolicy(EmailPolicy):
    """Common base for policies applied to inbound email."""

    adapter_class = IBFAdapter  # inbound flags adapter indirection

    def domain_option_key(self, ppr: InboundPPR):
        """Return the Redis key for the domain's Greylisting option

        Uses the first of the list of tokenized recipients.  Generally,
        inbound mail is expected to contain only one recipient per email.
        """
        return self._domain_option_key(ppr.recipient_domain)

    def _domain_option_key(self, recipient_domain):
        """Compose the per-domain option key from the policy prefix."""
        return self._fmtkey(recipient_domain)

    def _store_control_data(self, domain: str, flag: bool):
        """Persist the domain's enforcement flag (as 1 or 0) in Redis."""
        with self._control_data_storage_context() as store_flag:
            store_flag(domain, 1 if flag else 0)

    def _whitelisted(self, ppr: InboundPPR) -> bool:
        """True when the PPR's HELO name matches the configured whitelist."""
        return ppr.helo_match(self.config.helo_whitelist)
class GreylistingPolicy(InboundPolicy):
    """Policy manager which implements greylisting

    `Greylisting <https://en.wikipedia.org/wiki/Greylisting_(email)>`_ is a
    `well-defined <https://datatracker.ietf.org/doc/html/rfc6647>`_ and
    frequently-implemented pattern.  This implementation stores the
    tracking information in Redis.

    Instance attributes (in addition to those of :class:`.EmailPolicy`):

      :min_defer: minimum time between retries, in seconds

      :cache_ttl: how long to store tracking data, in seconds

      :allow_after: success threshold, after which the client may be
        whitelisted
    """

    redis_key_prefix = "grl"
    """Greylisting Redis key prefix"""

    def __init__(
        self,
        cfg: CHAPPSConfig = None,
        *,
        minimum_deferral: int = 60,
        cache_ttl: int = seconds_per_day,
        auto_allow_after: int = None,
    ):
        """Initialize a greylisting policy manager

        :param chapps.config.CHAPPSConfig cfg: optional config override

        :param int minimum_deferral: min time between retries, in seconds

        :param int cache_ttl: tracking data cache expiration time in seconds

        :param int auto_allow_after: number of successful attempts needed
          before source client is considered trusted, and no longer incurs
          deferrals
        """
        super().__init__(cfg)
        self.actions = PostfixGRLActions(self.config)
        self.min_defer = minimum_deferral
        self.cache_ttl = cache_ttl
        self.allow_after = (
            auto_allow_after
            if auto_allow_after is not None
            else self.params.whitelist_threshold
        )
        # sanity-check the tunables; requires the module-level datetime import
        if self.cache_ttl <= self.min_defer:
            logger.warning(
                f"Cache TTL (={datetime.timedelta(seconds=self.cache_ttl)}) is not allowed to be smaller than or equal to the minimum deferral window (={datetime.timedelta(seconds=self.min_defer)}). Defaulting to 24 hr."
            )
            self.cache_ttl = seconds_per_day
        if self.min_defer > 60 * 15:
            logger.warning(
                f"It may be unreasonable to expect the sending server to defer for more than 15 minutes. (={self.min_defer/60.0:.2f}m)"
            )
        if self.allow_after == 0:
            logger.warning("Sender auto-approval is turned off.")
        elif self.allow_after < 2:
            logger.warning(
                f"Sender auto-approval is set to a fairly low threshold. (={self.allow_after})"
            )

    def tuple_key(self, ppr: PostfixPolicyRequest) -> str:
        """Return the greylisting tuple as a Redis key

        The names of the values taken from `ppr` are as follows (in order):

          - `client_address`
          - `sender`
          - `recipient`
        """
        return self._tuple_key(ppr.client_address, ppr.sender, ppr.recipient)

    def _tuple_key(self, client_address, sender, recipient):
        """Compose the (client, sender, recipient) tracking key."""
        return self._fmtkey(client_address, sender, recipient)

    def client_key(self, ppr: PostfixPolicyRequest):
        """Return the greylisting client key

        This key indicates whether the client has enough successful
        resubmissions to be whitelisted.
        """
        return self._client_key(ppr.client_address)

    def _client_key(self, client_address):
        """Compose the per-client reliability-tally key."""
        return self._fmtkey(client_address)

    def acquire_policy_for(self, ppr: InboundPPR):
        """Fetch the domain's greylisting flag from the RDBMS and cache it."""
        with self._adapter_handle() as adapter:
            result = adapter.do_greylisting_on(ppr.recipient_domain)
        logger.debug(
            "Got greylisting option flag "
            + str(result)
            + " from RDBMS for domain "
            + ppr.recipient_domain
        )
        self._store_control_data(ppr.recipient_domain, 1 if result else 0)
        return result

    def _approve_policy_request(self, ppr: InboundPPR, **opts):
        """Perform greylisting

        .. todo::

          It would be possible to allow domains to set the whitelisting
          threshhold, since this routine has to obtain option flag data from
          the policy config source at this point anyway.  Because we're also
          concerned with time between attempts, we could also do some
          time-based things here, such as starting to refuse clients who
          retry much too quickly, implementing per-domain rules about how
          frequently a client is allowed to send email to their addresses,
          etc.

        :meta public:
        """
        if self._whitelisted(ppr):  # skip greylisting for whitelisted HELOs
            logger.info(f"Whitelisting traffic from {ppr.helo_name}.")
            return "DUNNO"  # needs to be the string
        option_set, tuple_seen, client_tally = None, None, None
        try:
            option_set, tuple_seen, client_tally = self._get_control_data(ppr)
            if option_set is None:
                option_set = self.acquire_policy_for(ppr)
        except NoRecipientsException:
            logger.exception(f"No recipient in PPR {ppr.instance}.")
            return self.actions.action_for(False)("", ppr=ppr)
        except Exception:  # pragma: no cover
            logger.exception("UNEXPECTED")
            logger.debug(
                f"Returning denial for {ppr.instance} (unexpected exception)."
            )
            # actually return the denial the log message describes;
            # previously control fell through and usually produced DUNNO
            return self.actions.action_for(False)("", ppr=ppr)
        if opts.get("force", False):
            option_set = True
        if not option_set:
            logger.debug(
                "Not enforcing greylisting for domain "
                f"{ppr.recipient_domain or 'N/A'}"
            )
            return "DUNNO"  # needs to be a string
        # if not whitelisting, client_tally will be None
        if client_tally is not None and client_tally >= self.allow_after:
            self._update_client_tally(ppr)
            return self.actions.action_for(True)("", ppr=ppr)
        if tuple_seen:
            # The tuple is recognized; need to check if it was long enough ago
            now = time.time()
            if now - tuple_seen >= self.min_defer:
                # the email will be approved; some housekeeping is necessary
                self._update_client_tally(ppr)
                return self.actions.action_for(True)("", ppr=ppr)
        # if we get here, the tuple either isn't stored or was stored too
        # recently; either way, we update it
        self._update_tuple(ppr)
        return self.actions.action_for(False)("", ppr=ppr)

    def _get_control_data(self, ppr: InboundPPR):
        """Extract data from Redis in order to answer the policy request

        :returns: (option flag or None, tuple timestamp or None,
          client tally or None)
        """
        now = time.time()
        tuple_key = self.tuple_key(ppr)
        client_key = self.client_key(ppr)
        option_key = self.domain_option_key(ppr)
        logger.debug(
            f"Redis keys: tuple={tuple_key} opt={option_key} client={client_key} (zrange)"
        )
        pipe = self.redis.pipeline()
        # expire stale tally entries, then read tuple, option flag and
        # (when tallying is enabled) the tally itself in one round trip
        pipe.zremrangebyscore(client_key, 0, now - float(self.cache_ttl))
        pipe.get(tuple_key)
        pipe.get(option_key)
        if self.allow_after > 0:
            pipe.zrange(client_key, 0, -1)
        result = pipe.execute()
        logger.debug(f"Redis result: {result!r}")
        tuple_bits = result[1]
        option_bits = result[2]
        client_tally_bits = None  # stays None when tallying is disabled
        if len(result) == 4:
            client_tally_bits = result[3]
        tuple_seen = (
            float(tuple_bits) if tuple_bits else None
        )  # UNIX epoch time
        option_set = int(option_bits) if option_bits else None
        client_tally = None
        if self.allow_after > 0 and client_tally_bits:
            client_tally = len(client_tally_bits)
        return (option_set, tuple_seen, client_tally)

    def _update_client_tally(self, ppr: InboundPPR):
        """Update client reliability score in Redis

        When an email is allowed, increment the reliability score of the
        client.
        """
        if self.allow_after == 0:  # if we're not keeping a tally, return
            return
        now = time.time()
        client_key = self.client_key(ppr)
        with self.redis.pipeline() as pipe:
            pipe.zadd(client_key, {ppr.instance: now})
            pipe.zremrangebyrank(
                client_key, 0, -(self.allow_after + 2)
            )  # keep one extra
            pipe.expire(client_key, self.cache_ttl)
            pipe.execute()

    def _update_tuple(self, ppr: InboundPPR):
        """Set or update a greylisting tuple in Redis"""
        self.redis.setex(self.tuple_key(ppr), self.cache_ttl, time.time())
[docs]class OutboundQuotaPolicy(EmailPolicy):
"""Policy manager which implements an outbound quota limitation
Outbound email is controlled based on the count of (attempted)
transmissions in the last 24 hours. Some parameters are provided to
fine-tune the behavior of the limiting algorithm.
Instance attributes (in addition to those of :class:`.EmailPolicy`):
:interval: number of seconds to store transmission attemps, and
to use for quota evaluation; defaults to one day
:counting_recipients: a boolean determined from the config; whether to
count each recipient of a multi-recipient email as a separate
transmission for quota purposes
:min_delta: defaults to 0; if set, the number of seconds which
must elapse between send attempts. **Currently experimental**
"""
adapter_class = OBQAdapter
redis_key_prefix = "oqp"
"""OutboundQuotaPolicy Redis prefix"""
def __init__(self, cfg=None, *, enforcement_interval=None, min_delta=0):
    """Set up an outbound quota policy manager

    :param chapps.config.CHAPPSConfig cfg: optional config override

    :param int enforcement_interval: number of seconds to store
      transmission attempts, and to use for quota evaluation; defaults to
      one day

    :param int min_delta: minimum time which must pass between
      transmission attempts, in seconds.  A ``min_delta`` in the config
      takes precedence over this argument; when neither is provided, the
      value defaults to 0, which disables the check.
    """
    super().__init__(cfg)  # sets attrs 'config', 'params', and 'redis'
    self.interval = (
        enforcement_interval if enforcement_interval else seconds_per_day
    )
    # a config-file min_delta wins over the keyword argument
    if hasattr(self.params, "min_delta"):
        self.min_delta = self.params.min_delta
    elif min_delta:
        self.min_delta = min_delta
    else:
        self.min_delta = 0
    self.min_delta = float(self.min_delta)
    # whether each recipient of a multi-recipient email counts as a
    # separate transmission for quota purposes
    self.counting_recipients = (
        self.params.counting_recipients
        if hasattr(self.params, "counting_recipients")
        else False
    )
@contextmanager
def _control_data_storage_context(self):
    """Atomic context manager for Redis quota-config updates

    Yields a closure taking ``(user, quota, margin)``; each call queues
    two ``SET`` commands (the user's limit and the user's margin, each
    with a 24-hour TTL) on one Redis pipeline.  The pipeline executes
    when the context closes, so a whole batch of settings lands at once.

    Intended to be used as a context manager, like so:

    .. code::python

      with self._control_data_storage_context() as store:
          store(user_identifier, quota_count, margin_count)

    This is most useful when collections of data are involved, but it
    also encapsulates and hides the pipeline management foo required by
    the Redis library.
    """
    pipeline = self.redis.pipeline()
    key_for = self._fmtkey

    def _store(user, quota, margin):
        pipeline.set(key_for(user, "limit"), quota, ex=seconds_per_day)
        pipeline.set(key_for(user, "margin"), margin, ex=seconds_per_day)

    try:
        yield _store
    finally:
        pipeline.execute()
        pipeline.reset()
def _get_control_data(self, ppr):
    """Obtain essential data for policy decisionmaking

    :param ppr: the outbound policy request payload
    :returns: (limit or None, margin as int/float, list of attempt records)

    This is the routine which keeps track of emails in Redis.  It
    combines all of its requests into a single pipelined (atomic)
    transaction.  When counting recipients, the record is a string
    consisting of the timestamp and the recipient serial number
    separated by a colon, in order to ensure that each recipient
    is listed as an attempt in the log.  The score is always the
    floating-point return value of time.time()
    """
    # cache the current(ish) time
    time_now = time.time()
    time_now_s = str(time_now)
    user = ppr.user
    # create a dict for Redis.zadd(): one member per attempt, scored by time
    if self.counting_recipients:
        tries_dict = {
            (time_now_s + f":{i:05d}"): time_now
            for i, r in enumerate(ppr.recipients)
        }
    else:
        tries_dict = {time_now_s: time_now}
    # set up the Redis keys
    tries_key = self._fmtkey(user, "attempts")
    limit_key = self._fmtkey(user, "limit")
    margin_key = self._fmtkey(user, "margin")
    # Create a Redis pipeline to atomize a set of instructions
    pipe = self.redis.pipeline()
    # Clear the list down to just the last interval seconds, generally a day
    pipe.zremrangebyscore(tries_key, 0, time_now - float(self.interval))
    # Add this/these try(es)
    pipe.zadd(tries_key, tries_dict)
    # Get control data: the limit, the margin, the attempts list
    pipe.get(limit_key)
    pipe.get(margin_key)
    pipe.zrange(tries_key, 0, -1)
    # Set expires on all this stuff so that if they don't send
    # email for a day, their data won't still be sitting around
    # takin' up space; these return nil values so we have to
    # ignore them when we get the results
    pipe.expire(tries_key, self.interval)
    pipe.expire(limit_key, self.interval)
    pipe.expire(margin_key, self.interval)
    # Do the thing!
    results = pipe.execute()
    # The non-retrieval operations still have return values, so we ignore
    # them; this unpack mirrors the pipeline command order above exactly
    removed, _, limit, margin, attempts, _, _, _ = results
    # Always polite to reset your pipe
    pipe.reset()
    # Attempt typecasting on the margin number, which is allowed
    # to be either int or float
    m = self._cast_margin(margin)
    # If no limit is defined, that means there is no quota profile
    # for the user, and we just return None here; we must test against
    # None because it might be 0
    return (int(limit) if limit is not None else None, m, attempts)
def _cast_margin(self, margin_bytes):
"""Convenience method
:param bytes margin_bytes: margin value from Redis
Get the correct type (int or float) from the provided bytestring.
"""
try:
m = int(margin_bytes)
except:
try:
m = float(margin_bytes)
except:
m = 0
return m
def current_quota(
    self, user: str, quota: Optional[Quota] = None
) -> Tuple[int, List[str]]:
    """Provide real-time remaining quota for a user

    :param str user: user-identifier

    :param chapps.models.Quota quota: optional
      :class:`~chapps.models.Quota` record

    :returns: (*remaining quota count*, [*remarks*,...])

    :rtype: Tuple[int, List[str]]

    The caller is anticipated to be the API.  The **User** and **Quota**
    are both available, so the **Quota** may be provided, but it is not
    required.  The `user` parameter is expected to contain a string at
    present, though this may change as the :mod:`pydantic` data models
    become more tightly integrated into the codebase.

    The return value, intended for wrapping by the API and transmission
    to a client, is a tuple composed of:

      1. the number of transmission attempts remaining to the user at the
         moment the query executed

      2. a list of remarks created by the inspection routine
    """
    attempts_key = self._fmtkey(user, "attempts")
    limit_key = self._fmtkey(user, "limit")
    pipe = self.redis.pipeline()
    # expire stale attempts, then read the cached limit and the
    # surviving attempts in a single round trip
    pipe.zremrangebyscore(
        attempts_key, 0, time.time() - float(self.interval)
    )
    pipe.get(limit_key)
    pipe.zrange(attempts_key, 0, -1)
    results = pipe.execute()
    _, limit_bytes, attempts_bytes = results
    pipe.reset()
    # prefer the cached limit; fall back to the Quota record when given
    limit = (
        int(limit_bytes)
        if limit_bytes is not None
        else quota.quota
        if quota is not None
        else None
    )
    # NOTE(review): a limit of 0 is treated the same as "no limit found"
    # here (zero remaining xmits) -- confirm that is intended
    response = limit - len(attempts_bytes) if limit else 0
    remarks = []
    if attempts_bytes:
        last = attempts_bytes[-1]
        try:
            last = float(last)
        except ValueError:
            # recipient-counting mode stores "<timestamp>:<serial>"
            last = float(last.split(b":")[0])
        last_try = time.strftime(TIME_FORMAT, time.gmtime(last))
        remarks.append(f"Last send attempt was at {last_try}")
    if not limit_bytes:
        remarks.append(f"There is no cached quota limit for {user}.")
    if not quota:
        remarks.append(f"There is no quota configured for user {user}.")
    if not limit:
        remarks.append(
            f"No limit could be found; returning zero xmits remaining."
        )
    return (response, remarks)
def reset_quota(self, user: str) -> Tuple[int, List[str]]:
    """Drop all tracked send attempts for a user

    :param str user: user-identifier

    :returns: (*number of records dropped*, [*remarks*,...])

    :rtype: Tuple[int,List[str]]

    Intended for real-time management of the Redis configuration
    mirror: empties the outbound-quota transmission-tracking list
    for the named user.
    """
    key = self._fmtkey(user, "attempts")
    pipe = self.redis.pipeline()
    pipe.zrange(key, 0, -1)
    pipe.delete(key)
    dropped = pipe.execute()[0]
    pipe.reset()
    count = len(dropped) if dropped else 0
    if count:
        summary = f"Attempts (quota) reset for {user}:"
    else:
        summary = f"No attempts to reset for {user}:"
    summary += f" {count} xmits dropped"
    return (count, [summary])
def refresh_policy_cache(self, user: str, quota: Quota):
    """API adapter method for refreshing policy config cache

    :param str user: user-identifier
    :param chapps.models.Quota quota: the user's **Quota** record

    Re-stores the user's quota policy in Redis, then returns the result
    of :meth:`current_quota` for the user, for wrapping by the API.
    """
    self.acquire_policy_for(user, quota.quota)
    return self.current_quota(user, quota)
def _store_control_data(self, user, quota, margin=0):
    """Store per-user quota control data via the storage context.

    A float margin in (1.0, 100.0) is read as a percentage; a float
    no greater than 1.0 is read as a fraction.  Either way it is
    converted to an absolute message count before storage.  Integer
    margins pass through untouched.
    """
    with self._control_data_storage_context() as dsc:
        if type(margin) == float:
            if margin > 1.0:
                if margin >= 100.0:
                    raise TypeError(
                        "margin must be a positive integer or a positive float less than 1 (a percentage)"
                    )  # pragma: no cover
                margin = margin / 100.0
            margin = int(margin * float(quota))
        dsc(user, quota, margin)
def _detect_control_data(self, user):
    """Return the cached quota limit for *user* from Redis, or None."""
    key = self._fmtkey(user, "limit")
    cached = None
    try:
        cached = self.redis.get(key)
    except redis.exceptions.ResponseError:  # pragma: no cover
        # sometimes a key which should not exist still shows up, and
        # accessing it causes this error; drop the key and pretend
        # nothing happened
        self.redis.delete(key)
    return cached
def acquire_policy_for(self, user: str, quota: Optional[int] = None):
    """Populate Redis with policy config data for a user.

    :param str user: user-identifier
    :param int quota: optional quota to load for the user; provided
      mainly to optimize actions taken by the API

    Obtains the policy for a sender from the policy adapter when no
    quota is supplied.  The margin is taken from the config file and
    applied per-user as policy config is loaded; per-sender margins
    would be configured here.
    """
    if not quota:
        # no quota supplied: consult the RDBMS adapter
        with self._adapter_handle() as adapter:
            quota = adapter.quota_for_user(user)
    if not quota:
        return  # unknown user: nothing to cache
    self._store_control_data(user, quota, self.params.margin)
def _approve_policy_request(self, ppr: "OutboundPPR"):
    """Determine whether this email falls within the quota.

    :param chapps.outbound.OutboundPPR ppr: the Postfix payload
    :returns: True if this email is within the quota

    Primes the Redis policy cache on a miss, then delegates the
    actual decision to :meth:`_evaluate_policy_request`.

    NOTE(review): the original docs describe memoization on
    ``ppr.instance`` (Postfix double-submission); that caching is not
    visible in this method -- presumably handled elsewhere; confirm.

    :meta public:
    """
    if not self._detect_control_data(ppr.user):
        self.acquire_policy_for(ppr.user)
    return self._evaluate_policy_request(ppr)
def _get_delta(self, ppr, attempts):
    """Return the number of seconds between successive send attempts.

    :param ppr: the Postfix payload for the current attempt
    :param attempts: attempt records from Redis (bytes); when counting
      recipients each record is ``b"<timestamp>:<recipient-serial>"``,
      otherwise a bare timestamp
    :returns: seconds between the two most recent distinct attempts,
      or ``float("inf")`` when no meaningful delta exists
    :rtype: float

    When not counting recipients, the lookback indices are shifted
    past all but one of this attempt's recipients so the delta spans
    two distinct send events.

    .. admonition: Experimental

        There is some subtle problem with either or both of the logic
        here and in the tests; the feature is currently disabled and
        considered experimental.
    """
    if len(attempts) < 2:
        return float("inf")  # a lone attempt can never be throttled
    delta_index = [-1, -2]
    # default so the logging below can always report it, even when the
    # recipient-offset branch is not taken
    recipients_offset = 0
    if not self.counting_recipients:
        if len(attempts) > len(ppr.recipients):
            # skip back all but one recipient
            recipients_offset = 0 - len(ppr.recipients)
            delta_index = [d + recipients_offset for d in delta_index]
        else:
            return float("inf")  # automatically wins
    logger.debug(
        f"Looking at time-delta for {ppr}: indices {delta_index!r}"
    )
    try:
        # Guard fix: a negative index i is valid only when -i does not
        # exceed the list length; the previous test `i < len(attempts)`
        # was always true for negative i and guarded nothing, so the
        # IndexError below could actually fire.
        timestamps = [
            float(t.decode("utf-8").split(":")[0])
            if ":" in t.decode("utf-8")
            else float(t.decode("utf-8"))
            for t in [
                attempts[i] for i in delta_index if -i <= len(attempts)
            ]
        ]
    except IndexError:  # pragma: no cover - retained as a safety net
        msg = (
            f"Recipients={-recipients_offset}"
            f" delta_indices={delta_index!r}"
            f" Attempts: (#{len(attempts)})"
        )
        if len(attempts) < 10:
            msg += f" {attempts!r}"
        logger.exception(msg)
        return float("inf")
    if len(timestamps) == 2:
        logger.debug(
            f"attempts: {[attempts[i] for i in delta_index]}; timestamps: {timestamps!r}"
        )
        return timestamps[0] - timestamps[1]
    return float("inf")  # fewer than two comparable timestamps
def _evaluate_policy_request(self, ppr):
    """This actually checks to see if it's okay to send the email.

    :param ppr: the Postfix policy request payload for this attempt
    :returns: True to approve the email, False to deny it

    .. todo::

      in this routine, it would be possible to send pub/sub messages via
      Redis to consumers who might be interested to know that a particular
      user's send-attempts list is over a certain length
    """
    instance, user = ppr.instance, ppr.user
    try:  # this may raise TypeError if the user is unknown
        limit, margin, attempts = self._get_control_data(ppr)
    except Exception:  # pragma: no cover
        # unexpected failure reading control data: log it and fail closed
        logger.exception("UNEXPECTED")
        logger.debug(
            f"Returning denial indicator for {instance} (unexpected exception)."
        )
        return False
    if not limit:  # user does not have a quota profile
        return False
    if len(attempts) < 2:  # this is the first attempt in the Redis history
        logger.debug(f"Returning OK for {instance} (first attempt).")
        return True
    if self.min_delta != 0:  # set up for checking on throttle
        logger.debug(
            f"Checking throttle: {ppr.user}:{instance} limit: {limit} margin: {margin} tries: {len(attempts)}"
        )
        this_delta = self._get_delta(ppr, attempts)
        if this_delta < float(self.min_delta):  # trying too fast
            logger.debug(
                f"Rejecting {instance} of {user} for trying too fast. ({this_delta}s since last attempt)"
            )
            return False
    if (
        len(attempts) > limit
    ):  # not too fast, check how many send attempts on record
        ### TODO: alert when the attempts list is really long -- perhaps via Redis pub/sub
        # Deny when the overage exceeds the configured margin, or when
        # the count would still reach the limit even with this email's
        # recipients excluded; otherwise approve within the margin.
        if (
            len(attempts) - margin > limit
            or len(attempts) - len(ppr.recipients) >= limit
        ):
            logger.debug(
                f"Rejecting {instance} of {user} for having too many attempts in the last interval: recip: {len(ppr.recipients)} limit: {limit}; tries: {len(attempts)}"
            )
            return False
        else:
            logger.debug(
                f"Returning OK for {instance} recip: {len(ppr.recipients)} limit: {limit}; tries: {len(attempts)} (within margin)."
            )
    logger.debug(f"Returning OK for {instance} (under quota).")
    return True
class SenderDomainAuthPolicy(EmailPolicy):
    """Policy manager implementing domain and whole-email matching for senders

    This class encapsulates explicit policy regarding what sorts of
    masquerading authenticated users are allowed to do.  Currently, two
    sorts of matches are handled, in succession.

    First, the domain part of the email address, the entire string after
    the `@`, is matched against **Domain** entries linked to the **User**.

    If there is no **Domain** match, then **Email** entries linked to the
    **User** are checked.  **Email** entries must match the entirety of a
    policy request's `sender` attribute in order to pass.
    """

    # RDBMS adapter class used to look up sender-domain authorizations
    adapter_class = SDAAdapter
    redis_key_prefix = "sda"
    """Sender domain auth Redis key prefix"""

    # initialization is when we plug in the config
    def __init__(self, cfg: Optional[CHAPPSConfig] = None):
        """Set up a new sender domain authorization policy manager

        :param chapps.config.CHAPPSConfig cfg: optional config override
        """
        super().__init__(cfg)  # sets attrs 'config' and 'redis'

    # every subclass has one of these, with a unique name, and fine
    # but maybe there should also be a generic single entry point
def sender_domain_key(self, ppr: "OutboundPPR") -> str:
    """Create a Redis key for a user->domain mapping.

    :param chapps.outbound.OutboundPPR ppr: a Postfix payload
    :returns: the sender-domain key, built from `ppr.user` and the
      domain part of `ppr.sender`
    :rtype: str
    """
    domain = self._get_sender_domain(ppr)
    return self._sender_domain_key(ppr.user, domain)
def sender_email_key(self, ppr) -> str:
    """Create a Redis key for a user->email mapping.

    :param chapps.outbound.OutboundPPR ppr: a Postfix payload
    :returns: the sender-email key, built from `ppr.user` and the
      full `ppr.sender` address
    :rtype: str
    """
    sender_address = ppr.sender
    return self._sender_domain_key(ppr.user, sender_address)
# factored out for use in API
def _sender_domain_key(self, user: str, domain: str) -> str:
    """Format a Redis key from a user and an auth subject.

    :meta public:
    :param str user: user-identifier
    :param str domain: origin domain or email address
    :returns: a Redis key
    :rtype: str

    Might better be called ``_sender_auth_key``, since it works with
    both domains and full email addresses.
    """
    return self._fmtkey(user, domain)
# determine the domain of the sender address, if any
[docs] @functools.lru_cache(maxsize=2)
def _get_sender_domain(self, ppr: OutboundPPR) -> str:
"""Returns the domain portion of `ppr.sender`
:param chapps.outbound.OutboundPPR ppr: a Postfix payload
:returns: the domain part of `ppr.sender`
:rtype: str
:raise chapps.signals.TooManyAtsException: if there are more than one
`@` in `ppr.sender`
:raise chapps.signals.NotAnEmailAddressException: if there is no `@`
in `ppr.sender`
:raise chapps.signals.NullSenderException: if `ppr.sender` is
`None`
:meta public:
"""
if ppr.sender:
return ppr.domain_from(ppr.sender)
raise NullSenderException
# We will need to be able to access policy data in the RDBMS
def _detect_control_data(self, user, domain):
    """Look in Redis for cached SDA control data for a user/subject pair.

    :returns: ``None`` on a cache miss, otherwise the cached flag as an
      ``int`` (0 = prohibited, 1 = authorized)
    """
    key = self._sender_domain_key(user, domain)
    cached = None
    try:
        cached = self.redis.get(key)
    except redis.exceptions.ResponseError:  # pragma: no cover
        # phantom keys can raise on access; discard and report nothing
        self.redis.delete(key)
    # logger.debug(f"Found {key} = {cached!r} in Redis")
    return None if cached is None else int(cached)
def _get_control_data(self, ppr):
    """Cascade through control-data searches: domain first, then email."""
    domain_result = self._detect_control_data(
        ppr.user, self._get_sender_domain(ppr)
    )
    return domain_result or self._detect_control_data(ppr.user, ppr.sender)
# We will need to be able to store data in Redis
def _store_control_data(self, ppr: "OutboundPPR", allowed: int):
    """Cache the domain-auth flag for `ppr`'s user and sender domain."""
    # logger.debug(
    #     f"store request: {ppr.user} {self._get_sender_domain(ppr)}"
    #     f" {allowed!r}"
    # )
    with self._control_data_storage_context() as dsc:
        dsc(ppr.user, self._get_sender_domain(ppr), allowed)
def _store_email_control_data(self, ppr, allowed):
    """Cache the email-auth flag for `ppr`'s user and full sender address."""
    # logger.debug(f"store request: {ppr.user} {ppr.sender} {allowed!r}")
    with self._control_data_storage_context() as dsc:
        dsc(ppr.user, ppr.sender, allowed)
# We will need a Redis storage context manager in order to mimic
# the structure of OQP -- metaprogramming opportunity
@contextmanager
def _control_data_storage_context(
    self, expire_time: int = seconds_per_day
):
    """Context manager for storing SDA policy-cache data in Redis.

    :param int expire_time: TTL in seconds for each stored key
      (defaults to one day)

    Yields a function ``dsc(user, domain, allowed)`` which queues a
    ``SET`` on a Redis pipeline; the pipeline is executed and reset
    when the context exits.

    Fix: ``expire_time`` was previously accepted but ignored -- the
    expiry was hard-coded to ``seconds_per_day``.  It is now honored,
    with an unchanged default.
    """
    pipe = self.redis.pipeline()
    fmtkey = self._fmtkey

    def _dsc(user, domain, allowed):
        key = fmtkey(user, domain)
        # logger.debug(f"Storing {key} = {allowed} in Redis")
        pipe.set(key, allowed, ex=expire_time)

    try:
        yield _dsc
    finally:
        pipe.execute()
        pipe.reset()
# How to obtain control data
def acquire_policy_for(self, ppr) -> bool:
    """Populate Redis with policy config and return the verdict.

    :param chapps.outbound.OutboundPPR ppr: a Postfix payload
    :returns: whether the policy allows `ppr`
    :rtype: bool

    Consults the RDBMS adapter, caching the domain-level result and,
    when the domain check fails, the email-level result as well.
    """
    # logger.debug(f"acq pol for {ppr!r}")
    with self._adapter_handle() as adapter:
        verdict = adapter.check_domain_for_user(
            ppr.user, self._get_sender_domain(ppr)
        )
        self._store_control_data(ppr, 1 if verdict else 0)
        # logger.debug(
        #     f"RDBMS: policy {verdict!r} for {ppr.user} from domain"
        #     f" {self._get_sender_domain(ppr)}"
        # )
        if not verdict:  # domain not allowed, check email
            verdict = adapter.check_email_for_user(ppr.user, ppr.sender)
            if verdict is not None:
                self._store_email_control_data(ppr, 1 if verdict else 0)
            # logger.debug(
            #     f"RDBMS: policy {verdict!r} for {ppr.user} as"
            #     f" {ppr.sender}"
            # )
    return verdict
# This is the main purpose of the class, to answer this question
def _approve_policy_request(self, ppr: "OutboundPPR") -> bool:
    """Return True if `ppr` represents an authorized email.

    :param chapps.outbound.OutboundPPR ppr: a Postfix payload
    :returns: whether the email represented by `ppr` should be
      transmitted
    :rtype: bool

    Consults the Redis cache first; on a miss, loads the policy from
    the RDBMS (which also primes the cache).

    NOTE(review): when neither a domain nor an email record exists,
    ``acquire_policy_for`` can return ``None`` and ``int(None)`` will
    raise ``TypeError`` here; the outbound-quota manager treats a
    TypeError as an unknown-user signal -- confirm before changing.
    """
    cached = self._get_control_data(ppr)
    if cached is None:
        # logger.debug("cache miss; consulting RDBMS")
        cached = self.acquire_policy_for(ppr)
    return bool(int(cached))
def _decode_policy_cache(self, result) -> SDAStatus:
    """Decode the {None, 0, 1} response from Redis into an Enum.

    :param result: ``b'0'``, ``b'1'``, or ``None``
    :returns: ``SDAStatus.NONE`` for a cache miss, ``SDAStatus.PROH``
      for 0, ``SDAStatus.AUTH`` for nonzero
    :rtype: chapps.models.SDAStatus
    """
    if result is None:
        return SDAStatus.NONE
    return SDAStatus.AUTH if int(result) else SDAStatus.PROH
# For the API -- inspect state
def check_policy_cache(self, user: str, domain: str) -> "SDAStatus":
    """Check a particular policy cache entry for the API.

    :param str user: user-identifier
    :param str domain: domain or email to check
    :returns: the cached policy status
    :rtype: chapps.models.SDAStatus
    """
    cached = self._detect_control_data(user, domain)
    return self._decode_policy_cache(cached)
def clear_policy_cache(self, user: str, domain: str) -> "SDAStatus":
    """Clear a specific policy cache entry.

    :param str user: user-identifier
    :param str domain: domain or email to clear
    :returns: the previous policy status
    :rtype: SDAStatus
    """
    previous = self.check_policy_cache(user, domain)
    # only touch Redis when there is actually an entry to remove
    if previous != SDAStatus.NONE:
        self.redis.delete(self._sender_domain_key(user, domain))
    return previous
def bulk_clear_policy_cache(
    self,
    users: List[str],
    domains: List[str] = None,
    emails: List[str] = None,
):
    r"""Clear SDA policy cache for **User**\ s x [**Domain**\ s + **Email**\ s]

    :param List[str] users: a list of user-identifiers
    :param Optional[List[str]] domains: a list of domain names
    :param Optional[List[str]] emails: a list of email addresses
    :rtype: None
    """
    subjects = (domains or []) + (emails or [])
    # there seems to be no max pipeline size,
    # but if things get sketchy, we can chunk this
    with self.redis.pipeline() as pipe:
        for subject in subjects:
            for user in users:
                # logger.debug(f"bulk_clear: erasing {user}:{subject}")
                pipe.delete(self._sender_domain_key(user, subject))
        pipe.execute()
        pipe.reset()
def bulk_check_policy_cache(
    self,
    users: List[str],
    domains: List[str] = None,
    emails: List[str] = None,
) -> Dict[str, Dict[str, "SDAStatus"]]:
    """Map auth subject onto user status.

    :param List[str] users: a list of user-identifiers
    :param Optional[List[str]] domains: a list of domain names
    :param Optional[List[str]] emails: a list of email addresses
    :returns: an auth-subject => user => status map as described below
    :rtype: Dict[str, Dict[str, SDAStatus]]

    Builds a map keyed on auth subject (**Domain** and/or **Email**),
    full of maps from username to status, e.g.:

    .. code::python

      bulk_check_result = {
          'example.com': {'user@example.com': SDAStatus.AUTH},
          'chapps.io': {'user@example.com': SDAStatus.NONE},
      }

    Mainly intended for use by the API.
    """
    subjects = (domains or []) + (emails or [])
    with self.redis.pipeline() as pipe:
        for subject in subjects:
            for user in users:
                # logger.debug(f"bcpc seeking SDA for {user} from {subject}")
                pipe.get(self._sender_domain_key(user, subject))
        # popleft() consumes replies in the same subject-major order
        # in which the GETs were queued
        answers = deque(pipe.execute())
        pipe.reset()
    # logger.debug(f"bcpc results: {answers!r}")
    return {
        subject: {
            user: self._decode_policy_cache(answers.popleft())
            for user in users
        }
        for subject in subjects
    }