Source code for ampel.core.AmpelContext

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File:                Ampel-core/ampel/core/AmpelContext.py
# License:             BSD-3-Clause
# Author:              valery brinnel <firstname.lastname@gmail.com>
# Date:                18.02.2020
# Last Modified Date:  09.01.2022
# Last Modified By:    valery brinnel <firstname.lastname@gmail.com>

from typing import Any, Literal, TYPE_CHECKING
from collections.abc import Iterable
from ampel.config.AmpelConfig import AmpelConfig
from ampel.base.AuxUnitRegister import AuxUnitRegister
from ampel.secret.AmpelVault import AmpelVault
from ampel.secret.AESecretProvider import AESecretProvider
from ampel.config.builder.DisplayOptions import DisplayOptions

# Avoid cyclic import issues
if TYPE_CHECKING:
	from ampel.core.AmpelDB import AmpelDB
	from ampel.core.UnitLoader import UnitLoader # noqa


[docs] class AmpelContext: """ This class is typically instantiated by Ampel controllers and provided to "processor unit" instances. Note: this class might in the future be capable of handling multiple AmpelConfig/AmpelDB instances """ def __init__(self, config: AmpelConfig, db: 'AmpelDB', loader: 'UnitLoader', # forward reference to avoid cyclic import issues resource: None | dict[str, Any] = None, admin_msg: None | str = None ) -> None: """ :param config: system configuration :param db: database client :param loader: instantiates unit from :class:`~ampel.model.UnitModel.UnitModel` """ self.config = config self.db = db self.loader = loader self.admin_msg = admin_msg self.resource = resource self.run_time_aliases: dict[str, Any] = {} # try to register aux units globally try: AuxUnitRegister.initialize(config) except Exception: print("UnitLoader auxiliary units auto-registration failed")
[docs] @classmethod def load(cls, config: str | dict, pwd_file_path: None | str = None, pwds: None | Iterable[str] = None, freeze_config: bool = True, vault: None | AmpelVault = None, one_db: bool = False, **kwargs ) -> 'AmpelContext': """ Instantiates a new AmpelContext instance. :param config: local path to an ampel config file (yaml or json) or loaded config as dict :param pwd_file_path: path to a text file containing one password per line. The underlying AmpelVault will be initialized with an AESecretProvider configured with these pwds. :param pwds: Same as 'pwd_file_path' except a list of passwords is provided directly via this parameter. :param freeze_config: whether to convert the elements contained of ampel config into immutable structures (:class:`dict` -> :class:`~ampel.view.ReadOnlyDict.ReadOnlyDict`, :class:`list` -> :class:`tuple`). Parameter does only apply if the config is loaded by this method, i.e if parameter 'config' is a str. """ # Avoid cyclic import issues from ampel.core.UnitLoader import UnitLoader # noqa from ampel.core.AmpelDB import AmpelDB alconf = AmpelConfig(config) if isinstance(config, dict) else AmpelConfig.load(config) if vault is None: vault = AmpelVault([]) if pwds: vault.providers.append( AESecretProvider(pwds) ) elif pwd_file_path: with open(pwd_file_path, "r") as f: vault.providers.append( AESecretProvider([l.strip() for l in f.readlines()]) ) db = AmpelDB.new(alconf, vault, one_db=one_db) return cls( config = alconf, db = db, loader = UnitLoader(config=alconf, db=db, vault=vault), **kwargs )
[docs] @classmethod def build(cls, ignore_errors: bool = False, pwd_file_path: None | str = None, pwds: None | Iterable[str] = None, freeze_config: bool = True, verbose: bool = False, ) -> 'AmpelContext': """ Instantiates a new AmpelContext instance. The required underlying ampel configuration (:class:`~ampel.config.AmpelConfig.AmpelConfig`) is built from scratch, meaning all available configurations defined in the various ampel repositories available locally are collected, merged merged together and morphed the info into the final ampel config. This is a convenience method, do not use for production """ if pwd_file_path: with open(pwd_file_path, "r") as f: pwds = [l.strip() for l in f.readlines()] # Import here to avoid cyclic import error from ampel.config.builder.DistConfigBuilder import DistConfigBuilder cb = DistConfigBuilder(options=DisplayOptions(verbose=verbose)) cb.load_distributions() return cls.load( cb.build_config(ignore_errors, pwds=pwds), pwd_file_path=pwd_file_path, pwds=pwds, freeze_config=freeze_config, )
[docs] def new_run_id(self) -> int: """ Return an identifier that can be used to associate log entries from a single process invocation. This ID is unique and monotonicaly increasing. """ return self.db \ .get_collection('counter') \ .find_one_and_update( {'_id': 'current_run_id'}, {'$inc': {'value': 1}}, new=True, upsert=True ) \ .get('value')
[docs] def get_config(self) -> AmpelConfig: """ .. note:: in the future, AmpelContext might hold references to multiple different config """ return self.config
[docs] def get_database(self) -> 'AmpelDB': """ .. note:: in the future, this class might hold references to multiple different databases """ return self.db
def resolve_conf_id(self, conf_id: int) -> None | dict[str, Any]: try: return self.config.get_conf_id(conf_id) # confid not found (obsolete or dynamically generated by process) except Exception: l = list(self.db.col_conf_ids.find({"_id": conf_id})) if len(l) == 0: return None del l[0]['_id'] return l[0] def add_conf_id(self, conf_id: int, unit_config: dict[str, Any]) -> None: self.db.add_conf_id(conf_id, unit_config) dict.__setitem__(self.config._config["confid"], conf_id, unit_config) def add_run_time_alias(self, key: str, value: Any, overwrite: bool = False) -> None: if not isinstance(key, str) or key[0] != '%' != key[1]: raise ValueError('Run time aliases must begin with %%') if key in self.run_time_aliases and not overwrite: raise ValueError(f"Run time alias {key} already defined, set overwrite=True to ignore") self.run_time_aliases[key] = value def __repr__(self) -> str: return "<AmpelContext>" def deactivate_processes(self) -> None: for i in range(4): for p in self.config.get(f'process.t{i}', dict, raise_exc=True).values(): p['active'] = False
[docs] def activate_process(self, name: str, tier: None | Literal[0, 1, 2, 3] = None ) -> bool: """ :returns: False if process was deactivated, True if not found """ for i in range(4): if tier and i != tier: continue if p := self.config.get(f'process.t{i}.{name}', dict): p['active'] = False return False return True