Source code for chapps.config

"""
Operational Configuration
-------------------------

CHAPPS configures itself at the library level.  When it is first launched,
the library will create a config file for itself if it does not find one
at its default config path, `/etc/chapps/chapps.ini`, or the value of
the environment variable `CHAPPS_CONFIG` if it is set.  When it does,
default settings for all available submodules will be produced.

Any instance of CHAPPS requires the general CHAPPS settings, adapter settings,
and the Redis settings.  These control basic features of CHAPPS and tell it
how to access its brains.

Each service script most likely runs a unique policy handler.  If only one
service is being used, only the settings for the policies of that handler will
be needed, plus the ones mentioned above.

Policy handlers (from :py:mod:`chapps.switchboard`) take their listener
configuration from that of their related policy.  Each may be configured
to use separate listening addresses and ports, so that they may run
simultaneously on the same server.

.. note::

    For multi-policy handlers, the settings used are taken from the first
    handler found to have config elements named `listen_address` and
    `listen_port`.  It is recommended to configure those elements only on one
    active policy, or to keep them in sync on all policies which are handled
    together.

"""
import collections.abc
import configparser
import dns.resolver
import dns.exception
from functools import cached_property
from pathlib import Path
from os import environ as env
from chapps.util import AttrDict, VenvDetector
from chapps._version import __version__
from chapps.signals import (
    HELOWLException,
    AddressDoesNotMatchDNS,
    NameDoesNotMatchPTR,
)
import logging
from typing import Union

# python-dotenv is treated as optional: when it can be imported, let it load
# its environment settings before the config file location is determined.
try:
    from dotenv import load_dotenv

    load_dotenv()
except Exception:
    # best effort only -- any failure here (including the package being
    # absent) is ignored so that importing this module still succeeds
    pass

logger = logging.getLogger(__name__)


