Ampel-core¶
The actual implementation of the Ampel framework
Processing units¶
Base classes for processing units¶
- class ampel.core.ContextUnit.ContextUnit(**kwargs)[source]¶
Base class for units requiring a reference to an AmpelContext instance
- class ampel.abstract.AbsEventUnit.AbsEventUnit(**kwargs)[source]¶
Base class for units orchestrating an ampel event. Ampel processes (from the ampel config) are the most common events. They specify an event using a standardized model/schema: which subclass of this abstract class is to be run by a given controller, at (a) given time(s), using a specific configuration.
An ampel event must be registered and traceable and thus generates:
- a new run-id
- a new event document (in the event collection)
- possibly Log entries (less is better)
- new trace ids, if not registered previously
Subclasses of this class typically modify tier documents, whereby the changes originate from results of the logical unit(s) instantiated by this class.
- channel: Optional[Union[Sequence[Union[int, str]], int, str]] = None¶
channels associated with the process
- raise_exc: bool = False¶
raise exceptions instead of catching and logging them; if True, the troubles collection will not be populated when an exception occurs
- log_profile: str = 'default'¶
logging configuration to use, from the logging section of the Ampel config
- base_log_flag: LogFlag = 32¶
flag to apply to all log records created
- db_handler_kwargs: Optional[dict[str, Any]] = None¶
optional additional kwargs to pass to DBLoggingHandler
- provenance: bool = True¶
Some subclasses accept non-serializable input parameters for convenience (e.g. in jupyter notebooks). For example, an AlertSupplier instance can be passed as argument to an AlertConsumer. Unless a custom serialization is implemented, this disables the hashing of input parameters and thus the generation of “trace ids”. Set provenance to False to ignore the trace id generation error that would otherwise be raised.
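A minimal sketch of how these fields might be supplied to a concrete AbsEventUnit subclass; only the field names come from the attribute list above, while the values (channel name etc.) are illustrative:
event_unit_kwargs = {
    "channel": ["MY_CHANNEL"],   # channel(s) associated with the process
    "raise_exc": True,           # fail fast instead of populating the troubles collection
    "log_profile": "default",    # logging profile from the Ampel config
    "provenance": False,         # skip trace-id generation for non-serializable inputs
}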
Processor units¶
T3 machinery¶
Context and configuration¶
- class ampel.core.AmpelContext.AmpelContext(config, db, loader, resource=None, admin_msg=None)[source]¶
This class is typically instantiated by Ampel controllers and provided to “processor unit” instances. Note: this class might in the future be capable of handling multiple AmpelConfig/AmpelDB instances
- classmethod load(config, pwd_file_path=None, pwds=None, freeze_config=True, vault=None, one_db=False, **kwargs)[source]¶
Instantiates a new AmpelContext instance.
- Return type:
AmpelContext
- Parameters:
pwd_file_path (Optional[str]) – path to a file listing passwords. The underlying AmpelVault will be initialized with an AESecretProvider configured with these pwds.
pwds (Optional[Iterable[str]]) – same as ‘pwd_file_path’, except that the passwords are provided directly as a list via this parameter.
freeze_config (bool) – whether to convert the elements of the ampel config into immutable structures (dict -> ReadOnlyDict, list -> tuple). Only applies if the config is loaded by this method, i.e. if parameter ‘config’ is a str.
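A minimal usage sketch (the config path and secrets file name are illustrative):
from ampel.core.AmpelContext import AmpelContext

ctx = AmpelContext.load(
    "ampel_conf.yml",             # str config path: loaded and frozen by default
    pwd_file_path="secrets.txt",  # passwords backing the AESecretProvider vault
)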
- classmethod build(ignore_errors=False, pwd_file_path=None, pwds=None, freeze_config=True, verbose=False)[source]¶
Instantiates a new AmpelContext instance.
The required underlying ampel configuration (AmpelConfig) is built from scratch, meaning all available configurations defined in the various ampel repositories available locally are collected, merged together and morphed into the final ampel config. This is a convenience method; do not use it in production.
- Return type:
AmpelContext
- new_run_id()[source]¶
Return an identifier that can be used to associate log entries from a single process invocation. This ID is unique and monotonically increasing.
- Return type:
int
- get_config()[source]¶
Note: in the future, AmpelContext might hold references to multiple different configs.
- Return type:
AmpelConfig
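Continuing the load() sketch above:
run_id = ctx.new_run_id()   # unique, monotonically increasing identifier for this invocation
config = ctx.get_config()   # the AmpelConfig held by the context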
- class ampel.core.UnitLoader.UnitLoader(config, db, provenance=True, vault=None)[source]¶
- new_logical_unit(model, logger, *, sub_type=None, **kwargs)[source]¶
Logical units require logger and resource as init parameters, in addition to any custom parameters they define; these are provided as a union of the model config and the kwargs passed to this method (the latter taking precedence).
- Raises:
ValueError if the unit defined in the model is unknown
- Return type:
Union[TypeVar(LT, bound=LogicalUnit), LogicalUnit]
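A sketch of instantiating a logical unit through the loader. The UnitModel import path and the unit name/config are assumptions, and the loader is assumed to be reachable as ctx.loader on the AmpelContext from the sketches above:
from ampel.log.AmpelLogger import AmpelLogger
from ampel.model.UnitModel import UnitModel  # assumed import path

logger = AmpelLogger.get_logger()
t2_unit = ctx.loader.new_logical_unit(
    model=UnitModel(unit="DemoT2Unit", config={"some_param": 1}),  # illustrative unit name/config
    logger=logger,
)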
- new_safe_logical_unit(um, unit_type, logger, _chan=None)[source]¶
Returns a logical unit with a dedicated logger containing no db handler.
- Return type:
TypeVar(LT, bound=LogicalUnit)
- new_context_unit(model, context, *, sub_type=None, **kwargs)[source]¶
Context units require an AmpelContext instance as init parameter, in addition to any dedicated custom parameters they define.
- Raises:
ValueError if the unit defined in the model is unknown
- Return type:
Union[TypeVar(CT, bound=ContextUnit), ContextUnit]
- new(model, *, unit_type=None, **kwargs)[source]¶
Instantiate a new object based on the provided model and kwargs.
- Parameters:
unit_type – if provided, performs an isinstance check and raises an error on mismatch. Enables mypy/other static checks.
- Return type:
Union[AmpelUnit, TypeVar(T, bound=AmpelUnit)]
- Returns:
unit instance, trace id (0 if not computable)
- get_class_by_name(name, unit_type=None)[source]¶
Matches the parameter ‘name’ with the unit definitions defined in the ampel_config. This allows retrieving the fully qualified name of the corresponding class and loading it.
- Parameters:
unit_type (Optional[type[TypeVar(T, bound=AmpelUnit)]]) –
LogicalUnit or any subclass of LogicalUnit
ContextUnit or any subclass of ContextUnit
If None (auxiliary class), the returned object will have type[Any]
- Raises:
ValueError if the unit cannot be found or loaded, or if the parent class is unrecognized
- Return type:
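A short sketch (the unit name is illustrative; without unit_type, the returned object has type[Any]):
UnitClass = ctx.loader.get_class_by_name("DemoT2Unit")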
- resolve_secrets(unit_type, init_kwargs)[source]¶
Add a resolved Secret instance to init_kwargs for every Secret field of unit_type.
Data classes¶
Models¶
- class ampel.model.operator.AnyOf.AnyOf(**kwargs)[source]¶
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'any_of': FieldInfo(annotation=list[Union[~T, AllOf]], required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class ampel.model.operator.AllOf.AllOf(**data)[source]¶
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'all_of': FieldInfo(annotation=list[~T], required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class ampel.model.operator.OneOf.OneOf(**data)[source]¶
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'one_of': FieldInfo(annotation=list[~T], required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
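A minimal sketch of building these operator models; the channel names are illustrative and, per the field annotation above, any_of also accepts nested AllOf instances:
from ampel.model.operator.AllOf import AllOf
from ampel.model.operator.AnyOf import AnyOf

loose_match = AnyOf(any_of=["CHAN_A", "CHAN_B"])   # matches if any element matches
strict_match = AllOf(all_of=["CHAN_A", "CHAN_B"])  # matches only if all elements match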
- class ampel.model.time.TimeConstraintModel.TimeConstraintModel(**data)[source]¶
Examples:
TimeConstraintModel(
    **{
        "after": {"match_type": "time_delta", "days": -1},
        "before": {
            "match_type": "time_string",
            "dateTimeStr": "21/11/06 16:30",
            "dateTimeFormat": "%d/%m/%y %H:%M",
        },
    }
)

TimeConstraintModel(
    **{
        "after": {"match_type": "time_last_run", "name": "val_test"},
        "before": {"match_type": "unix_time", "value": 1531306299},
    }
)
- before: Optional[Union[TimeDeltaModel, TimeLastRunModel, TimeStringModel, UnixTimeModel]]¶
- get_query_model(**kwargs)[source]¶
Call this method with db=<instance of AmpelDB> if your time constraint is based on TimeLastRunModel
- Return type:
Optional[QueryTimeModel]
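A brief sketch mirroring the first example above (whether db can be omitted for constraints other than time_last_run is assumed here):
from ampel.model.time.TimeConstraintModel import TimeConstraintModel

tc = TimeConstraintModel(**{"after": {"match_type": "time_delta", "days": -1}})
qtm = tc.get_query_model()  # pass db=<AmpelDB instance> when a time_last_run constraint is used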
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'after': FieldInfo(annotation=Union[NoneType, TimeDeltaModel, TimeLastRunModel, TimeStringModel, UnixTimeModel], required=False), 'before': FieldInfo(annotation=Union[NoneType, TimeDeltaModel, TimeLastRunModel, TimeStringModel, UnixTimeModel], required=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class ampel.model.time.TimeDeltaModel.TimeDeltaModel(**data)[source]¶
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'days': FieldInfo(annotation=int, required=False, default=0), 'hours': FieldInfo(annotation=int, required=False, default=0), 'match_type': FieldInfo(annotation=Literal['time_delta'], required=True), 'microseconds': FieldInfo(annotation=int, required=False, default=0), 'milliseconds': FieldInfo(annotation=int, required=False, default=0), 'minutes': FieldInfo(annotation=int, required=False, default=0), 'seconds': FieldInfo(annotation=int, required=False, default=0), 'weeks': FieldInfo(annotation=int, required=False, default=0)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class ampel.model.time.TimeLastRunModel.TimeLastRunModel(**data)[source]¶
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'fallback': FieldInfo(annotation=Union[NoneType, dict], required=False, default={'days': -1}), 'match_type': FieldInfo(annotation=Literal['time_last_run'], required=True), 'process_name': FieldInfo(annotation=str, required=True), 'require_success': FieldInfo(annotation=bool, required=False, default=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class ampel.model.time.TimeStringModel.TimeStringModel(**data)[source]¶
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'dateTimeFormat': FieldInfo(annotation=str, required=True), 'dateTimeStr': FieldInfo(annotation=str, required=True), 'match_type': FieldInfo(annotation=Literal['time_string'], required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class ampel.model.time.UnixTimeModel.UnixTimeModel(**data)[source]¶
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'match_type': FieldInfo(annotation=Literal['unix_time'], required=True), 'value': FieldInfo(annotation=int, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
- class ampel.model.t3.LoaderDirective.LoaderDirective(**data)[source]¶
Specification of documents to load
- excluding_query: bool¶
whether an empty find() result should entirely discard the associated stock from further processing
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'col': FieldInfo(annotation=Literal['stock', 't0', 't1', 't2'], required=True), 'excluding_query': FieldInfo(annotation=bool, required=False, default=False), 'query_complement': FieldInfo(annotation=Union[NoneType, dict[str, Any]], required=False), 'resolve_config': FieldInfo(annotation=bool, required=False, default=False)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
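A minimal sketch (the query complement is illustrative):
from ampel.model.t3.LoaderDirective import LoaderDirective

directive = LoaderDirective(col="t2", query_complement={"unit": "DemoT2Unit"})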
- class ampel.model.t3.T2FilterModel.T2FilterModel(**data)[source]¶
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'match': FieldInfo(annotation=dict[str, Any], required=True), 'unit': FieldInfo(annotation=str, required=True)}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
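A minimal sketch (the unit name and match query are illustrative):
from ampel.model.t3.T2FilterModel import T2FilterModel

t2_filter = T2FilterModel(unit="DemoT2Unit", match={"fit_acceptable": True})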
- class ampel.model.ingest.IngestDirective.IngestDirective(**data)[source]¶
channel
filter
ingest
    stock_t2
    point_t2 <- based on input dps list
    combine <- same
        state_t2 <- based on dps list returned by combine
        point_t2 <- same
    mux <- based on input dps list
        insert
            point_t2 <- based on <datapoints insert> result from muxer (first list)
        combine
            state_t2 <- based on <datapoints combine> result from muxer (second list)
            point_t2 <- same
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- model_fields: ClassVar[dict[str, FieldInfo]] = {'channel': FieldInfo(annotation=Union[int, str], required=True), 'filter': FieldInfo(annotation=Union[NoneType, FilterModel], required=False), 'ingest': FieldInfo(annotation=IngestBody, required=False, default=IngestBody(stock_t2=None, point_t2=None, combine=None, mux=None))}¶
Metadata about the fields defined on the model, mapping of field names to [FieldInfo][pydantic.fields.FieldInfo].
This replaces Model.__fields__ from Pydantic V1.
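A minimal sketch (the channel name is illustrative; richer directives additionally populate filter and the ingest body's stock_t2 / point_t2 / combine / mux entries outlined above):
from ampel.model.ingest.IngestDirective import IngestDirective

directive = IngestDirective(channel="MY_CHANNEL")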
Templates¶
Odds and ends¶
- class ampel.log.AmpelLogger.AmpelLogger(name=0, base_flag=None, handlers=None, channel=None, console=True)[source]¶
- classmethod get_logger(name=None, force_refresh=False, **kwargs)[source]¶
Creates or returns an instance of AmpelLogger that is registered in the static dict ‘loggers’ using the provided name as key. If a logger with the given name already exists, the existing logger instance is returned. If name is None, a unique (int) name will be generated.
- Parameters:
**kwargs – passed to the constructor
Typical use:
logger = AmpelLogger.get_logger()
- Return type:
AmpelLogger
- class ampel.core.AmpelRegister.AmpelRegister(autoload=True, **kwargs)[source]¶
General notes:¶
An ampel register file is made of three parts: signature, header and content
1) The file signature is encoded in the first 11 bytes. The signature contains the header’s length and size (3 bytes each)
2) A BSON encoded dict is saved in the next x bytes, referred to as the file’s header. The header can be updated independently of the register’s content. The size of the header is customizable during file creation.
3) The register’s content is usually a compressed structure (zip, bzip or xz) which stores information in binary format. Compression can be turned off if so wished. (The size of a register containing 10 million alert ids [8 bytes] and filter return codes [1 byte] can be reduced from ~90MB to ~50MB using gzip with default settings.)
Properties:¶
Registers can be re-opened and appended
Header content can be accessed or updated independently of the register’s content.
Header updates are fast if enough space was reserved for updates in the first place.
Header size can be increased afterwards at the cost of having to rewrite the entire file once. This happens automatically when needed.
Logging read and write access to the register content into the register header is supported
Registers can be capped (based on max content length or max number of run ids). Once the limit is reached, the full register is renamed and a new one is created.
Note: the module ampel.util.register contains tools to manually change ampel registers, such as: get_header_content, open_file_and_write_header and rescale_header
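A quick sketch using one of the helpers mentioned above; the exact signature of get_header_content (here assumed to accept a file path) and the file name are assumptions:
from ampel.util.register import get_header_content

hdr = get_header_content("ampel_register.bin.gz")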
Target file:¶
- param path_base:
the base folder path where to create/read register files.
- param path_extra:
an optional additional folder path (or paths) to be appended to path_base.
- param full_path:
when provided, all path building options will be ignored and a new file handle will be opened using the provided full_path
- param file_handle:
if a file handle to a register file is provided, it will be used and all path options will be ignored.
- param file_cap:
registers can be capped, typically based on a maximum number of blocks. When access to a full register is requested, the original file is renamed and a new register is created. During file renames, an index is appended (suffix) to the original file name. The current (newest) register file always comes without index suffix. This index is saved into the current register header.
Example:
ampel_register.bin.gz (current - newest)
ampel_register.bin.gz.1 (first renamed register - oldest)
ampel_register.bin.gz.2
ampel_register.bin.gz.3 (second newest)
Note 1: file renames occur during the opening procedure of registers; no check is performed during the filling of a register once it is open. A register can thus grow beyond the defined limit as long as a process keeps it open.
Header:¶
Limitations:
The maximum space reservable for the header is 2**24 bytes, i.e. ~16MB.
Integers cannot exceed 2**63. Should you need to save bigger numbers, please use the methods bindata_to_int and int_to_bindata from module ampel.util.bson.
- param new_header_size:
either None, an integer, or a string (please read the note above). Illustrative values are sketched at the end of this section.
None or 0: the header block size will equal the header’s encoded length. Choose this option if the header is not meant to be updated later. Otherwise, updates will only be possible if the header size does not grow (note that some margin is allowed, since headers exceeding the limit are automatically compressed using zlib and written to disk if the size condition is then fulfilled).
integer number: the header will be allocated the specified number of bytes (for example 4096).
a string: refers to a ‘header margin’ and must start with the character ‘+’. This option can save space in some circumstances. The space allocated for the header will equal the length of the initial header (including all provided options such as header_extra) plus the specified margin. For example, ‘+1024’ means that 1024 bytes will be allocated in addition to the initial header length for future updates. If the initial header length is 100 bytes, then a header block of 1124 bytes will be created.
- param header_extra:
any extra to be included in the header under the key ‘extra’ (must be bson encodable)
- param header_extra_base:
any extra to be included in the header at root depth (must be bson encodable)
- param header_update_anyway:
if no update to the register is made, the default behavior is to not update the header. This setting forces header updates. For example, you might want to save all run ids into the header whether or not they changed the content of the register.
- param header_log_accesses:
if True, timestamps will be recorded each time the register is opened/closed along with the amount of new blocks appended to the register.
Note 1: parameter new_header_size must be set when using this option. Note 2: see the docstring of parameter header_update_anyway, which affects the behavior of this parameter.
In the following example, header_log_accesses is responsible for creating/updating the key ‘updated’:
"ts": {
    "created": 1590506868.3880599,
    "updated": [
        [1590506868.3880599, 1590509029.389, 1200],
        [1590507152.079873, 1590507295.080478, 2300]
    ]
}
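Returning to new_header_size (described above), the three accepted forms as illustrative values:
new_header_size = None      # header block exactly fits the initial header; no later growth expected
new_header_size = 4096      # reserve a fixed 4096-byte header block
new_header_size = "+1024"   # reserve the initial header length plus 1024 bytes of update margin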
Errors:¶
- raises:
ValueError, FileNotFoundError
if neither file_handle, full_path nor path_base exists
if read access to a non-existing file is requested
if the target file is not a register file (existing empty files are ok)
if the ‘struct’ parameter of a sub-class of this class differs from the values registered in the file’s header (this behavior can be deactivated via parameter on_exist_check)
during file renames (in case capped registers are used) if the target file already exists
- onload_update_header()[source]¶
Override if you need to update the header of an existing register. Ex: BaseAlertRegister adds the current run id
- Return type:
- check_header(header)[source]¶
- Raises:
ValueError on mismatch between this instance’s value and the header value for the provided key
- Return type:
None
- register_file_access(header=None, use_this_time=None, new_blocks=None)[source]¶
- Parameters:
- Return type: