# Source code for chapps.util

"""
Utility classes
---------------

Since CHAPPS mainly deals with policy requests coming from Postfix,
there is a utility object for representing them in a way which makes
the code easier to read while also providing access optimizations.

There is another utility class for providing the configuration
data via an object which presents dictionary keys as attributes.

In order to create different defaults and access documentation files,
an object is provided which detects whether the library is running
within a virtual environment, and serves as a source of local paths
to package resources.

.. todo::

  add Postfix command class, to store action output along with
  status information.

"""
from collections.abc import Mapping
import re
import logging
import sys
import hashlib
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from chapps.signals import TooManyAtsException, NotAnEmailAddressException

# Module-level logger, named after this module's import path (``__name__``)
logger = logging.getLogger(__name__)


def hash_password(password: str, encoding: str = "utf-8") -> str:
    """Return the hexadecimal SHA-256 digest of a password

    :param password: the plain-text password to be hashed
    :param encoding: name of the codec used to convert the password into
      bytes before hashing; defaults to UTF-8

    :returns: the SHA-256 digest of the encoded password, as a string of
      hexadecimal digits
    :rtype: str

    """
    encoded = password.encode(encoding)
    digest = hashlib.sha256(encoded)
    return digest.hexdigest()
class VenvDetector:
    """Detect use of a virtual environment and calculate local paths

    The detector encapsulates the job of determining whether a virtual
    environment is activated, and assists in composing some important paths
    in order to locate certain files the application needs.

    One of those files is Markdown source which is imported into the live API
    documentation.  Another is the config file.

    Instance attributes:

      :datapath: :class:`~pathlib.Path` pointing at location where data
        files are installed

      :ve: :obj:`bool` indicating whether a virtual environment is active

      :docpath: :class:`~pathlib.Path` pointing at the location of the
        Markdown

      :confpath: :class:`~pathlib.Path` pointing at the config file

      :venvpath: :class:`~pathlib.Path` to the root of the active virtual
        environment, or None if none is active

    """

    def __init__(self, *, datapath: Optional[Union[str, Path]] = None):
        """Detect virtual environments and provide local package paths

        :param datapath: optional override to point at the data installation
          path of the package (or a surrogate).  The API uses this value to
          load the API readme into the live docs.

        When no override is given, the data path is :const:`sys.prefix` if a
        virtual environment is active, and `/usr/local` otherwise.

        """
        if datapath:
            self.datapath = Path(datapath)
        elif self.ve:
            self.datapath = Path(sys.prefix)
        else:
            self.datapath = Path("/usr/local")

    # find the base prefix; hopefully pyenv-compatible
    def get_base_prefix_compat(self) -> str:
        """Return the non-virtual base prefix

        Sometimes called `sys.real_prefix`, so we check for both, falling
        back on :const:`sys.prefix` if neither is set.

        :returns: the base path prefix
        :rtype: str

        """
        return (
            getattr(sys, "base_prefix", None)
            or getattr(sys, "real_prefix", None)
            or sys.prefix
        )

    # return whether we are in a venv
    def in_virtualenv(self) -> bool:
        """Compare prefixes to determine if a virtual environment is active.

        :returns: true if a virtual environment is active, otherwise False
        :rtype: bool

        """
        return self.get_base_prefix_compat() != sys.prefix

    # return whether a Sphinx build launched the library
    def sphinx_build(self) -> bool:
        """Determine whether invoked by Sphinx

        The check looks up the name `__sphinx_build__`, which is not defined
        in this module; presumably the documentation build injects it (for
        instance into :mod:`builtins`) -- confirm against the Sphinx
        configuration.

        :returns: `True` if `__sphinx_build__` is defined and truthy,
          otherwise `False`
        :rtype: bool

        """
        try:
            # always hand back a real bool: a defined-but-falsy marker must
            # yield False rather than falling through and returning None
            return bool(__sphinx_build__)  # noqa: F821
        except NameError:
            # the marker was never defined, so Sphinx is not running us
            return False

    @property
    def ve(self) -> bool:
        """Property which memoizes :meth:`~.in_virtualenv`"""
        if "_ve" not in vars(self):
            self._ve = self.in_virtualenv()
        return self._ve

    @property
    def sb(self) -> bool:
        """Property which memoizes :meth:`~.sphinx_build`"""
        if "_sb" not in vars(self):
            self._sb = self.sphinx_build()
        return self._sb

    @property
    def docpath(self) -> Path:
        """Memoizes the documentation location

        :returns: a :class:`~pathlib.Path` pointing at the Markdown files'
          location, which is the `chapps` directory beneath `datapath`
        :rtype: pathlib.Path

        """
        if "_docpath" not in vars(self):
            self._docpath = self.datapath / "chapps"
        return self._docpath

    @property
    def confpath(self) -> Path:
        """Memoizes the config file's full path

        :returns: a :class:`~pathlib.Path` pointing at the config file
        :rtype: pathlib.Path

        Inside a virtual environment the path is `etc/chapps.ini` beneath
        the environment root; otherwise it is `/etc/chapps/chapps.ini`.

        """
        if "_confpath" not in vars(self):
            venv_root = self.venvpath
            if venv_root is None:
                self._confpath = Path("/") / "etc" / "chapps" / "chapps.ini"
            else:
                self._confpath = venv_root / "etc" / "chapps.ini"
        return self._confpath

    @property
    def venvpath(self) -> Optional[Path]:
        """The virtual environment root, if any

        :returns: None or the value of :const:`sys.prefix` as a :class:`Path`
        :rtype: Optional[pathlib.Path]

        If no virtual environment is active, then `None` is returned,
        otherwise a :class:`Path` instance is returned, containing the path
        to the virtual environment.  This hasn't been tested with all types
        of virtual environment.

        """
        if self.ve:
            return Path(sys.prefix)
        return None
class AttrDict(Mapping):
    """Attribute Dictionary

    This simple class allows accessing the keys of a hash as attributes on an
    object.  As a useful side effect it also casts strings which look like
    floats, integers and booleans in advance.

    This object is used in :class:`~chapps.config.CHAPPSConfig` for holding
    the configuration data.

    .. note::

      The purpose of this class is to map all the keys of a fairly small
      :obj:`dict` as attributes of the instance onto their values in the
      source dict.  This class does not perform the same sort of lazy-loading
      as the :class:`~.PostfixPolicyRequest` class below; it pre-maps all the
      elements in the source :obj:`dict`.  So be careful about passing large
      dictionaries to it.

    .. admonition:: Subclassing

      Given the stated purpose of the class, all *internal instance
      attributes*, i.e. ones not associated to a key-value pair in the source
      object, should begin with `_` (an underscore).

    """

    # the alternation is grouped so that both anchors apply to both words;
    # ungrouped, "^[Tt]rue" alone would accept any string starting with "true"
    boolean_pattern = re.compile(r"^(?:[Tt]rue|[Ff]alse)$")
    """A regex to detect text-string boolean values"""

    def __init__(
        self, data: Optional[Dict[str, Any]] = None, **kwargs: Any
    ):
        r"""Populate an instance with attributes

        :param data: a :obj:`dict` mapping strings (attribute names) onto
          arbitrary values

        :param kwargs: arbitrary keyword arguments

        If, and only if, `data` is not provided (or is empty), then the
        keyword arguments will be used in place of data provided as a
        :obj:`dict`.

        .. todo::

          add any `kwargs` to an existing `data` :obj:`dict`

        Henceforth whatever is rounded up to use shall be referred to as the
        `data`.

        The initialization routine creates an attribute on the instance for
        each key in the `data` which does not start with a double underscore,
        and attempts to cast the value:

          1. values which are already :obj:`bool`, :obj:`int` or :obj:`float`
             are stored unchanged.

          2. otherwise a cast to :obj:`int` is attempted.

          3. If a :exc:`TypeError` is encountered, the unadulterated value is
             used.

          4. If only :exc:`ValueError` is raised, then it is casted to
             :obj:`float`

          5. if that fails too, and the value is a string, it is matched
             against the :const:`.boolean_pattern` to see whether it is
             exactly "true" or "false" (with or without an initial capital).

          6. If so, a simple check is conducted to determine whether the
             match was four characters long: `True`.

          7. If it does not test positive for truth, it is considered to be
             `False`.

          8. But if it wasn't a match for the :const:`.boolean_pattern` at
             all, then its original value is preserved.

        Generally, instances of this class are used to present a particular
        module, such as a policy manager, with its configuration in a form
        which can be dereferenced with dot notation.  As such, values which
        cannot be casted to some other type are almost always left in their
        original form as `str`\ ings, because the `AttrDict` is being
        initialized with a sub-block of a :class:`~configparser.ConfigParser`
        as the source object, and its values will all be strings.

        """
        if not data:
            data = kwargs
        # the helper is fetched from the class so that a data key which
        # happens to share its name cannot shadow it part-way through
        cast = type(self)._cast
        for key, value in data.items():
            if key.startswith("__"):
                continue
            setattr(self, key, cast(value, self.boolean_pattern))

    @staticmethod
    def _cast(value: Any, boolean_pattern) -> Any:
        """Coerce one value according to the rules described on `__init__`

        :param value: the raw value from the source data
        :param boolean_pattern: compiled regex recognizing boolean strings

        :returns: the value as :obj:`int`, :obj:`float` or :obj:`bool` when
          it can be read as one, otherwise the value itself, unchanged

        """
        # already-typed values pass through untouched; int() would silently
        # truncate a float such as 3.7 and would turn True into 1
        if isinstance(value, (bool, int, float)):
            return value
        try:
            return int(value)
        except TypeError:
            return value  # not castable at all (e.g. None, list): keep as-is
        except ValueError:
            pass
        try:
            return float(value)
        except (TypeError, ValueError):
            pass
        if isinstance(value, str):
            matched = boolean_pattern.match(value)
            if matched:
                # if the match is 4 chars long, it is True
                return matched.end() == 4
        return value

    def __iter__(self):
        """Return an iterator over the attribute names"""
        # a dict view is iterable but is not itself an iterator, so it must
        # be wrapped for iter()/for/list() to work on instances
        return iter(self.__dict__)

    def __len__(self) -> int:
        """Return the number of attributes stored on the instance"""
        return len(self.__dict__)

    def __getitem__(self, item) -> Any:
        """Return the value of the attribute named by `item`

        :raises AttributeError: if there is no such attribute

        """
        return getattr(self, item)

    def __contains__(self, item) -> bool:
        """Indicate whether `item` is the name of a stored attribute"""
        return item in self.__dict__

    def keys(self):
        """Return a view of the attribute names stored on the instance"""
        return self.__dict__.keys()
class PostfixPolicyRequest(Mapping):
    """Lazy-loading Policy Request Mapping Interface

    An implementation of :class:`~collections.abc.Mapping` which by default
    only processes and caches values from the data payload when they are
    accessed, to avoid a bunch of useless parsing.  Instances may be
    dereferenced like hashes, but the keys are also attributes on the
    instance, so they can be accessed without brackets and quotation marks.

    Once parsed, results are memoized.

    For example, a payload might look a bit like this, when it is first
    received from Postfix and turned into an array of one string per line:

    .. code:: python

        payload = [
            "request=smtpd_access_policy",
            "protocol_state=RCPT",
            "protocol_name=SMTP",
            "helo_name=helo.chapps.io",
            "queue_id=8045F2AB23",
            "sender=unauth@easydns.com",
            "recipient=bar@foo.tld",
            "recipient_count=0",
            "client_address=10.10.10.10",
            "client_name=mail.chapps.io",
            "reverse_client_name=mail.chapps.io",
            "instance=a483.61706bf9.17663.0",
            "sasl_method=plain",
            "sasl_username=somebody@chapps.io",
            "sasl_sender=",
            "size=12345",
            "ccert_subject=",
            "ccert_issuer=Caleb+20Cullen",
            "ccert_fingerprint=DE:AD:BE:EF:FE:ED:AD:DE:D0:A7:52:F3:C1:DA:6E:04",
            "encryption_protocol=TLSv1/SSLv3",
            "encryption_cipher=DHE-RSA-AES256-SHA",
            "encryption_keysize=256",
            "etrn_domain=",
            "stress=",
            "ccert_pubkey_fingerprint=68:B3:29:DA:98:93:E3:40:99:C7:D8:AD:5C:B9:C9:40",
            "client_port=1234",
            "policy_context=submission",
            "server_address=10.3.2.1",
            "server_port=54321",
            "",
        ]

    Refer to the `Postfix policy delegation documentation
    <http://www.postfix.org/SMTPD_POLICY_README.html>`_ for more information.

    As an example of the class's utility, and using the above definition of
    `payload`, consider:

    .. code:: python

        from chapps.util import PostfixPolicyRequest

        ppr = PostfixPolicyRequest(payload)

        # all the following are true:
        ppr.sender == 'unauth@easydns.com'
        ppr.sasl_username == 'somebody@chapps.io'
        ppr.client_address == '10.10.10.10'

        # demonstrating the pseudo-attribute:
        ppr.recipients == ['bar@foo.tld']

    Instance attributes (apart from Postfix payload parameters):

      :_payload: the Postfix policy delegation request payload in
        string-per-line format

      :recipients: a pseudo-attribute of the policy request derived from the
        value of `recipient`, provided by Postfix, which may contain more
        than one comma-separated email address.  For reasons unknown, Postfix
        always provides a `recipient_count` of 0 before the DATA phase, so we
        rely upon counting the email addresses directly.

      :_recipients: memoization attribute for :meth:`.recipients`

    .. admonition:: Subclassing

      Because the purpose of the class is to present the contents of the
      initial payload as attributes, all internal attributes are prefaced
      with an underscore.

    .. document private functions
    .. automethod:: __getattr__

    """

    def __init__(self, payload: List[str], *args, **kwargs):
        """Store the payload.

        :param List[str] payload: strings which are formatted as 'key=val',
          followed by one or more empty entries at the end.

        This routine discards every empty entry at the end of the list and
        stores the rest as `self._payload`.  A payload terminated by two
        empty strings and one terminated by a single empty string therefore
        both keep all of their 'key=val' lines.

        """
        end = len(payload)
        # walk back over the blank terminator(s) rather than assuming a
        # fixed count, so that no real 'key=val' line is ever dropped
        while end > 0 and not payload[end - 1]:
            end -= 1
        self._payload = payload[:end]

    # the main reason for this class:
    # find and memoize as attributes the values in the request payload
    # means we cannot test existence of attrs by getattr or self.<attr>
    def __getattr__(self, attr: str) -> Optional[str]:
        """Overloaded in order to search for missing attributes in the payload

        :param str attr: the attribute which triggered this call

        :returns: the value found in the payload, or :obj:`None`

        :rtype: Optional[str]

        First, if the value of `attr` starts with an underscore, `None` is
        returned.  No lines of the payload start with an underscore.  This
        ensures that references to internal attributes of the class are not
        snarled up with the payload searches.

        Next, the payload is searched for the requested key-value pair,
        attempting to match `attr` against everything before the first `=`
        sign.  When a line is found, the contents after that `=` are stored
        as an attribute named `attr` (and so memoized), and the value is
        returned.  Future attempts to obtain the value will encounter the
        attribute and not invoke :meth:`.__getattr__` again.

        A `DEBUG` level message is currently produced if no lines in the
        payload matched the requested payload data.  No errors are produced
        if a nonexistent `attr` starting with `_` is encountered.

        """
        if attr.startswith("_"):
            # leading underscores do not occur in the payload
            return None
        for line in self._payload:
            key, _, value = line.partition("=")
            if line and key == attr:
                setattr(self, key, value)
                return value
        # lazy arguments: the payload is only rendered if DEBUG is enabled
        logger.debug("No lines in %s matched %s.", self, attr)
        return None

    # Since the datastructure can function as a hash, provide optimization
    def __getitem__(self, key) -> Optional[str]:
        """Getting an item should optimize the result via the attribute
        mechanism

        :returns: the payload value for `key`, or :obj:`None` if the payload
          has no such key (no :exc:`KeyError` is raised)

        """
        return getattr(self, key)

    def __contains__(self, key) -> bool:
        """Indicate whether `key` is present in the payload

        Because :meth:`.__getitem__` returns :obj:`None` instead of raising
        :exc:`KeyError`, the membership test inherited from
        :class:`~collections.abc.Mapping` would report every key as present;
        this inspects the payload directly instead.

        """
        if not isinstance(key, str):
            return False
        return any(key == line.partition("=")[0] for line in self._payload)

    # The datastructure is iterable, in case we need to enumerate the request
    def __iter__(self):
        """Return an iterable representing the mapping

        There should be few reasons to ever do this, though it comes in quite
        handy for testing.  This routine memoizes the dict it creates and
        also stores all the keys as attributes for future accesses.

        """
        if not getattr(self, "_mapping", None):
            self._mapping = {
                key: value
                for key, _, value in (
                    line.partition("=") for line in self._payload
                )
            }
            # Since we end up parsing the entire payload
            # optimize it for future random access
            for key, value in self._mapping.items():
                setattr(self, key, value)
        yield from self._mapping

    # The length of the PPR is considered to be the number of items stored
    def __len__(self) -> int:
        """Act like a dict and return the number of k,v pairs

        :returns: number of lines in the payload
        :rtype: int

        """
        return len(self._payload)

    # Representations of PPR use their own class name, and otherwise dump
    # the payload, but not a list of attributes
    def __repr__(self) -> str:
        """Dump the data in _payload"""
        return "%s( %r )" % (self.__class__.__name__, self._payload)

    # In order to use memoization with PPRs, a hash function is required
    def __hash__(self):
        """Create a reliable hash for this PPR

        The hash is derived from the `instance` and `queue_id` values of the
        payload, and is memoized.

        """
        if "_hash" not in vars(self):
            self._hash = hash(f"{self.instance}:{self.queue_id}")
        return self._hash

    # This convenience property provides a list of recipient email addresses
    # since the line may contain more than one
    @property
    def recipients(self) -> List[str]:
        """Memoize recipients as a list

        :returns: a list of strings which are the email addresses of
          recipients; empty if the payload has no (or an empty) `recipient`

        :rtype: List[str]

        A convenience method to split the 'recipient' datum into
        comma-separated tokens for easier counting.

        """
        if "_recipients" not in vars(self):
            raw = self.recipient
            self._recipients = raw.split(",") if raw else []
        return self._recipients

    def domain_from(self, email_address: str) -> str:
        """Given an email address, return the domain part

        :param email_address: the address to inspect

        :returns: everything after the at-sign
        :rtype: str

        :raises TooManyAtsException: if the address contains more than one
          at-sign

        :raises NotAnEmailAddressException: if the address contains no
          at-sign

        Raises meaningful errors if nonconforming conditions are encountered.

        """
        parts = email_address.split("@")
        if len(parts) > 2:
            logger.info(
                "Found sender email with more than one at-sign: "
                f"sender={email_address} instance={self.instance} "
                f"parts={parts!r}"
            )
            raise TooManyAtsException(f"{email_address}=>{parts!r}")
        if len(parts) == 1:
            logger.info(
                "Found sender string without at-sign: "
                f"sender={email_address} instance={self.instance} "
                f"parts={parts!r}"
            )
            raise NotAnEmailAddressException
        return parts[-1]

    def helo_match(self, candidates: Dict[str, str]) -> bool:
        """HELO Whitelisting indicator

        :param candidates: mapping of HELO name onto expected IP address

        :returns: `True` if the request's HELO name (or, failing that, its
          client name) is listed in `candidates` with the request's client
          address; otherwise `False`
        :rtype: bool

        Given a mapping of HELO name to expected IP address, indicate whether
        this PPR represents traffic from a whitelisted server.

        """
        name = self.helo_name or self.client_name
        if not candidates or name not in candidates:
            return False
        return candidates[name] == self.client_address