class CHAPPSConfig:
    """The configuration object

    Mostly a wrapper around :py:mod:`configparser`, with the most commonly used
    portions wrapped in :py:mod:`chapps.util.AttrDict`

    """

    # cache of instances created by get_config(), keyed by config-file path
    configs = dict()

    @classmethod
    def get_config(cls):
        """Return the shared config instance for the active config file

        The config-file path is worked out via :meth:`what_config_file`; an
        instance is created the first time a given path is seen and the same
        instance is handed back on every later call for that path.

        """
        cfg_fname = str(cls.what_config_file(VenvDetector().confpath))
        if cfg_fname not in cls.configs:
            cls.configs[cfg_fname] = CHAPPSConfig()
        return cls.configs[cfg_fname]

    # ultimately, we may need also to allow for a command-line option
    @staticmethod
    def what_config_file(
        default_pathname: str = "/etc/chapps/chapps.ini",
    ) -> Path:
        """Determine what config file to read.

        :param str default_pathname: the path to use when the environment
          does not name a config file

        This is to allow for easier addition of a command-line option.

        Also encapsulates search for possible file pointed to by the
        environment setting `CHAPPS_CONFIG`

        """
        # an empty CHAPPS_CONFIG is treated like an unset one; otherwise it
        # would silently resolve to the current working directory
        return Path(env.get("CHAPPS_CONFIG") or default_pathname)

    @staticmethod
    def setup_config(
        cp: configparser.ConfigParser,
    ) -> configparser.ConfigParser:
        """Setup default config pattern on the parser passed in

        :param configparser.ConfigParser cp: a
          :py:class:`configparser.ConfigParser` instance to hold the default
          config

        This routine establishes the default configuration.  It returns the
        same object which was passed to it.

        """
        cp["CHAPPS"] = {
            "payload_encoding": "utf-8",
            "user_key": "sasl_username",
            "require_user_key": True,
            "no_user_key_response": "REJECT Rejected - Authentication failed",
            # NOTE(review): shipped default credential material -- presumably
            # meant to be replaced by the operator; confirm
            "password": (
                "effda33d276c1d5649f3933a6d6b286e"
                "d7eaaede0b944221e7699553ce0558e2"
            ),
        }
        cp["PolicyConfigAdapter"] = {
            "adapter": "mariadb",
            "db_host": "localhost",
            "db_port": "3306",
            "db_name": "chapps",
            "db_user": "chapps",
            "db_pass": "chapps",
        }
        cp["Redis"] = {
            "sentinel_servers": "",
            "sentinel_dataset": "",
            "server": "localhost",
            "port": "6379",
        }
        cp["OutboundQuotaPolicy"] = {
            "listen_address": "localhost",
            "listen_port": 10225,
            "margin": 0.10,
            "min_delta": 0,
            "counting_recipients": True,
            "rejection_message": "REJECT Rejected - outbound quota fulfilled",
            "acceptance_message": "DUNNO",
            "null_sender_ok": False,
        }
        cp["GreylistingPolicy"] = {
            "listen_address": "localhost",
            "listen_port": 10226,
            "rejection_message": (
                "DEFER_IF_PERMIT Service temporarily"
                " unavailable - greylisted"
            ),
            "acceptance_message": "DUNNO",
            "null_sender_ok": False,
            "whitelist_threshold": 10,
        }
        cp["SPFEnforcementPolicy"] = {
            "listen_address": "localhost",
            "listen_port": 10227,
            "whitelist": [],
            "null_sender_ok": False,
        }
        cp["PostfixSPFActions"] = {
            "passing": "prepend",
            "fail": "550 5.7.1 SPF check failed: {reason}",
            "temperror": (
                "451 4.4.3 SPF record(s) temporarily unavailable: {reason}"
            ),
            "permerror": "550 5.5.2 SPF record(s) are malformed: {reason}",
            "none_neutral": "greylist",
            "softfail": "greylist",
        }
        cp["SenderDomainAuthPolicy"] = {
            "listen_address": "localhost",
            "listen_port": 10225,
            "rejection_message": (
                "REJECT Rejected - not allowed to send mail from this domain"
            ),
            "acceptance_message": "DUNNO",
            "null_sender_ok": False,
        }
        return cp

    @staticmethod
    def write_config(cp, fn) -> Path:
        """Write the ConfigParser contents to disk.

        :param configparser.ConfigParser cp: a ConfigParser object
        :param Union[str, pathlib.Path] fn: path of the config file to write

        :raises OSError: if the parent directory cannot be created or the
          file cannot be opened for writing

        If the location's parent directory does not exist, CHAPPS will attempt
        to create it.  If CHAPPS can open the file, it writes the contents of
        `cp` into the file referred to by `fn`.  Returns a
        :class:`pathlib.Path` which points at the newly-written file.

        """
        config_file = Path(fn)
        parent_dir = config_file.parent
        if not parent_dir.exists():
            try:
                # attempt to make any missing parent directories; exist_ok
                # covers the directory appearing between check and creation
                parent_dir.mkdir(mode=0o777, parents=True, exist_ok=True)
            except OSError:
                logger.error(
                    "The specified config file's directory did not exist and"
                    f" could not be created.  File: {str(config_file)}"
                )
                raise  # possibly this should not be re-raised
        with config_file.open("w") as fh:
            cp.write(fh)
        return config_file

    def __init__(self):
        """Setup a new CHAPPSConfig instance

        This routine does a bunch of different setup to provide more than just
        the on-disk configuration to the running instance.  It provides the
        path to the config file that was used to configure the session.  It
        also provides a symbol to refer to the version number.

        It creates a :mod:`chapps.util.VenvDetector` in order to set up the
        path to the README-API.md file.  This could also be used to change the
        default config location when running in a `venv`.  That would
        eliminate the need for prefacing commands with a phrase setting the
        config-file location.

        It causes a config file full of defaults to be written to disk if it
        does not find a file to read.  If it does find a file, it uses the
        settings from that file to overlay the defaults already set up on the
        config object.  It is for this reason that an API method is provided
        to refresh the file on disk with any new settings which might have
        been introduced since the software was last configured.

        """
        # Create and initialize the config
        self.venvdetector = VenvDetector()
        config_file = CHAPPSConfig.what_config_file(self.venvdetector.confpath)
        self.configparser = configparser.ConfigParser(interpolation=None)
        CHAPPSConfig.setup_config(self.configparser)

        # if this is a real run and not a doc build...
        real_run = not self.venvdetector.sb
        if real_run:
            if config_file.exists():
                # overlay the defaults with whatever the file specifies
                logger.debug("Reading from config file %s", config_file)
                self.configparser.read(str(config_file))
            else:
                # Initialize a config file if none
                logger.debug("Writing new config file %s", config_file)
                CHAPPSConfig.write_config(self.configparser, config_file)

        # runtime-only values; write() keeps these out of the file on disk
        chapps_section = self.configparser["CHAPPS"]
        chapps_section["config_file"] = str(config_file)
        chapps_section["version"] = f"CHAPPS v{__version__}"
        chapps_section["docpath"] = str(self.venvdetector.docpath)

        self.chapps = AttrDict(self.configparser["CHAPPS"])
        self.adapter = AttrDict(self.configparser["PolicyConfigAdapter"])
        self.actions_spf = AttrDict(self.configparser["PostfixSPFActions"])
        self.redis = AttrDict(self.configparser["Redis"])
        # these are somewhat obsolete now
        self.policy_oqp = AttrDict(self.configparser["OutboundQuotaPolicy"])
        self.policy_sda = AttrDict(self.configparser["SenderDomainAuthPolicy"])
        self.policy_grl = AttrDict(self.configparser["GreylistingPolicy"])
        self.policy_spf = AttrDict(self.configparser["SPFEnforcementPolicy"])
        if real_run:
            logger.debug("Returning config built from %s", config_file)

    def get_block(self, blockname) -> "Union[AttrDict, None]":
        """Attempt to get a top-level block of the config as an AttrDict.

        :param str blockname: the name of the block

        Return `None` if it cannot be found.

        """
        try:
            return AttrDict(self.configparser[blockname])
        except Exception:
            # deliberately best-effort: callers get None rather than an
            # exception, but the reason is kept visible for debugging
            logger.debug(
                "Config block %r could not be loaded", blockname, exc_info=True
            )
        return None

    def write(self, location: Union[str, Path, None] = None):
        """Write the current config to disk.

        :param Union[str,pathlib.Path] location: where to write the file;
          defaults to the config file this instance was built from

        :raises OSError: propagated from :meth:`write_config`

        Currently manages exceptions for elements of the `[CHAPPS]` section of
        the config, for parameters which are not specified in the file.  It
        removes them from the config, writes the file, then restores their
        values.  These are:

          * the path of the config file itself
          * the CHAPPS version
          * the READMEs' directory

        The values are restored even when writing fails, so that the
        in-memory config is left as it was found.

        Returns the :class:`pathlib.Path` of the file written.

        """
        location = Path(location or self.chapps.config_file)
        # remember the runtime-only values so they can be put back afterwards
        config_file = self.chapps.config_file
        version = self.chapps.version
        docpath = self.chapps.docpath
        for option in ("config_file", "version", "docpath"):
            self.configparser.remove_option("CHAPPS", option)
        try:
            return CHAPPSConfig.write_config(self.configparser, location)
        finally:
            chapps_section = self.configparser["CHAPPS"]
            if config_file:
                chapps_section["config_file"] = config_file
            chapps_section["version"] = version
            chapps_section["docpath"] = docpath

    @cached_property
    def helo_whitelist(self):
        """Map of whitelisted HELO names to their source IP addresses

        Built from the `helo_whitelist` element of the `[CHAPPS]` settings by
        :meth:`_parse_whitelist`; an empty `dict` when that element is not
        present.  The result is computed once per instance and then cached.

        """
        if "helo_whitelist" not in self.chapps:
            return {}
        return dict(self._parse_whitelist(self.chapps.helo_whitelist))

    @staticmethod
    def _parse_whitelist(speclist):
        """Yield `(name, ip)` pairs for the valid entries of a whitelist spec

        :param str speclist: semicolon-separated entries, each either `name`
          or `name:ip`

        An entry whose address is `127.0.0.1` is accepted without any lookup.
        For any other entry the name is resolved, and when an address was
        given it must equal the first address returned.  The address is then
        reverse-resolved, and the first PTR name must equal the entry's name
        followed by a dot.  Entries which fail these checks, or whose lookups
        fail, are logged and left out.  Blank entries are ignored.

        """
        loopback = "127.0.0.1"
        for raw_spec in speclist.split(";"):
            spec = raw_spec.strip()
            if not spec:
                # e.g. a trailing ";" -- nothing to look up
                continue
            name, sep, src_ip = spec.partition(":")
            name = name.strip()
            src_ip = src_ip.strip() if sep else None
            if src_ip == loopback:
                yield (name, src_ip)
                continue
            try:
                lookup = dns.resolver.resolve(name)
                dns_ip = str(lookup[0])
                if not src_ip:
                    src_ip = dns_ip
                elif src_ip != dns_ip:
                    raise AddressDoesNotMatchDNS(
                        f"source {src_ip} != {dns_ip} from DNS for {name}"
                    )
                ptr = dns.resolver.resolve_address(src_ip)
                ptr_name = str(ptr[0])
                if name + "." != ptr_name:
                    raise NameDoesNotMatchPTR(
                        f"name {name}. != {ptr_name} "
                        f"from DNS (PTR for {src_ip})"
                    )
                yield (name, src_ip)
            except HELOWLException as e:
                logger.exception(e)
            except dns.exception.DNSException as e:
                logger.exception(e)