Ampel-core

The actual implementation of the Ampel framework

Processing units

Base classes for processing units

class ampel.core.ContextUnit.ContextUnit(**kwargs)[source]

Base class for units requiring a reference to an AmpelContext instance

class ampel.abstract.AbsEventUnit.AbsEventUnit(**kwargs)[source]

Base class for units orchestrating an ampel event. Ampel processes (from the ampel config) are the most common events. They specify an event using a standardized model/schema: which subclass of this abstract class is to be run by a given controller, at which time(s), and with which configuration.

An ampel event must be registered and traceable, and thus generates:

  • a new run-id

  • a new event document (in the event collection)

  • possibly log entries (less is better)

  • new trace ids, if not registered previously

Subclasses of this class typically modify tier documents, where the changes originate from results of the logical unit(s) instantiated by this class.

process_name: Annotated[str]

name of the associated process

job_sig: Optional[int] = None

hash of potentially underlying job schema

channel: Optional[Union[Sequence[Union[int, str]], int, str]] = None

channels associated with the process

raise_exc: bool = False

if True, raise exceptions instead of catching and logging them; the troubles collection will not be populated if an exception occurs

log_profile: str = 'default'

logging configuration to use, from the logging section of the Ampel config

base_log_flag: LogFlag = 32

flag to apply to all log records created

db_handler_kwargs: Optional[dict[str, Any]] = None

optional additional kwargs to pass to DBLoggingHandler

provenance: bool = True

Some subclasses accept non-serializable input parameters for convenience (e.g. in jupyter notebooks). For example, an AlertSupplier instance can be passed as an argument to an AlertConsumer. Unless a custom serialization is implemented, this disables the hashing of input parameters and thus the generation of “trace ids”. Set provenance to False to suppress the trace-id generation error that would otherwise be raised.

prepare(event_hdlr)[source]

Gives units the opportunity to cancel the subsequent call to the proceed() method by returning EventCode.PRE_CHECK_EXIT (which avoids ‘burning’ a run id)

Return type:

Optional[EventCode]
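The intended control flow of prepare() and the subsequent processing call can be sketched in plain Python (EventCode, DemoEventUnit and run_event are simplified stand-ins, not the actual ampel classes):

```python
from enum import Enum
from typing import Optional


class EventCode(Enum):
    # Simplified stand-in for ampel's EventCode
    OK = 0
    PRE_CHECK_EXIT = 1


class DemoEventUnit:
    """Toy event unit illustrating the prepare() contract."""

    def prepare(self, event_hdlr) -> Optional[EventCode]:
        # Returning PRE_CHECK_EXIT cancels the subsequent processing call
        if getattr(event_hdlr, "nothing_to_do", False):
            return EventCode.PRE_CHECK_EXIT
        return None

    def proceed(self, event_hdlr) -> str:
        return "processed"


def run_event(unit, event_hdlr, new_run_id):
    # Controller-side logic: a run id is only 'burnt' if prepare() does not bail out
    if unit.prepare(event_hdlr) is EventCode.PRE_CHECK_EXIT:
        return None
    new_run_id()  # a new run id is generated only now
    return unit.proceed(event_hdlr)
```

Bailing out in prepare() thus keeps the run-id counter and the event collection untouched.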

classmethod new(templates, **kwargs)[source]

For use with jupyter when templating is desired

Return type:

Self

Processor units

class ampel.t3.T3Processor.T3Processor(**kwargs)[source]

Bases: AbsEventUnit

channel: Optional[Union[int, str]] = None

channels associated with the process

supply: Annotated[UnitModel]

Unit must be a subclass of AbsT3Supplier

stage: Annotated[UnitModel]

Unit must be a subclass of AbsT3Stager

T3 machinery

Context and configuration

class ampel.core.AmpelContext.AmpelContext(config, db, loader, resource=None, admin_msg=None)[source]

This class is typically instantiated by Ampel controllers and provided to “processor unit” instances. Note: this class might in the future be capable of handling multiple AmpelConfig/AmpelDB instances

classmethod load(config, pwd_file_path=None, pwds=None, freeze_config=True, vault=None, one_db=False, **kwargs)[source]

Instantiates a new AmpelContext instance.

Return type:

AmpelContext

Parameters:
  • config (str | dict) – local path to an ampel config file (yaml or json) or loaded config as dict

  • pwd_file_path (Optional[str]) – path to a text file containing one password per line. The underlying AmpelVault will be initialized with an AESecretProvider configured with these passwords.

  • pwds (Optional[Iterable[str]]) – same as ‘pwd_file_path’, except the passwords are provided directly as a list via this parameter.

  • freeze_config (bool) – whether to convert the elements contained in the ampel config into immutable structures (dict -> ReadOnlyDict, list -> tuple). This parameter only applies if the config is loaded by this method, i.e. if parameter ‘config’ is a str.
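The effect of freeze_config can be illustrated with a small recursive freeze (a sketch only; Ampel’s actual ReadOnlyDict is its own class, approximated here with the standard library’s MappingProxyType):

```python
from types import MappingProxyType


def freeze(obj):
    """Recursively convert dicts to read-only mappings and lists to tuples,
    mirroring the freeze_config conversion (dict -> ReadOnlyDict, list -> tuple)."""
    if isinstance(obj, dict):
        return MappingProxyType({k: freeze(v) for k, v in obj.items()})
    if isinstance(obj, list):
        return tuple(freeze(v) for v in obj)
    return obj


# illustrative config fragment (keys are hypothetical)
conf = freeze({"mongo": {"prefix": "Ampel"}, "channel": ["HU_RANDOM"]})
# conf["mongo"]["prefix"] = "x" would now raise TypeError
```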

classmethod build(ignore_errors=False, pwd_file_path=None, pwds=None, freeze_config=True, verbose=False)[source]

Instantiates a new AmpelContext instance.

The required underlying ampel configuration (AmpelConfig) is built from scratch: all configurations defined in the various ampel repositories available locally are collected, merged together, and morphed into the final ampel config. This is a convenience method; do not use it in production.

Return type:

AmpelContext

new_run_id()[source]

Return an identifier that can be used to associate log entries from a single process invocation. This ID is unique and monotonically increasing.

Return type:

int

get_config()[source]

Note

in the future, AmpelContext might hold references to multiple different config

Return type:

AmpelConfig

get_database()[source]

Note

in the future, this class might hold references to multiple different databases

Return type:

AmpelDB

activate_process(name, tier=None)[source]
Return type:

bool

Returns:

False if the process was deactivated, True if it was not found

class ampel.core.UnitLoader.UnitLoader(config, db, provenance=True, vault=None)[source]
new_logical_unit(model, logger, *, sub_type=None, **kwargs)[source]

In addition to any custom parameters, logical units require logger and resource as init parameters. Custom parameters are provided as a union of the model config and the kwargs passed to this method (the latter taking precedence).

Raises:

ValueError if the unit defined in the model is unknown

Return type:

Union[TypeVar(LT, bound= LogicalUnit), LogicalUnit]

new_safe_logical_unit(um, unit_type, logger, _chan=None)[source]

Returns a logical unit with dedicated logger containing no db handler

Return type:

TypeVar(LT, bound= LogicalUnit)

new_context_unit(model, context, *, sub_type=None, **kwargs)[source]

In addition to any potentially defined custom parameters, context units require an AmpelContext instance as an init parameter.

Raises:

ValueError if the unit defined in the model is unknown

Return type:

Union[TypeVar(CT, bound= ContextUnit), ContextUnit]

new(model, *, unit_type=None, **kwargs)[source]

Instantiate a new object based on the provided model and kwargs.

Parameters:

  • unit_type – performs an isinstance check and raises an error on mismatch. Enables mypy/other static checks.

Return type:

Union[AmpelUnit, TypeVar(T, bound= AmpelUnit)]

Returns:

unit instance, trace id (0 if not computable)

get_class_by_name(name, unit_type=None)[source]

Matches the parameter ‘name’ with the unit definitions in the ampel config. This allows retrieving the fully qualified name of the corresponding class and loading it.

Parameters:

unit_type (Optional[type[TypeVar(T, bound= AmpelUnit)]]) –

  • LogicalUnit or any subclass of LogicalUnit

  • ContextUnit or any subclass of ContextUnit

  • If None (auxiliary class), returned object will have type[Any]

Raises:

ValueError if unit cannot be found or loaded or if parent class is unrecognized

Return type:

type[Union[TypeVar(T, bound= AmpelUnit), AmpelUnit]]

get_init_config(config=None, override=None, kwargs=None, unfreeze=True)[source]
Raises:

ValueError if config alias is not found

Return type:

dict[str, Any]

resolve_aliases(value)[source]

Recursively resolve aliases from config

resolve_secrets(unit_type, init_kwargs)[source]

Add a resolved Secret instance to init_kwargs for every Secret field of unit_type.

Return type:

dict[str, Any]

get_resources(model)[source]

Resources are defined via the static variable ‘require’ in ampel units (example: catsHTM.default)

Return type:

dict[str, Any]

validate_unit_models()[source]

Enable validation for UnitModel instances

Return type:

Iterator[None]

Data classes

Models

class ampel.model.operator.AnyOf.AnyOf(**kwargs)[source]
any_of: list[Union[TypeVar(T), AllOf]]

Select items by logical OR

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'any_of': FieldInfo(annotation=list[Union[~T, AllOf]], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.operator.AllOf.AllOf(**data)[source]
all_of: list[TypeVar(T)]

Select items by logical AND

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'all_of': FieldInfo(annotation=list[~T], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.
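The selection semantics of these operator models can be sketched with a plain-Python evaluator (hypothetical: the real models are purely declarative and are evaluated elsewhere, e.g. when building database queries; the dict form and the tag-set argument are illustrative):

```python
def matches(criterion, tags):
    """Evaluate a (possibly nested) any_of/all_of criterion against a tag set."""
    if isinstance(criterion, dict):
        if "any_of" in criterion:   # select items by logical OR
            return any(matches(c, tags) for c in criterion["any_of"])
        if "all_of" in criterion:   # select items by logical AND
            return all(matches(c, tags) for c in criterion["all_of"])
    return criterion in tags        # leaf value

# AnyOf may contain AllOf sub-conditions, matching the typing above
crit = {"any_of": ["HU_SN", {"all_of": ["ZTF", "HU_GRB"]}]}
```

Here crit matches anything tagged HU_SN, or tagged with both ZTF and HU_GRB.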

class ampel.model.operator.OneOf.OneOf(**data)[source]
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'one_of': FieldInfo(annotation=list[~T], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.time.TimeConstraintModel.TimeConstraintModel(**data)[source]

Examples:

TimeConstraintModel(
    **{
        "after": {
            "match_type": "time_delta",
            "days": -1
        },
        "before": {
            "match_type": "time_string",
            "dateTimeStr": "21/11/06 16:30",
            "dateTimeFormat": "%d/%m/%y %H:%M"
        }
    }
)
TimeConstraintModel(
    **{
        "after": {
            "match_type": "time_last_run",
            "name": "val_test"
        },
        "before": {
            "match_type": "unix_time",
            "value": 1531306299
        }
    }
)
before: Optional[Union[TimeDeltaModel, TimeLastRunModel, TimeStringModel, UnixTimeModel]]
after: Optional[Union[TimeDeltaModel, TimeLastRunModel, TimeStringModel, UnixTimeModel]]
get_query_model(**kwargs)[source]

Call this method with db=<instance of AmpelDB> if your time constraint is based on TimeLastRunModel

Return type:

Optional[QueryTimeModel]

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'after': FieldInfo(annotation=Union[NoneType, TimeDeltaModel, TimeLastRunModel, TimeStringModel, UnixTimeModel], required=False), 'before': FieldInfo(annotation=Union[NoneType, TimeDeltaModel, TimeLastRunModel, TimeStringModel, UnixTimeModel], required=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.time.TimeDeltaModel.TimeDeltaModel(**data)[source]
match_type: Literal['time_delta']
days: int
seconds: int
microseconds: int
milliseconds: int
minutes: int
hours: int
weeks: int
get_timestamp(**kwargs)[source]
Return type:

float
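Assuming get_timestamp resolves the configured delta relative to the current time, the computation is equivalent to the following stdlib sketch (delta_timestamp is an illustrative name, not the actual implementation):

```python
import time
from datetime import timedelta


def delta_timestamp(now=None, **delta_kwargs):
    """Sketch of TimeDeltaModel.get_timestamp: the current unix time shifted
    by the configured timedelta fields (days, hours, minutes, ...)."""
    base = time.time() if now is None else now
    return base + timedelta(**delta_kwargs).total_seconds()


# e.g. the {"match_type": "time_delta", "days": -1} constraint from the
# TimeConstraintModel examples resolves to "24 hours before now":
ts = delta_timestamp(days=-1)
```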

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'days': FieldInfo(annotation=int, required=False, default=0), 'hours': FieldInfo(annotation=int, required=False, default=0), 'match_type': FieldInfo(annotation=Literal['time_delta'], required=True), 'microseconds': FieldInfo(annotation=int, required=False, default=0), 'milliseconds': FieldInfo(annotation=int, required=False, default=0), 'minutes': FieldInfo(annotation=int, required=False, default=0), 'seconds': FieldInfo(annotation=int, required=False, default=0), 'weeks': FieldInfo(annotation=int, required=False, default=0)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.time.TimeLastRunModel.TimeLastRunModel(**data)[source]
match_type: Literal['time_last_run']
process_name: str
fallback: Optional[dict]
require_success: bool
get_timestamp(**kwargs)[source]
Return type:

Optional[float]

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'fallback': FieldInfo(annotation=Union[NoneType, dict], required=False, default={'days': -1}), 'match_type': FieldInfo(annotation=Literal['time_last_run'], required=True), 'process_name': FieldInfo(annotation=str, required=True), 'require_success': FieldInfo(annotation=bool, required=False, default=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.time.TimeStringModel.TimeStringModel(**data)[source]
match_type: Literal['time_string']
dateTimeStr: str
dateTimeFormat: str
get_timestamp(**kwargs)[source]
Return type:

float

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'dateTimeFormat': FieldInfo(annotation=str, required=True), 'dateTimeStr': FieldInfo(annotation=str, required=True), 'match_type': FieldInfo(annotation=Literal['time_string'], required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.time.UnixTimeModel.UnixTimeModel(**data)[source]
match_type: Literal['unix_time']
value: int
get_timestamp(**kwargs)[source]
Return type:

int

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'match_type': FieldInfo(annotation=Literal['unix_time'], required=True), 'value': FieldInfo(annotation=int, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.t3.LoaderDirective.LoaderDirective(**data)[source]

Specification of documents to load

col: Literal['stock', 't0', 't1', 't2']

Source collection

query_complement: Optional[dict[str, Any]]

Mongo match expression to include in the query

resolve_config: bool

whether to replace init config integer hash with ‘resolved’ config dict

excluding_query: bool

whether an empty find() result should entirely discard the associated stock from further processing

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'col': FieldInfo(annotation=Literal['stock', 't0', 't1', 't2'], required=True), 'excluding_query': FieldInfo(annotation=bool, required=False, default=False), 'query_complement': FieldInfo(annotation=Union[NoneType, dict[str, Any]], required=False), 'resolve_config': FieldInfo(annotation=bool, required=False, default=False)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.t3.T2FilterModel.T2FilterModel(**data)[source]
unit: str

name of T2 unit

match: dict[str, Any]

Mongo match expression to apply to most recent result

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'match': FieldInfo(annotation=dict[str, Any], required=True), 'unit': FieldInfo(annotation=str, required=True)}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.ingest.IngestDirective.IngestDirective(**data)[source]

channel
filter
ingest
    stock_t2
    point_t2 <- based on input dps list
    combine <- same
        state_t2 <- based on dps list returned by combine
        point_t2 <- same
    mux <- based on input dps list
        insert
            point_t2 <- based on <datapoints insert> result from muxer (first list)
        combine
            state_t2 <- based on <datapoints combine> result from muxer (second list)
            point_t2 <- same

channel: Union[int, str]

Channel for which to create documents

filter: Optional[FilterModel]

Potentially filter input datapoints

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_fields: ClassVar[dict[str, FieldInfo]] = {'channel': FieldInfo(annotation=Union[int, str], required=True), 'filter': FieldInfo(annotation=Union[NoneType, FilterModel], required=False), 'ingest': FieldInfo(annotation=IngestBody, required=False, default=IngestBody(stock_t2=None, point_t2=None, combine=None, mux=None))}

Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].

This replaces Model.__fields__ from Pydantic V1.

Templates

Odds and ends

class ampel.log.AmpelLogger.AmpelLogger(name=0, base_flag=None, handlers=None, channel=None, console=True)[source]
classmethod get_logger(name=None, force_refresh=False, **kwargs)[source]

Creates or returns an instance of AmpelLogger registered in the static dict ‘loggers’ under the provided name. If a logger with the given name already exists, the existing instance is returned. If name is None, a unique (int) name will be generated.

Parameters:

  • **kwargs – passed to the constructor

Typical use:

logger = AmpelLogger.get_logger()
Return type:

AmpelLogger
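The caching behavior of get_logger can be sketched as a class-level registry (heavily simplified; the real AmpelLogger also handles flags, channels, and handlers, and DemoLogger is an illustrative stand-in):

```python
import itertools


class DemoLogger:
    loggers: dict = {}              # static registry, keyed by name
    _auto_name = itertools.count(1)

    def __init__(self, name):
        self.name = name

    @classmethod
    def get_logger(cls, name=None, force_refresh=False):
        if name is None:
            name = next(cls._auto_name)  # unique (int) name
        if force_refresh or name not in cls.loggers:
            cls.loggers[name] = cls(name)
        return cls.loggers[name]
```

Repeated calls with the same name hence return the same instance unless force_refresh is set.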

class ampel.core.AmpelRegister.AmpelRegister(autoload=True, **kwargs)[source]


General notes:

An ampel register file is made of three parts: signature, header and content

1) The file signature is encoded in the first 11 bytes. The signature contains the header’s length and size (3 bytes each)

2) A BSON encoded dict is saved in the next x bytes, referred to as the file’s header. The header can be updated independently of the register’s content. The size of the header is customizable at file creation.

3) The register’s content is usually a compressed structure (zip, bzip or xz) which stores information in binary format. Compression can be turned off if desired. (The size of a register containing 10 million alert ids [8 bytes] and filter return codes [1 byte] can be reduced from ~90MB to ~50MB using gzip with default settings.)
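The signature layout can be sketched as follows. Only the two 3-byte header length/size fields are taken from the description above; the 5-byte magic prefix, the byte order, and the field order are assumptions for illustration, not the actual on-disk format:

```python
def encode_signature(header_len: int, header_size: int, magic: bytes = b"AMPEL") -> bytes:
    """Hypothetical 11-byte signature: 5 magic bytes followed by the header's
    current length and reserved size, encoded as 3-byte integers each."""
    assert len(magic) == 5
    return (
        magic
        + header_len.to_bytes(3, "little")
        + header_size.to_bytes(3, "little")
    )


def decode_signature(sig: bytes):
    """Split an 11-byte signature back into (magic, header_len, header_size)."""
    return (
        sig[:5],
        int.from_bytes(sig[5:8], "little"),
        int.from_bytes(sig[8:11], "little"),
    )
```

Reserving header_size > header_len is what makes later in-place header updates cheap.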

Properties:

  • Registers can be re-opened and appended

  • Header content can be accessed or updated independently of the register’s content.

  • Header updates are fast if enough space was reserved for updates in the first place.

  • Header size can be increased afterwards at the cost of having to rewrite the entire file once. This happens automatically when needed.

  • Logging read and write access to the register content into the register header is supported

  • Registers can be capped (based on max content length or max number of run ids). Once the limit is reached, the full register is renamed and a new one is created.

Note: the module ampel.util.register contains tools to manually change ampel registers, such as: get_header_content, open_file_and_write_header and rescale_header

Target file:

param path_base:

the base folder path where to create/read register files.

param path_extra:

an optional additional folder path (or paths) to be appended to path_base.

param full_path:

when provided, all path building options will be ignored and a new file handle will be opened using the provided full_path

param file_handle:

if you provide a file handle to a register file (parameter file_handle), it will be used and all path options will be ignored.

param file_cap:

registers can be capped, typically based on a maximum number of blocks. When access to a full register is requested, the original file is renamed and a new register is created. During file renames, an index is appended (as a suffix) to the original file name. The current (newest) register file always comes without an index suffix. This index is saved into the current register header.

Example:

ampel_register.bin.gz (current - newest)
ampel_register.bin.gz.1 (first renamed register - oldest)
ampel_register.bin.gz.2
ampel_register.bin.gz.3 (second newest)

Note: file renames occur during the opening procedure of registers; no check is performed while a register is being filled once it is open. A register can thus grow beyond the defined limit as long as a process keeps it open.
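The naming of the rename scheme above can be sketched as follows (rotation_name is an illustrative helper; the actual index bookkeeping lives in the register header):

```python
def rotation_name(current: str, existing: list[str]) -> str:
    """Next file name for a full register: append the first free integer
    suffix, so the newest register always keeps the bare name."""
    indices = [
        int(name.rsplit(".", 1)[-1])
        for name in existing
        if name.startswith(current + ".") and name.rsplit(".", 1)[-1].isdigit()
    ]
    return f"{current}.{max(indices, default=0) + 1}"
```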

Errors:

raises:

ValueError, FileNotFoundError

  • if none of file_handle, full_path, or path_base exists.

  • if read access to a non-existing file is requested

  • if the target file is not a register file (existing empty files are ok)

  • if the ‘struct’ parameter of a subclass of this class differs from the values registered in the file’s header (this behavior can be deactivated via parameter on_exist_check)

  • during file renames (in case capped registers are used) if the target file already exists

path_base: Optional[str]

save files in <path_base>/<file>

path_extra: Optional[list[str]]

save files in <path_base>/<path_extra(s)>/<file>

file_prefix: Optional[str]

prefix for each file

path_full: Optional[str]

ignore all other path options and use this file path

file_handle: Optional[BinaryIO]

use existing file handle

compression: Optional[Literal['gz', 'bz2', 'xz']] = 'gz'

compression scheme

compression_level: Optional[int]

compression level

check_rename(header)[source]

override if needed

Return type:

bool

onload_update_header()[source]

Override if you need to update the header of an existing register. Ex: BaseAlertRegister adds the current run id

Return type:

None

get_file_path()[source]
Raise:

errors if sub-directories cannot be created

Return type:

str

get_file_name()[source]

override if needed

Return type:

str

check_header(header)[source]
Raises:

ValueError on mismatch between this instance’s value and the header value for the provided key

Return type:

None

register_file_access(header=None, use_this_time=None, new_blocks=None)[source]
Parameters:
  • header (Optional[dict[str, Any]]) – use provided header rather than self.header[‘payload’]

  • use_this_time (Optional[float]) – use provided time rather than time.time()

  • new_blocks (Optional[int]) – None (default) when register is opened, 0 or an integer when register is closed

Return type:

None

gen_new_header()[source]

Creates a new header and sets instance variable self.header to reference it.

Return type:

bytes

Returns:

bson encoded bytes representing the generated header

Raises:

ValueError if the generated header size exceeds user-provided bound parameters

close(close_outer_fh=True, update_header=True)[source]
Parameters:
  • close_outer_fh (bool) – whether principal file handle should be closed

  • update_header (bool) – possible override of the default settings (aimed at admins working with the command line)