Source code for chapps.rest.routers.live

"""
Routes for live access to CHAPPS state
--------------------------------------

Please note that documentation about API routes is best
viewed in the OpenAPI docs generated by FastAPI_ when
the API is launched.

"""
from typing import List, Optional
from fastapi import status, APIRouter, Body, HTTPException
from sqlalchemy.orm import sessionmaker
from chapps.rest.routers.common import (
    load_model_with_assoc,
    load_models_with_assoc,
)
from chapps.dbsession import sql_engine
from chapps.models import (
    User,
    Domain,
    Email,
    LiveQuotaResp,
    TextResp,
    TimeResp,
    InstanceTimesResp,
    SourceUserMapResp,
    BulkQuotaResp,
    DeleteResp,
    user_quota_assoc,
)
from chapps.policy import (
    OutboundQuotaPolicy,
    SenderDomainAuthPolicy,
    GreylistingPolicy,
)
from chapps.spf_policy import SPFEnforcementPolicy
from chapps.config import CHAPPSConfig
from chapps.util import hash_password
import logging

logger = logging.getLogger(__name__)
config = CHAPPSConfig.get_config()
Session = sessionmaker(sql_engine)

api = APIRouter(
    prefix="/live",
    tags=["live"],
    responses={
        status.HTTP_404_NOT_FOUND: {"description": "Resource not found."},
        status.HTTP_400_BAD_REQUEST: {
            "description": ("The request was missing some required data.")
        },
    },
)
"""The live routes API router"""

# define some useful functions
load_user_with_quota = load_model_with_assoc(User, assoc=[user_quota_assoc])
r"""Create a `load_user_with_quota` function, using the factory.

:param int item_id: ID of user to load, or 0 if loading by name

:param Optiona[str] name: exact name of **User** to load, if `item_id` is zero

:returns: a tuple of (**User**, dict(quota=\ **Quota**), [remarks,...])

Because the factory is meant to work with any number of associations, the
return format is a little clunky for this case but it seems a small price to
pay to avoid code duplication.

Also note that `FastAPI`_ interprets the docstrings of the coroutines defined
in this module, in order to produce the live API documentation.  As a result,
they have been kept simpler than in some other modules.

"""

load_users_with_quota = load_models_with_assoc(User, assoc=user_quota_assoc)
"""Create a username to quota id mapping function

:param item_ids: list of **User** ids to associate to quotas

:returns: a list of user ORM objects with associated quotas

"""


[docs]@api.get("/quota/", response_model=BulkQuotaResp) async def get_bulk_quota_remaining(user_ids: List[int]) -> BulkQuotaResp: """Accepts a list of user ids. Returns a list of JSON objects with attributes named `user_name` and `quota_avail`, representing the remaining number of email transmissions available to that user at the moment the query was executed. """ remarks = [] response = [] oqp = OutboundQuotaPolicy() uqm = load_users_with_quota(user_ids) for user in uqm: avail, rmks = oqp.current_quota(user.name, user.quota) response.append(dict(user_name=user.name, quota_avail=avail)) remarks.extend(rmks) return BulkQuotaResp.send(response, remarks=remarks)
[docs]@api.get("/quota/{user_id}", response_model=LiveQuotaResp) async def get_current_quota_remaining_for_user( user_id: int = 0, name: Optional[str] = Body(None) ) -> LiveQuotaResp: """Accepts the id of the user whose remaining quota should be checked. Returns the instantaneous number of available send attempts in `response` """ user, assoc_d, remarks = load_user_with_quota(user_id, name) quota = assoc_d[user_quota_assoc.assoc_name] oqp = OutboundQuotaPolicy() response, more_remarks = oqp.current_quota(user.name, quota) return LiveQuotaResp.send(response, remarks=remarks + more_remarks)
[docs]@api.delete("/quota/{user_id}", response_model=LiveQuotaResp) async def reset_live_quota_for_user( user_id: int = 0, name: Optional[str] = Body(None) ) -> LiveQuotaResp: """Accepts the id of the user whose quota should be reset. Returns the number of send attempts dropped in `response` """ user, assoc_d, remarks = load_user_with_quota(user_id, name) quota = assoc_d[user_quota_assoc.assoc_name] oqp = OutboundQuotaPolicy() response, more_remarks = oqp.reset_quota(user.name) if not quota: remarks.append(f"User {user.name} has no assigned quota.") logger.info(" ".join(more_remarks)) # log only reset message return LiveQuotaResp.send(response, remarks=remarks + more_remarks)
[docs]@api.post("/quota/{user_id}", response_model=LiveQuotaResp) async def refresh_quota_policy_for_user( user_id: int = 0, name: Optional[str] = Body(None) ) -> LiveQuotaResp: """ Accepts the id of the user whose quota policy should be refreshed. Returns the new remaining quota after the policy update in `response` """ user, assoc_d, remarks = load_user_with_quota(user_id, name) quota = assoc_d[user_quota_assoc.assoc_name] oqp = OutboundQuotaPolicy() if not quota: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Cannot load quota for user {user.name}: none assigned.", ) remarks.append(f"Quota policy config cache reset for {user.name}") response, more_remarks = oqp.refresh_policy_cache(user.name, quota) return LiveQuotaResp.send(response, remarks=remarks + more_remarks)
[docs]@api.post( "/config/write/", response_model=TextResp, responses={ status.HTTP_401_UNAUTHORIZED: { "description": "Password does not match." } }, ) async def refresh_config_on_disk(passcode: str = Body(...)) -> TextResp: """Writes the current effective config to disk. Requires the CHAPPS password to be provided as the body. If line transmission security is an issue an SSL proxy layer will be required. This is true for the entire application. """ if ( hash_password(passcode, config.chapps.payload_encoding) == config.chapps.password ): response = config.write() return TextResp.send(str(response)) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Password does not match.", )
[docs]@api.get("/sda/", response_model=SourceUserMapResp) async def sda_batch_peek( user_ids: List[int], domain_ids: List[int] = [], email_ids: List[int] = [] ) -> SourceUserMapResp: """Accepts `domain_ids`, `email_ids` and `user_ids` as body arguments: lists of integer object ids. Looks at current authorizations for all domain-user combinations. Returns their cache status as a dict of dicts, keyed as: `sda[domain-or-email][user] = SDAStatus` """ sda = SenderDomainAuthPolicy() with Session() as sess: if domain_ids: domain_names = list( sess.scalars(Domain.select_names_by_id(domain_ids)) ) else: domain_names = [] if email_ids: email_names = list( sess.scalars(Email.select_names_by_id(email_ids)) ) else: email_names = [] if not domain_names and not email_names: raise HTTPException( status=status.HTTP_400_BAD_REQUEST, detail="No domains or emails were specified.", ) user_names = list(sess.scalars(User.select_names_by_id(user_ids))) return SourceUserMapResp.send( sda.bulk_check_policy_cache(user_names, domain_names, email_names) )
[docs]@api.get("/sda/{source_name}/for/{user_name}", response_model=TextResp) async def sda_peek(source_name: str, user_name: str) -> TextResp: """Accepts url-encoded domain name or email address and user name as path arguments. Returns status of cached SDA for the named user and source, i.e. is this user allowed to transmit email apparently from this domain or email address """ sda = SenderDomainAuthPolicy() result = sda.check_policy_cache(user_name, source_name) # logger.debug(f"Peeking at {source_name} auth for {user_name}: {result!r}") return TextResp.send(result)
[docs]@api.delete("/sda/", response_model=TextResp) async def sda_batch_clear( user_ids: List[int], domain_ids: List[int] = [], email_ids: List[int] = [] ) -> TextResp: """ Clears all source - user mappings by iterating through both lists: users and domains+emails. """ sda = SenderDomainAuthPolicy() with Session() as sess: if domain_ids: domain_names = list( sess.scalars(Domain.select_names_by_id(domain_ids)) ) else: domain_names = [] if email_ids: email_names = list( sess.scalars(Email.select_names_by_id(email_ids)) ) else: email_names = [] if not domain_names and not email_names: raise HTTPException( status=status.HTTP_400_BAD_REQUEST, detail="No domains or emails specified.", ) user_names = list(sess.scalars(User.select_names_by_id(user_ids))) sda.bulk_clear_policy_cache(user_names, domain_names, email_names) return TextResp.send( "SDA cache cleared for specified domains and/or emails x users." )
[docs]@api.delete("/sda/{source_name}/for/{user_name}", response_model=TextResp) async def sda_clear(source_name: str, user_name: str) -> TextResp: """ Accepts url-encoded domain name or email address and user name of SDA to clear. Returns the status of the SDA prior to clearing. """ sda = SenderDomainAuthPolicy() return TextResp.send(sda.clear_policy_cache(user_name, source_name))
[docs]@api.get("/grl/tuple/", response_model=TimeResp) async def grl_peek_tuple( client_address: str = Body(...), sender: str = Body(...), recipient: str = Body(...), ): """ Accepts client IP address, sender email address and recipient email address as required arguments in the request body. Returns a float which is the UNIX epoch time of the last time that tuple was encountered by greylisting. """ grl = GreylistingPolicy() try: timestamp = float( grl.redis.get(grl._tuple_key(client_address, sender, recipient)) ) except TypeError as e: return TimeResp.send(0.0) return TimeResp.send(timestamp)
[docs]@api.get("/grl/tally/{client_address}", response_model=InstanceTimesResp) async def grl_list_tally(client_address: str): """Accepts the client IP address as the path argument. Returns a list of instance IDs and their timestamps as floats in UNIX epoch time (UTC). In the standard time library, `localtime()` will convert them to a time struct in local time based on locale, or `gmtime()` will convert them to a struct in UTC. Then `strftime()` may be used to format them for a human to read. """ grl = GreylistingPolicy() tally = grl.redis.zrange( grl._client_key(client_address), 0, -1, withscores=True ) tally_decoded = [] if tally: tally_decoded = [(i.decode("utf-8"), float(t)) for i, t in tally] return InstanceTimesResp.send(tally_decoded)
[docs]@api.delete("/grl/tally/{client_address}", response_model=DeleteResp) async def grl_clear_tally(client_address: str): """ Accepts the client IP address as the path argument. Returns "deleted" if successful. """ grl = GreylistingPolicy() grl.redis.delete(grl._client_key(client_address)) return DeleteResp.send()
[docs]@api.delete("/grl/option_cache/{recipient_domain}", response_model=DeleteResp) async def grl_clear_option_cache(recipient_domain: str): """ Accepts a domain name as the path argument. Returns 'deleted' on success. """ grl = GreylistingPolicy() grl.redis.delete(grl._domain_option_key(recipient_domain)) return DeleteResp.send()
[docs]@api.delete("/spf/option_cache/{recipient_domain}", response_model=DeleteResp) async def spf_clear_option_cache(recipient_domain: str): """ Accepts a domain name as the path argument. Returns 'deleted' on success. """ spf = SPFEnforcementPolicy() spf.redis.delete(spf._domain_option_key(recipient_domain)) return DeleteResp.send()