API reference

Ampel-interface provides base classes for implementing Ampel processing units, as well as the data classes that are provided as input to those units by the core.

Processing units

class ampel.abstract.AbsT0Unit.AbsT0Unit(**kwargs)[source]

Bases: AmpelABC, LogicalUnit

A unit that creates datapoints for Ampel

Before new datapoints are inserted into the database, they are customized (or ‘ampelized’, if you will) so that short and flexible queries can be used later. The customizations are light, and most of the original information is kept. For example, in the case of ZiDataPointShaper:

  • The field candid is renamed to id

  • A new field ‘tag’ is created
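The two customizations listed above can be pictured with a plain-Python sketch. This is not the ZiDataPointShaper implementation: the helper name shape_datapoint, the sample field values and the way tags are passed in are assumptions made purely for illustration.

```python
from typing import Any


def shape_datapoint(raw: dict[str, Any], tags: list[str]) -> dict[str, Any]:
    """Return an 'ampelized' copy of a raw datapoint (hypothetical helper)."""
    shaped = dict(raw)                   # most of the original information is kept
    shaped["id"] = shaped.pop("candid")  # the field 'candid' is renamed to 'id'
    shaped["tag"] = list(tags)           # a new field 'tag' is created
    return shaped


# Hypothetical raw record; only the key 'candid' comes from the text above
raw_candidate = {"candid": 1234567890, "jd": 2458000.5, "magpsf": 18.2}
shaped = shape_datapoint(raw_candidate, ["ZTF"])
```

The input dict is copied rather than modified in place, so the raw record remains available to the caller.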

abstract process(arg, stock=None)[source]

Convert an external object to Ampel format

Return type:

list[DataPoint]

require: ClassVar[None | tuple[str, ...]] = ()

Resources requirements as class variable (passed on to and merged with subclasses).

class ampel.abstract.AbsPointT2Unit.AbsPointT2Unit(**kwargs)[source]

Bases: AmpelABC, LogicalUnit

A T2 unit bound to a DataPoint

Note that the implementing class can customize the default ingestion behavior (which DataPoint to create T2 documents for) by defining the class variable ‘eligible’

Example::

ingest: ClassVar[DPSelection] = DPSelection(select=(1, -2, 5))

will create documents bound to every 5th datapoint starting from the 2nd and ending with the 3rd-to-last

Example::

ingest: ClassVar[DPSelection] = DPSelection(filter=UnitModel(unit='SimpleTagFilter', config={'require': ['ZTF_DP']}), sort='jd', select='first')

will create documents bound to the first datapoint with tag ‘ZTF_DP’, in order of body.jd.

select options:
  • “first”: first datapoint for a stock

  • “last”: most recent datapoint for a stock

  • “all”: every datapoint (default)

  • tuple: slice of datapoints

-> see DPSelection docstring for info regarding ‘filter’ and ‘sort’.

If ‘eligible’ is not specified, default ingestion will occur: a T2 document will be created for each datapoint
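The effect of the ‘select’ options can be previewed on a plain list. The sketch below assumes that a tuple is interpreted like the arguments of Python's built-in slice, which is what the (1, -2, 5) example above suggests. The helper apply_select and the integer ids are hypothetical and do not use DPSelection itself.

```python
def apply_select(ids, select="all"):
    """Mimic the documented 'select' options on a plain list (illustration only)."""
    if select == "first":
        return ids[:1]
    if select == "last":
        return ids[-1:]
    if select == "all":
        return list(ids)
    if isinstance(select, tuple):
        # Assumption: the tuple holds (start, stop, step) as for slice()
        return ids[slice(*select)]
    raise ValueError(f"unsupported select option: {select!r}")


# 20 hypothetical datapoint ids (1..20), already sorted
datapoint_ids = list(range(1, 21))
selected = apply_select(datapoint_ids, (1, -2, 5))
```

With 20 datapoints, the selection starts at the 2nd one, advances in steps of 5 and stops before the second-to-last, so the last candidate position is the 3rd-to-last.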

abstract process(datapoint)[source]

Returned object should contain computed science results to be saved into the DB.

Return type:

Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any], UnitResult]

Note

the returned dict must have only string keys and be BSON-encodable
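A unit author may want to check the string-key requirement before returning a result. The helper below is a hypothetical sketch that walks nested dicts and lists. It only covers the "string keys" half of the note and does not verify BSON-encodability of the values.

```python
from typing import Any


def has_only_str_keys(obj: Any) -> bool:
    """Return True if every dict nested anywhere inside obj uses string keys only."""
    if isinstance(obj, dict):
        return all(
            isinstance(key, str) and has_only_str_keys(value)
            for key, value in obj.items()
        )
    if isinstance(obj, (list, tuple)):
        return all(has_only_str_keys(item) for item in obj)
    return True  # scalars carry no keys


good_result = {"fit": {"chi2": 1.2, "params": [{"name": "x0", "value": 0.1}]}}
bad_result = {"fit": {1: "integer key"}}
```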

require: ClassVar[None | tuple[str, ...]] = ()

Resources requirements as class variable (passed on to and merged with subclasses).

class ampel.abstract.AbsStockT2Unit.AbsStockT2Unit(**kwargs)[source]

Bases: AmpelABC, LogicalUnit

A T2 unit bound to a StockDocument

abstract process(stock_doc)[source]

Returned object should contain computed science results to be saved into the DB.

Return type:

Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any], UnitResult]

Note

the returned dict must have only string keys and be BSON-encodable

require: ClassVar[None | tuple[str, ...]] = ()

Resources requirements as class variable (passed on to and merged with subclasses).

class ampel.abstract.AbsStateT2Unit.AbsStateT2Unit(**kwargs)[source]

Bases: AmpelABC, LogicalUnit

A T2 unit bound to a T1Document (state of a stock)

abstract process(compound, datapoints)[source]

Returned object should contain computed science results to be saved into the DB.

Return type:

Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any], UnitResult]

Note

the returned dict must have only string keys and be BSON-encodable

require: ClassVar[None | tuple[str, ...]] = ()

Resources requirements as class variable (passed on to and merged with subclasses).

class ampel.abstract.AbsCustomStateT2Unit.AbsCustomStateT2Unit(**kwargs)[source]

Bases: Generic[T], AmpelABC, LogicalUnit

A T2 unit bound to a custom type constructed from a compound

Known subclass: AbsLightCurveT2Unit

abstract static build(compound, datapoints)[source]

Create the parametrized type using compound and datapoints. For example, AbsCustomStateT2Unit[LightCurve] would return a LightCurve instance.

Return type:

TypeVar(T)

abstract process(arg)[source]

Returned object should contain computed science results to be saved into the DB.

Return type:

Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any], UnitResult]

Note

the returned dict must have only string keys and be BSON-encodable

require: ClassVar[None | tuple[str, ...]] = ()

Resources requirements as class variable (passed on to and merged with subclasses).

class ampel.abstract.AbsTiedT2Unit.AbsTiedT2Unit(*args, **kwargs)[source]

Bases: AmpelABC, LogicalUnit

A T2 unit that depends on the results of other T2 units.

t2_dependency: Sequence[UnitModel]

Dependencies configuration for the underlying tied t2 unit. Sub-classes can add unit id constraints by parametrizing UnitModel.

require: ClassVar[None | tuple[str, ...]] = ()

Resources requirements as class variable (passed on to and merged with subclasses).

class ampel.abstract.AbsTiedStateT2Unit.AbsTiedStateT2Unit(*args, **kwargs)[source]

Bases: Generic[T], AbsTiedT2Unit

A T2 unit bound to a T1Document (state of a stock), as well as the results of other T2 units

t2_dependency: Sequence[StateT2Dependency]

Dependencies configuration for the underlying tied t2 unit. Sub-classes can add unit id constraints by parametrizing UnitModel.

abstract process(compound, datapoints, t2_views)[source]

Returned object should contain computed science results to be saved into the DB.

Return type:

Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any], UnitResult]

Note

the returned dict must have only string keys and be BSON-encodable

require: ClassVar[None | tuple[str, ...]] = ()

Resources requirements as class variable (passed on to and merged with subclasses).

class ampel.abstract.AbsTiedCustomStateT2Unit.AbsTiedCustomStateT2Unit(*args, **kwargs)[source]

Bases: Generic[T, U], AbsTiedT2Unit

A T2 unit bound to a custom type constructed from a compound, as well as the results of other T2 units.

Known subclass: AbsTiedLightCurveT2Unit

t2_dependency: Sequence[StateT2Dependency[~U]]

Dependencies configuration for the underlying tied t2 unit. Sub-classes can add unit id constraints by parametrizing UnitModel.

abstract static build(compound, datapoints)[source]

Create the parametrized type using compound and datapoints. For example, AbsTiedCustomStateT2Unit[LightCurve] would return a LightCurve instance.

Return type:

TypeVar(T)

abstract process(arg, t2_views)[source]

Returned object should contain computed science results to be saved into the DB.

Return type:

Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any], UnitResult]

Note

the returned dict must have only string keys and be BSON-encodable

require: ClassVar[None | tuple[str, ...]] = ()

Resources requirements as class variable (passed on to and merged with subclasses).

class ampel.abstract.AbsT3Unit.AbsT3Unit(**kwargs)[source]

Bases: Generic[T], AmpelABC, LogicalUnit

Generic abstract class for T3 units receiving a SnapView generator

abstract process(gen, t3s)[source]

T3 units receive SnapView instances (or instances of its subclasses) via a generator. The method gen.send(…) applies a modification to the last view yielded by the generator unless a stock id is provided via a tuple.

  • Use gen.send(JournalAttributes) to customize the journal of the stock document associated with the view.

  • Use gen.send(StockAttributes) to customize the stock tags or name (or journal) associated with the view.

  • Use gen.send((StockId, JournalAttributes)) to customize the journal for a view other than the most recent, e.g. when processing in batches.

The content of the t3 store depends on:

  • the configuration of the ‘include’ option of the underlying t3 process

  • previously run t3 units if the option ‘propagate’ is activated

Return type:

Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any], UnitResult]
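The gen.send(…) protocol of AbsT3Unit.process() relies on standard Python generator mechanics. The sketch below shows one way such a generator could be built. It is an assumption about the mechanism, not the Ampel implementation: views are plain dicts, the journal is a dict of lists, and the name view_generator is hypothetical.

```python
from typing import Any, Generator, Iterable, Optional


def view_generator(
    views: Iterable[dict[str, Any]],
    journal: dict[Any, list[Any]],
) -> Generator[Optional[dict[str, Any]], Any, None]:
    """Yield views and record whatever the consumer passes back via send()."""
    for view in views:
        last_id = view["id"]
        received = yield view
        while received is not None:
            if isinstance(received, tuple):
                stock_id, attrs = received          # explicit stock id given
            else:
                stock_id, attrs = last_id, received  # applies to last yielded view
            journal.setdefault(stock_id, []).append(attrs)
            # Answer send() with None so that no view is skipped
            received = yield None


journal: dict[Any, list[Any]] = {}
gen = view_generator([{"id": 1}, {"id": 2}, {"id": 3}], journal)
seen = []
for view in gen:
    seen.append(view["id"])
    gen.send({"tag": "processed"})           # targets the view just yielded
    if view["id"] == 3:
        gen.send((1, {"tag": "revisited"}))  # targets an earlier view by id
```

Because each send() is answered with a None acknowledgement, the for loop still visits every view exactly once.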

require: ClassVar[None | tuple[str, ...]] = ()

Resources requirements as class variable (passed on to and merged with subclasses).

Base classes for processing units

class ampel.base.LogicalUnit.LogicalUnit(**kwargs)[source]

Logical as in: performing logic operations of ampel’s tier: add, combine, augment, synthetise. Base for standardized t0, t1, t2 and t3 units.

Note:

  • the defined parameters must be serializable

  • logical units are initialized (by UnitLoader) as follows:

      • ctor

      • if available, secrets are resolved

      • if defined by sub-class, post_init() is called
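The initialization order described for LogicalUnit can be sketched with a toy loader. Everything here is hypothetical (DemoUnit, load_unit and the resolve_secrets callback). The sketch only mirrors the documented order of steps and is not how UnitLoader is implemented.

```python
from typing import Any, Callable, Optional


class DemoUnit:
    """Hypothetical unit that records the order of its initialization steps."""

    def __init__(self, **kwargs: Any) -> None:
        self.steps = ["ctor"]
        self.params = kwargs

    def post_init(self) -> None:
        self.steps.append("post_init")


def load_unit(
    cls: type,
    config: dict[str, Any],
    resolve_secrets: Optional[Callable[[Any], None]] = None,
) -> Any:
    unit = cls(**config)                 # 1. constructor
    if resolve_secrets is not None:      # 2. secrets are resolved, if available
        resolve_secrets(unit)
    post_init = getattr(unit, "post_init", None)
    if callable(post_init):              # 3. post_init(), if defined by the sub-class
        post_init()
    return unit


unit = load_unit(DemoUnit, {"threshold": 5}, lambda u: u.steps.append("secrets"))
```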

require: ClassVar[Optional[tuple[str, ...]]] = None

Resources requirements as class variable (passed on to and merged with subclasses).
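One way a class variable can be "passed on to and merged with subclasses" is through __init_subclass__. The sketch below is an assumption about the mechanism, with hypothetical class and resource names. The merge rule shown (ordered union, parents first) is likewise assumed rather than taken from Ampel.

```python
from typing import ClassVar, Optional


class DemoBaseUnit:
    require: ClassVar[Optional[tuple[str, ...]]] = None

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        merged: list[str] = []
        # Walk from the most basic class down to the new subclass
        for klass in reversed(cls.__mro__):
            for name in klass.__dict__.get("require") or ():
                if name not in merged:
                    merged.append(name)
        cls.require = tuple(merged) if merged else None


class UnitA(DemoBaseUnit):
    require = ("mongo",)


class UnitB(UnitA):
    require = ("catalog", "mongo")


class UnitC(UnitB):
    pass
```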

Context and configuration

class ampel.config.AmpelConfig.AmpelConfig(config, freeze=False)[source]

Container for the central Ampel configuration

get(entry=None, ret_type=None, *, raise_exc=False)[source]

Return type:

Union[None, str, int, float, bool, list[Any], dict[str, Any], TypeVar(JT, None, str, int, float, bool, bytes, list[Any], dict[str, Any])]

Examples:

get("channel.HU_RANDOM")
get(['foo', 'bar', 'baz'])
Raises:

ValueError – if the retrieved value does not have the expected type
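The two addressing styles in the examples (a dotted string or a list of keys) can be illustrated on a nested dict. The function get_entry below is a hypothetical stand-in, not AmpelConfig.get(). In particular, returning None for missing entries and raising ValueError only when raise_exc is True are assumptions.

```python
from typing import Any, Optional, Sequence, Union


def get_entry(
    config: dict[str, Any],
    entry: Union[None, str, Sequence[str]] = None,
    ret_type: Optional[type] = None,
    *,
    raise_exc: bool = False,
) -> Any:
    """Look up a nested entry addressed by 'a.b.c' or ['a', 'b', 'c']."""
    if entry is None:
        return config
    keys = entry.split(".") if isinstance(entry, str) else list(entry)
    node: Any = config
    for key in keys:
        if not isinstance(node, dict) or key not in node:
            if raise_exc:
                raise ValueError(f"unknown config entry: {entry!r}")
            return None
        node = node[key]
    if ret_type is not None and not isinstance(node, ret_type):
        if raise_exc:
            raise ValueError(f"{entry!r} is not of type {ret_type.__name__}")
        return None
    return node


demo_config = {"channel": {"HU_RANDOM": {"active": True}}, "foo": {"bar": {"baz": 3}}}
channel = get_entry(demo_config, "channel.HU_RANDOM")
baz = get_entry(demo_config, ["foo", "bar", "baz"], int)
```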

class ampel.abstract.AbsSecretProvider.AbsSecretProvider(*args, **kwargs)[source]

Interface to a secret store used to resolve secrets. The underlying store may be as simple as a dict loaded from a JSON file or a complete key manager like Vault.

abstract tell(arg, ValueType)[source]

Potentially update an initialized Secret instance with the actual sensitive information associable with it.

Return type:

bool

Returns:

True if the Secret was told/resolved or False if the provided Secret is either unknown to this secret provider, or resolves to a value of the wrong type.
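The contract of tell() can be illustrated with the simplest store mentioned above, a dict. Both classes below are hypothetical stand-ins: DemoSecret is not Ampel's Secret type, and its label and value attributes are assumptions chosen for the example.

```python
from typing import Any


class DemoSecret:
    """Hypothetical secret: a label plus a value that is filled in later."""

    def __init__(self, label: str) -> None:
        self.label = label
        self.value: Any = None


class DemoDictSecretProvider:
    """Resolve secrets from an in-memory dict, e.g. one loaded from a JSON file."""

    def __init__(self, store: dict[str, Any]) -> None:
        self.store = store

    def tell(self, arg: DemoSecret, ValueType: type) -> bool:
        if arg.label not in self.store:
            return False  # unknown to this provider
        value = self.store[arg.label]
        if not isinstance(value, ValueType):
            return False  # resolves to a value of the wrong type
        arg.value = value
        return True


provider = DemoDictSecretProvider({"api/token": "s3cr3t", "api/port": 8080})
token = DemoSecret("api/token")
resolved = provider.tell(token, str)
```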

Data classes

class ampel.alert.AmpelAlert.AmpelAlert(id, stock, datapoints, tag=None, extra=None)[source]

Implements AmpelAlertProtocol

get_values(key, filters=None)[source]

Example:

get_values("magpsf")
Return type:

list[Any]

get_tuples(key1, key2, filters=None)[source]

Example:

get_tuples("jd", "magpsf")
Return type:

list[tuple[Any, Any]]

get_ntuples(params, filters=None)[source]

Example:

get_ntuples(["fid", "jd", "magpsf"])
Return type:

list[tuple]
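The three accessors differ only in how many keys they extract per datapoint. The sketch below reproduces that idea on plain dicts, leaving out the filters argument. It assumes that datapoints are flat dicts and that a datapoint lacking any requested key is skipped. The free functions are hypothetical, not the AmpelAlert methods.

```python
from typing import Any, Sequence

# Hypothetical datapoints; the third one has no 'magpsf' value
datapoints = [
    {"fid": 1, "jd": 2458000.5, "magpsf": 18.2},
    {"fid": 2, "jd": 2458001.5, "magpsf": 18.0},
    {"fid": 1, "jd": 2458002.5},
]


def get_values(dps: Sequence[dict[str, Any]], key: str) -> list[Any]:
    return [dp[key] for dp in dps if key in dp]


def get_tuples(dps: Sequence[dict[str, Any]], key1: str, key2: str) -> list[tuple[Any, Any]]:
    return [(dp[key1], dp[key2]) for dp in dps if key1 in dp and key2 in dp]


def get_ntuples(dps: Sequence[dict[str, Any]], params: Sequence[str]) -> list[tuple]:
    return [tuple(dp[p] for p in params) for dp in dps if all(p in dp for p in params)]


mags = get_values(datapoints, "magpsf")
pairs = get_tuples(datapoints, "jd", "magpsf")
triples = get_ntuples(datapoints, ["fid", "jd", "magpsf"])
```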

class ampel.content.StockDocument.StockDocument[source]

The stock record ties together data from various sources, selected by various channels, but all related to the same underlying object. Each channel has a different view of the stock. From the perspective of a given channel, the stock is updated whenever a linked document (T0, T1, T2) is updated.

A dict containing 1 or more of the following items:

stock: Union[int, bytes, str]

The unique id associated with the stock. Integer most of the time

origin: int

Optional source origin (avoids potential stock collision between different data sources)

tag: Sequence[Union[int, str]]

Optional tag(s)

channel: Sequence[Union[int, str]]

Channels associated with this stock

journal: Sequence[JournalRecord]

Records of activity

ts: dict[Union[int, str], dict[Literal['tied', 'upd'], float]]

Creation time (UNIX epoch) in each channel

updated: int | float

Last update time for any channel

name: Sequence[int | str]

External name(s) associated with the stock

body: dict[str, Any]

Optional specific content

class ampel.content.T2Document.T2Document[source]

Specifications for tier2 documents stored as BSON structures in the ampel DB. Calculations of the associated t2 unit are performed based on ampel data referenced by the attribute ‘link’. Linked input data type can be either StockDocument, DataPoint, or T1Document.

stock: Required[Union[int, bytes, str, Sequence[Union[int, bytes, str]]]]

Stock id associated with the data

origin: int

Optional source origin (avoids potential stock collision between different data sources)

unit: Required[str]

Name of the unit to be run. This may be hashed for performance reasons.

config: Required[Optional[int]]

Configuration hash, if unit defaults were overridden. The underlying values can be resolved with UnitLoader.get_init_config()

References to input data

tag: Sequence[Union[int, str]]

visible by any projection (not channel bound)

channel: Required[Sequence[Union[int, str]]]

Ampel channel(s) associated with this document

meta: Required[Sequence[MetaRecord]]

Records of activity on this document

col: str

Name of the database collection holding the input data (t1 if unspecified) (enables efficient DB queries at T3 level)

code: Required[int]

DocumentCode.NEW for new T2 document, DocumentCode.OK if computation was successful

body: Required[Sequence[Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any]]]]

value(s) returned by T2 unit execution(s)

expiry: datetime

Time when this document may be deleted

class ampel.content.LogDocument.LogDocument[source]

Abbreviations: s: stock, a: alert, f: flag, r: run, m: msg, c: channel

Example:

{
    "_id" : ObjectId("5be4aa6254048041edbac353"),
    "s" : NumberLong(1810101032122523),
    "a" : NumberLong(404105201415015004),
    "f" : 572784643,
    "r" : 509,
    "c" : "NO_FILTER",
    "m" : "Alert accepted"
}

_id: bytes

database key

f: Required[int]

flag

r: Required[int | Sequence[int]]

run id

m: str | Sequence[str] | ChannelLogEntry

msg

s: Union[int, bytes, str, Sequence[Union[int, bytes, str]]]

stock

c: Union[int, str, Sequence[Union[int, str]]]

channel

u: str

unit

x: dict[str, Any]

extra

p: tuple[str, int]

file:line_number (set DBLoggingHandler.log_provenance to True)

class ampel.content.JournalRecord.JournalRecord[source]

A record of activity on a stock document.

tier: Literal[-1, 0, 1, 2, 3]

Tier of the associated process

ts: int | float

UNIX epoch of the activity

channel: Union[int, str, Sequence[Union[int, str]]]

Channels associated with the activity

process: int | str

Name of the associated process

tag: Union[int, str, Sequence[Union[int, str]]]

Free-form labels

run: int

Run(s) associated with this record

code: int

Status code of the associated process

action: int

Action code(s) built from JournalActionCode

duration: int | float

Duration of the process

traceid: dict[str, int]

Trace ids

unit: int | str

id of the unit associated with this record

doc: int | bytes

id of the document associated with the invocation

extra: dict[str, Any]

Free-form information

class ampel.struct.JournalAttributes.JournalAttributes(code=None, tag=None, extra=None)[source]

Structure that Ampel units can use to customize the stock journal entry created after a process is run.

tag: Union[None, int, str, Sequence[Union[int, str]]]

journal entry tag(s)

extra: Optional[dict[str, Any]]

if provided, will be included as-is under the journal root key ‘extra’

code: Optional[int]

code / status

dict()[source]
Return type:

JournalRecord

into(prime)[source]
Return type:

JournalRecord

class ampel.view.SnapView.SnapView(id, stock=None, origin=None, t0=None, t1=None, t2=None, logs=None, extra=None)[source]

View of a given ampel object (with unique stock id).

This class references various instances of objects from package ampel.content, originating from different ampel tiers. It can also contain external/composite objects embedded in the dict called ‘extra’, for example spectra or cutout images. The config parameter of a T3 process determines which information is included. Instances of this class (or of a subclass such as TransientView) are provided to AbsT3Unit.process().

get_t2_views(unit=None, link=None, code=None)[source]

Get a subset of T2 documents.

Return type:

Iterator[T2DocView]

get_raw_t2_body(unit, link=None, code=None)[source]
Return type:

Optional[Sequence[Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any]]]]

get_latest_t2_body(unit, link=None, code=None)[source]

Get latest t2 body element from a given unit.

Parameters:
  • unit_id – target unit id

  • link_id – restrict to a specific link

Return type:

Union[None, str, int, float, bool, bytes, list[Any], dict[str, Any]]

get_t2_body(unit, ret_type=<class 'dict'>, *, data_slice=-1, link=None, code=None, raise_exc=False)[source]

Get latest t2 body element from a given unit.

Parameters:
  • ret_type (type[TypeVar(T)]) – expected body element type. If the isinstance check is not fulfilled, None will be returned (unless multiple units are to be matched and another unit fulfills the criteria) or an exception will be raised if raise_exc is True

  • link (Union[None, int, bytes, str]) – restrict to a specific link

Return type:

Optional[TypeVar(T)]
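The interplay of ret_type, data_slice and raise_exc in get_t2_body() can be sketched for a single body sequence. The helper pick_body is hypothetical and ignores the unit, link and code matching. Treating data_slice as an index into the body and raising ValueError on a type mismatch are both assumptions.

```python
from typing import Any, Optional, Sequence


def pick_body(
    body: Optional[Sequence[Any]],
    ret_type: type = dict,
    *,
    data_slice: int = -1,
    raise_exc: bool = False,
) -> Any:
    """Return body[data_slice] if it has the expected type, else None."""
    if not body:
        return None
    element = body[data_slice]  # -1 selects the latest element
    if isinstance(element, ret_type):
        return element
    if raise_exc:
        raise ValueError(f"body element is not of type {ret_type.__name__}")
    return None


# Hypothetical T2 body holding the results of two successive executions
t2_body = [{"chi2": 3.4}, {"chi2": 1.2}]
latest = pick_body(t2_body)
first = pick_body(t2_body, data_slice=0)
mismatch = pick_body(t2_body, ret_type=list)
```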

get_t2_value(unit, key, rtype, *, code=None)[source]

Examples:

get_t2_value(("T2NedSNCosmo", "T2SNCosmo"), "fit_result", dict)

see T2DocView.get_value(…) for more info

Return type:

Optional[TypeVar(T)]

get_t2_ntuple(unit, key, rtype, *, no_none=False, require_all_keys=True, code=None)[source]

Examples:

get_t2_ntuple("T2NedTap", ("ra", "dec", "z", "zunc"), float)
get_t2_ntuple(("T2NedSNCosmo", "T2SNCosmo"), ("fit_result", "covariance"), dict)

see T2DocView.get_ntuple(…) for more info

Return type:

UnionType[None, tuple[TypeVar(T), ...], tuple[Optional[TypeVar(T)], ...]]

get_journal_entries(tier=None, process_name=None, filter_func=None)[source]

Get a subset of journal entries.

Parameters:
  • tier (Optional[Literal[0, 1, 2, 3]]) – return only journal entries associated with the given tier

  • process_name (Optional[str]) – return only journal entries associated with a given process name

  • latest – return only the latest entry in the journal (the latest in time)

Return type:

Iterator[JournalRecord]

Returns:

journal entries corresponding to a given tier and/or job, sorted by timestamp.
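The selection performed by get_journal_entries() can be pictured as a filter followed by a sort on the ‘ts’ field. The sketch below works on a plain list of JournalRecord-like dicts. The function name and the sample records are hypothetical, and the actual method may differ in detail.

```python
from typing import Any, Callable, Iterator, Optional


def select_journal_entries(
    journal: list[dict[str, Any]],
    tier: Optional[int] = None,
    process_name: Optional[str] = None,
    filter_func: Optional[Callable[[dict[str, Any]], bool]] = None,
) -> Iterator[dict[str, Any]]:
    """Yield matching journal records, sorted by timestamp."""
    entries = [
        rec for rec in journal
        if (tier is None or rec.get("tier") == tier)
        and (process_name is None or rec.get("process") == process_name)
        and (filter_func is None or filter_func(rec))
    ]
    return iter(sorted(entries, key=lambda rec: rec["ts"]))


# Hypothetical journal using the 'tier', 'ts' and 'process' fields of JournalRecord
demo_journal = [
    {"tier": 3, "ts": 1700000300.0, "process": "RankCandidates"},
    {"tier": 0, "ts": 1700000100.0, "process": "AlertIngest"},
    {"tier": 3, "ts": 1700000200.0, "process": "SlackSummary"},
]
t3_entries = list(select_journal_entries(demo_journal, tier=3))
```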

class ampel.view.ReadOnlyDict.ReadOnlyDict[source]

A dict whose items can’t be changed.

pop(*args, **kwargs)
Raises:

RuntimeError – whenever called

popitem(*args, **kwargs)
Raises:

RuntimeError – whenever called

clear(*args, **kwargs)
Raises:

RuntimeError – whenever called

update(*args, **kwargs)
Raises:

RuntimeError – whenever called

setdefault(*args, **kwargs)
Raises:

RuntimeError – whenever called
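A dict with this behaviour can be sketched by overriding the mutating methods listed above so that each raises RuntimeError. DemoReadOnlyDict is a hypothetical illustration rather than the ReadOnlyDict source. Blocking __setitem__ and __delitem__ as well is an assumption, since those methods are not listed in this reference.

```python
class DemoReadOnlyDict(dict):
    """dict subclass whose mutating methods always raise RuntimeError."""

    def _refuse(self, *args, **kwargs):
        raise RuntimeError("this dict is read-only")

    # Methods listed in the reference above
    pop = popitem = clear = update = setdefault = _refuse
    # Assumption: item assignment and deletion are blocked too
    __setitem__ = __delitem__ = _refuse


def raises_runtime_error(func, *args) -> bool:
    """Small helper for the demonstration: True if func(*args) raises RuntimeError."""
    try:
        func(*args)
    except RuntimeError:
        return True
    return False


frozen = DemoReadOnlyDict({"ra": 10.5})
update_blocked = raises_runtime_error(frozen.update, {"dec": -3.2})
```

Reading remains unaffected: the initial content is set by the dict constructor, which does not go through the overridden methods.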

Enums

Data models

These models are used to parse and validate the configuration of the Ampel system.

class ampel.model.UnitModel.UnitModel(**data)[source]

Specification of a processing unit. Note: generic parametrization makes it possible to constrain unit ids (e.g. UnitModel[Literal[‘T2SNCosmo’]])

unit: TypeVar(T, bound= str)

Name of ampel unit class

config: UnionType[None, int, str, dict[str, Any]]
  • None: no config (use class defaults)

  • dict: config ‘as is’

  • str: a corresponding alias key in the AmpelConfig must match the provided string

  • int: used internally for T2 units, a corresponding int key (AmpelConfig, base key ‘confid’) must match the provided integer

override: Optional[dict[str, Any]]

Values to override in the config
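How the four accepted ‘config’ forms and ‘override’ could combine into a final set of parameters is sketched below. The function, the alias table and the confid table are hypothetical stand-ins for lookups in the AmpelConfig. Applying the override as a shallow update is an assumption.

```python
from typing import Any, Optional, Union


def resolve_unit_config(
    config: Union[None, int, str, dict[str, Any]],
    override: Optional[dict[str, Any]],
    aliases: dict[str, dict[str, Any]],
    confids: dict[int, dict[str, Any]],
) -> dict[str, Any]:
    """Turn a UnitModel-style config value into explicit parameters (sketch)."""
    if config is None:
        resolved: dict[str, Any] = {}      # no config: class defaults apply
    elif isinstance(config, dict):
        resolved = dict(config)            # config 'as is'
    elif isinstance(config, str):
        resolved = dict(aliases[config])   # alias key lookup
    elif isinstance(config, int):
        resolved = dict(confids[config])   # 'confid' lookup (T2 units)
    else:
        raise TypeError(f"unsupported config type: {type(config).__name__}")
    if override:
        resolved.update(override)          # assumption: shallow override
    return resolved


# Hypothetical lookup tables
demo_aliases = {"default_fit": {"model": "salt2", "sigma": 3}}
demo_confids = {1234: {"model": "salt3"}}
from_alias = resolve_unit_config("default_fit", {"sigma": 5}, demo_aliases, demo_confids)
```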

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'config': FieldInfo(annotation=Union[NoneType, int, str, dict[str, Any]], required=False), 'override': FieldInfo(annotation=Union[NoneType, dict[str, Any]], required=False), 'secrets': FieldInfo(annotation=Union[NoneType, dict[str, Any]], required=False), 'unit': FieldInfo(annotation=~T, required=True)}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

class ampel.model.StateT2Dependency.StateT2Dependency(**data)[source]

Bases: UnitModel, Generic[T]

Used to specify how “tied state t2” units should select the associated required t2 document.

T2Processor needs to retrieve the T2Records of units tied with this unit. If link_override is unspecified, t2 dependencies are resolved - for each StateT2Dependency - using the db match query: {unit: <unit_name>, link: <same link as root doc>}.

This behavior can be customized via ‘t2_dependency->link_override’ (t2 config dict). ‘link_override’ allows linking a state T2 with a different value than the one registered as ‘link’ in the T2Record.

For example, link_override makes it possible to tie a state T2 with the result of a point t2 (the returned link could be the id of the first datapoint contained in the compound). Accepted values:

  • None: the state associated with the root tied state T2 will be used (value of ‘link’ in t2 doc)

  • DPSelection: allows tied state T2 units to be bound with point t2 units.

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'extra': 'forbid', 'populate_by_name': True, 'validate_default': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_fields: ClassVar[dict[str, FieldInfo]] = {'config': FieldInfo(annotation=Union[NoneType, int, str, dict[str, Any]], required=False), 'link_override': FieldInfo(annotation=Union[NoneType, DPSelection], required=False), 'override': FieldInfo(annotation=Union[NoneType, dict[str, Any]], required=False), 'secrets': FieldInfo(annotation=Union[NoneType, dict[str, Any]], required=False), 'unit': FieldInfo(annotation=~T, required=True)}

Metadata about the fields defined on the model, mapping of field names to pydantic.fields.FieldInfo.

This replaces Model.__fields__ from Pydantic V1.

class ampel.base.AmpelBaseModel.AmpelBaseModel(**data)[source]

Raises validation errors if extra fields are present