Source code for ampel.abstract.AbsEventUnit

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File:                Ampel-core/ampel/abstract/AbsEventUnit.py
# License:             BSD-3-Clause
# Author:              valery brinnel <firstname.lastname@gmail.com>
# Date:                04.02.2020
# Last Modified Date:  05.04.2023
# Last Modified By:    valery brinnel <firstname.lastname@gmail.com>

from typing import Any
from typing_extensions import Self
from collections.abc import Sequence
from ampel.types import ChannelId, OneOrMany, Traceless
from ampel.base.AmpelABC import AmpelABC
from ampel.base.decorator import abstractmethod, defaultmethod
from ampel.core.EventHandler import EventHandler
from ampel.core.ContextUnit import ContextUnit
from ampel.enum.EventCode import EventCode
from ampel.log.LogFlag import LogFlag
from ampel.log.utils import get_logger
from ampel.util.template import apply_templates


class AbsEventUnit(AmpelABC, ContextUnit, abstract=True):
    """
    Base class for units orchestrating an ampel event.
    Ampel processes (from the ampel config) are the most common events.
    They define specifications of an event using a standardized model/schema:
    which subclass of this abstract class is to be run by a given controller
    at (a) given time(s) using a specific configuration.

    An ampel event must be registered and traceable and thus generates:

    - a new run-id
    - a new event document (in the event collection)
    - possibly Log entries (less is better)
    - new trace ids if not registered previously

    Subclasses of this class typically modify tier documents
    whereby the changes originate from results of logical unit(s)
    instantiated by this class.
    """

    #: name of the associated process
    process_name: Traceless[str]

    #: hash of potentially underlying job schema
    job_sig: None | int = None

    #: channels associated with the process
    channel: None | OneOrMany[ChannelId] = None

    #: raise exceptions instead of catching and logging
    #: if True, troubles collection will not be populated if an exception occurs
    raise_exc: bool = False

    #: logging configuration to use, from the logging section of the Ampel config
    log_profile: str = "default"

    #: flag to apply to all log records created
    base_log_flag: LogFlag = LogFlag.SCHEDULED_RUN

    #: optional additional kwargs to pass to :class:`~ampel.mongo.update.var.DBLoggingHandler.DBLoggingHandler`
    db_handler_kwargs: None | dict[str, Any] = None

    #: Some subclasses allow for non-serializable input parameter out of convenience (jupyter notebooks).
    #: For example, an AlertSupplier instance can be passed as argument of an AlertConsumer.
    #: Unless a custom serialization is implemented, this disables the hashing of input parameters
    #: and thus the generation of "trace ids".
    #: Set provenance to False to ignore trace ids generation error which will be raised otherwise.
    provenance: bool = True

    @defaultmethod
    def prepare(self, event_hdlr: EventHandler) -> None | EventCode:
        """
        Gives units the opportunity to cancel the subsequent call to the proceed()
        method by returning EventCode.PRE_CHECK_EXIT (which avoids 'burning' a run id)
        """
        return None

    @abstractmethod(var_args=True)
    def proceed(self, event_hdlr: EventHandler) -> Any:
        """
        Perform the actual work of the event. Called by :meth:`run` once the
        event has been registered with a new run id.

        :param event_hdlr: handler of the event being run
        :returns: any value; it is forwarded to the caller of :meth:`run` and,
          when truthy, additionally attached to the event via ``add_extra(ret=...)``
        """
        ...

    @classmethod
    def new(cls, templates: str | Sequence[str], **kwargs) -> Self:
        """
        To use with jupyter when templating is wished for.

        :param templates: name(s) of the template(s) handed to ``apply_templates``
        :param kwargs: constructor arguments; ``context`` is mandatory
          (a missing key raises KeyError), ``log_profile`` is optional
        :returns: instance of ``cls`` built from the templated arguments
        """
        context = kwargs['context']
        with get_logger(context.config, kwargs.get('log_profile')) as logger:
            return cls(**apply_templates(context, templates, kwargs, logger))

    def run(self, event_hdlr: None | EventHandler = None) -> Any:
        """
        Run the event: call :meth:`prepare`, register the event,
        call :meth:`proceed` and update the event with the final code.

        :param event_hdlr: handler to use; when None, a new EventHandler is created
          from ``process_name``, the context database, ``job_sig`` and ``raise_exc``
        :returns: the value returned by :meth:`proceed`, or None if :meth:`prepare`
          returned EventCode.PRE_CHECK_EXIT or if :meth:`proceed` raised
          while ``raise_exc`` is False
        :raises Exception: whatever :meth:`proceed` raises, if ``raise_exc`` is True
        """

        if event_hdlr is None:
            event_hdlr = EventHandler(
                self.process_name,
                self.context.get_database(),
                job_sig=self.job_sig,
                raise_exc=self.raise_exc,
            )

        # Give units opportunity to cancel the run (T2 worker when no tickets are avail for example)
        pre_check = self.prepare(event_hdlr)
        if pre_check == EventCode.PRE_CHECK_EXIT:
            event_hdlr.set_code(pre_check)
            # Registered without run id: a cancelled run must not 'burn' one
            event_hdlr.register()
            return None

        event_hdlr.register(run_id=self.context.new_run_id())

        ret = None

        try:
            if ret := self.proceed(event_hdlr):
                event_hdlr.add_extra(ret=ret)
        except Exception:
            if self.raise_exc:
                raise
            # Must be an assignment: a mere comparison would leave the code unset,
            # and the default below would then flag the failed event as OK
            event_hdlr.code = EventCode.EXCEPTION

        # Set default event code if sub-class didn't customize it
        if event_hdlr.code is None:
            event_hdlr.code = EventCode.OK

        # Update duration and code in event doc
        event_hdlr.update()

        # Forward value returned by proceed() to caller
        return ret