Porting contrib projects from v0.6¶
Ampel v0.7 introduced a major internal restructuring to apply lessons learned from operating v0.6 during the first 1.5 years of ZTF. Here is a summary of the changes you need to make to adapt your project from v0.6 to v0.7. An example project using the current config system can be found in Ampel-contrib-sample.
Environment¶
See Environment.
Defining channels and registering units¶
All the channel/T3 configuration and unit registration that used to be scattered across JSON files and the entry_points section of setup.py now lives in YAML files in a directory conf/ampel-contrib-PROJECTNAME (where PROJECTNAME is the name of your project, e.g. ZTFbh for ampel-contrib-ZTFbh) at the top level of your repository. See Defining channels and registering units for details of the configuration layout, and Validation for how to check your configuration for correctness.
Terminology changes and renamed classes¶
T3Job and T3Task are no more. Everything is just a Process now.
Many classes have been renamed to more accurately reflect their meaning. A partial list is below. Note that the names refer to the class with the same name as the module, e.g. ampel.view.LightCurve means from ampel.view.LightCurve import LightCurve.
v0.6 class | v0.7 (nearest equivalent)
---|---
… | …
Classes in ampel.content are declared as TypedDict. At runtime these are ReadOnlyDict, but the annotations in the class definition tell you which keys they may have.
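To make the TypedDict idea concrete, here is a minimal standard-library sketch. The class ExampleContent and its keys are hypothetical, and types.MappingProxyType is only a stand-in for a read-only mapping; it is not Ampel's ReadOnlyDict.

```python
from types import MappingProxyType
from typing import List, TypedDict


class ExampleContent(TypedDict, total=False):
    # Hypothetical keys, for illustration only. The real classes in
    # ampel.content define their own annotations.
    id: int
    tag: List[str]


def freeze(doc: ExampleContent) -> "MappingProxyType[str, object]":
    # Wrap a copy of the dict in a read-only view (stand-in for ReadOnlyDict)
    return MappingProxyType(dict(doc))


doc = freeze({"id": 1, "tag": ["ZTF"]})

# The annotations document which keys may be present
allowed_keys = set(ExampleContent.__annotations__)

# Writing to the read-only view fails at runtime
try:
    doc["id"] = 2  # type: ignore[index]
except TypeError:
    write_failed = True
else:
    write_failed = False
```

A static type checker reads the annotations to flag unknown keys, while the read-only wrapper guards against accidental modification at runtime.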
Channel definitions¶
The easiest way to define a channel is with a YAML file, e.g. conf/ampel-contrib-PROJECTNAME/channel/EXAMPLE_BRIGHT_N_STABLE.yaml. The content is similar to the JSON-based channel definitions in v0.6, but simplified. A few notable differences:
Templates define common configurations that don’t need to be repeated in every channel definition, and replace much of the boilerplate found in v0.6 channel definitions. See ZTFLegacyChannelTemplate.
T2 unit configurations can be defined either inline or in the alias section of the top-level config. A separate t2_config.json is no longer needed.
T3 process definitions embedded in the channel definition can also use templates. See ZTFPeriodicSummaryT3.
A slightly truncated example:
channel: EXAMPLE_BRIGHT_N_STABLE
contact: ampel@desy.de
active: true
# Auto-complete mode: how photopoints are treated once a transient has
# been accepted.
# - false: apply filter to all photopoints
# - true or "live": bypass filter once a transient has been accepted once
auto_complete: live
# Channel template: basic settings for which alert stream to listen to, how to
# build light curves from alert packets, etc.
template: ztf_uw_public
# T0: which photopoints should be accepted to build light curves for each
# transient?
t0_filter:
  unit: DecentFilterCopy
  config:
    min_ndet: 2
    min_tspan: 0
    max_tspan: 5
    ...
# T2: how should the collected photopoints and light curves be augmented?
t2_compute:
  - unit: T2SNCosmo
    config:
      model: salt2
      upper_limits: false
  # config can be omitted if the unit has defaults
  - unit: T2ExamplePolyFit
  - unit: T2CatalogMatch
    # A named configuration, defined in alias/t2. Names that start with "%" are
    # global, other names are local to the project
    config: '%T2CatalogMatch_general'
# T3: what should I do with the collected data?
t3_supervise:
  # A minimal T3: select all data for transients modified since last run
  # the optional parameters `name`, `load`, `filter`, and `complement` are set
  # to sensible defaults.
  - template: ztf_periodic_summary
    schedule: every().day.at('15:00')
    run:
      unit: DemoT3Unit
  # More settings: load only transient and T2 records for transients modified
  # since last run where `sncosmo` color parameter is > 1
  - name: set_all_the_things
    template: ztf_periodic_summary
    schedule: every(4).hours
    load:
      - TRANSIENT
      - T2RECORD
    filter:
      t2:
        unit: T2SNCosmo
        match:
          fit_results.c:
            $gt: 1
    run:
      unit: DemoT3Unit
Some operations that were previously embedded in T3 units, like filtering TransientView in AbsT3Unit.add, now have their own dedicated stages. This makes it possible to reuse these stages without writing new code.
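The match clause in a T3 filter reads like a MongoDB-style query on the T2 result. As a rough illustration of what a condition such as fit_results.c with $gt: 1 expresses, here is a toy evaluator. The functions lookup and matches and the sample records are hypothetical, and this is not how Ampel evaluates filters internally.

```python
import operator
from typing import Any, Callable, Dict, Mapping

# Only the comparison operators needed for this illustration
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {
    "$gt": operator.gt,
    "$gte": operator.ge,
    "$lt": operator.lt,
    "$lte": operator.le,
}


def lookup(doc: Mapping[str, Any], dotted_key: str) -> Any:
    """Follow a dotted key such as "fit_results.c" through nested mappings."""
    value: Any = doc
    for part in dotted_key.split("."):
        value = value[part]
    return value


def matches(doc: Mapping[str, Any], match: Mapping[str, Mapping[str, Any]]) -> bool:
    """Return True if every condition in `match` holds for `doc`."""
    for dotted_key, conditions in match.items():
        try:
            value = lookup(doc, dotted_key)
        except (KeyError, TypeError):
            # A missing key counts as "no match" in this sketch
            return False
        if not all(OPERATORS[op](value, ref) for op, ref in conditions.items()):
            return False
    return True


match = {"fit_results.c": {"$gt": 1}}
red = {"fit_results": {"c": 1.3}}    # hypothetical T2 result
blue = {"fit_results": {"c": 0.2}}   # hypothetical T2 result
```

With these sample records, only `red` satisfies the condition, so only that transient would be passed on to the unit in this toy model.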
Standalone T3 processes¶
Just like in v0.6, T3 processes embedded in a channel definition implicitly select only transients associated with that channel. To consume transients from multiple channels, you have to define a standalone T3 process. These definitions also use templates, however, so they can be quite compact:
name: TNSCompleteSummary
tier: 3
# every 60 minutes, consume all transients that were updated since the
# previous run in channels HU_GP_10 or HU_GP_59
template: ztf_periodic_summary
schedule: every(60).minutes
channel:
  any_of:
    - HU_GP_10
    - HU_GP_59
# load the stock, t0, and t2 records associated with the transient (and channel)
load:
  - TRANSIENT
  - DATAPOINT
  - T2RECORD
# for each selected transient, look up the TNS name
complement: TNSNames
# and pass to TNSTalker
run:
  unit: TNSTalker
  config:
    # a Secret item, kept separate from the rest of the config
    tns_api_key:
      key: tns/jnordin
    submit_tns: true
    sandbox: false
    max_age: 30
    needed_catalogs: []
T0 units¶
T0 unit configuration¶
T0 units need to be registered in your project’s config.
All units in v0.7 use type annotations and pydantic to define and validate their configuration. This means that if you previously used a nested RunConfig class to define a configuration, you can move its fields up to the parent class, and access them as attributes from instances. In other words, the following v0.6 filter definition:
import logging
from pydantic import BaseModel
from ampel.base.abstract.AbsAlertFilter import AbsAlertFilter

class AwesomeFilter(AbsAlertFilter):

    class RunConfig(BaseModel):
        """
        Necessary class to validate configuration.
        """
        MIN_NDET: int  # number of previous detections
        MIN_TSPAN: float  # minimum duration of alert detection history [days]
        MAX_TSPAN: float  # maximum duration of alert detection history [days]

    def __init__(self, on_match_t2_units, base_config=None, run_config=None, logger=None):
        if run_config is None:
            raise ValueError("Please check your run configuration")
        self.on_match_t2_units = on_match_t2_units
        self.logger = logger if logger is not None else logging.getLogger()
        # parse the run config
        rc_dict = run_config.dict()
        # ----- set filter properties ----- #
        # history
        self.min_ndet = rc_dict['MIN_NDET']
        self.min_tspan = rc_dict['MIN_TSPAN']
        self.max_tspan = rc_dict['MAX_TSPAN']
shrinks down to:
from pydantic import Field
from ampel.alert.PhotoAlert import PhotoAlert
from ampel.abstract.AbsAlertFilter import AbsAlertFilter

class AwesomeFilter(AbsAlertFilter[PhotoAlert]):
    min_ndet: int  #: number of previous detections
    min_tspan: float  #: minimum duration of alert detection history [days]
    max_tspan: float  #: maximum duration of alert detection history [days]

    def post_init(self):
        ...
You no longer have to define an __init__(); the default __init__() will set self.min_ndet and raise an exception if required fields are not set or set with invalid values. If you need to do any custom setup, however, you can define a post_init() that will be called within the base class __init__(). A few other things to note:
All instances of AbsAlertFilter have a self.logger property. You do not have to set one up yourself.
AbsAlertFilter[PhotoAlert] indicates that the apply() method expects a PhotoAlert. Instances of PhotoAlert have separate photopoints and upper limits. If you omit the parameter to AbsAlertFilter in your class definition, your apply() method will receive the base class, AmpelAlert, instead. Instances of AmpelAlert only have one collection of datapoints.
The annotations are used to build a pydantic model that validates the configuration parameters defined for the instance in e.g. a channel definition. This means that you can use Field() to impose jsonschema-style constraints on the field value, for example requiring an integer to be positive, or a list to have a specified number of items.
The #: comment marker indicates that the following is interface documentation, and should be included in autogenerated docs. Normal comments that start with just # are ignored.
Field names should be lower-case by convention (min_ndet rather than MIN_NDET).
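The pattern described above (annotated class attributes, a generated __init__(), and a post_init() hook) can be mimicked in a few lines of plain Python. The sketch below is only a toy model of the idea: ToyUnit and ToyFilter are hypothetical names, the type check handles only plain classes such as int and float, and the real base classes rely on pydantic rather than on this code.

```python
from typing import Any, get_type_hints


class ToyUnit:
    """Hypothetical stand-in for a unit base class; not Ampel's implementation."""

    def __init__(self, **config: Any) -> None:
        hints = get_type_hints(type(self))
        unknown = set(config) - set(hints)
        if unknown:
            raise ValueError(f"unknown fields: {sorted(unknown)}")
        for name, expected in hints.items():
            if name in config:
                value = config[name]
            elif hasattr(type(self), name):
                value = getattr(type(self), name)  # class-level default
            else:
                raise ValueError(f"missing required field: {name}")
            # Simplistic check; pydantic validation is far more thorough
            if not isinstance(value, expected):
                raise TypeError(f"{name} must be of type {expected.__name__}")
            setattr(self, name, value)
        # Hook for custom setup, called once all fields are set
        self.post_init()

    def post_init(self) -> None:
        pass


class ToyFilter(ToyUnit):
    min_ndet: int            #: required, no default
    max_tspan: float = 5.0   #: optional, in days

    def post_init(self) -> None:
        # Derived quantities can be computed here instead of in __init__()
        self.max_tspan_hours = self.max_tspan * 24


toy = ToyFilter(min_ndet=2)
```

The subclass only declares its fields and, optionally, post_init(); everything else is inherited, which is the main reason the v0.7 filter definition is so much shorter than its v0.6 counterpart.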
Base classes also exist to automate the configuration of e.g. catalog matching services. For example, if you were previously setting up catsHTM matching like this:
from pydantic import BaseModel
from ampel.base.abstract.AbsAlertFilter import AbsAlertFilter
from ampel.contrib.hu import catshtm_server

class GaiaVetoFilter(AbsAlertFilter):
    resources = ('catsHTM.default',)

    def __init__(self, on_match_t2_units, base_config=None, run_config=None, logger=None):
        catshtm_uri = base_config["catsHTM.default"]
        self.catshtm = catshtm_server.get_client(catshtm_uri)
you can inherit from CatsHTMUnit and simplify to:
from ampel.alert.PhotoAlert import PhotoAlert
from ampel.abstract.AbsAlertFilter import AbsAlertFilter
from ampel.contrib.hu.base.CatsHTMUnit import CatsHTMUnit

class GaiaVetoFilter(CatsHTMUnit, AbsAlertFilter[PhotoAlert]):
    ...
ExtCatsUnit is the equivalent for extcats.
Filtering¶
PhotoAlert is mostly a drop-in replacement for the v0.6 AmpelAlert. There are important differences, however:
get_values() uses native field names instead of the internal aliases from v0.6. Use jd instead of obs_date, magpsf instead of mag, etc.
The third argument to get_values() is now a string rather than a bool. Where you formerly used get_values(..., upper_limits=True) to get values from upper limits, use get_values(..., data="uls"). To get both detections and upper limits, use get_values(..., data="all").
AbsAlertFilter.apply may now return a bool or an int.
If you previously returned self.on_match_t2_units to accept an alert and trigger all configured T2s, return True instead.
If you previously returned False or None to reject an alert, you may continue to do so. You may also return an integer “rejection code” between -255 and -1. You can define these codes however you like, and use them to efficiently query the properties of rejected alerts after the fact.
If you previously returned a subset of self.on_match_t2_units depending on the exact properties of the alert, return a positive integer instead. This will be interpreted as the id of the group of T2s to run.
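The three kinds of return value can be sketched with a stand-alone function. Everything here is hypothetical: the function apply_sketch takes a plain list of magnitudes rather than an alert object, and the rejection codes, thresholds, and T2 group id are invented for illustration.

```python
from enum import IntEnum
from typing import Sequence, Union


class Rejection(IntEnum):
    # Hypothetical rejection codes, chosen from the allowed range -255..-1
    TOO_FEW_DETECTIONS = -1
    TOO_FAINT = -2


# Hypothetical id of a T2 group reserved for bright transients.
# A value other than 1 avoids confusion with True, since True == 1 in Python.
BRIGHT_GROUP = 2


def apply_sketch(
    magpsf: Sequence[float],
    min_ndet: int = 2,
    max_mag: float = 19.5,
    bright_mag: float = 17.0,
) -> Union[bool, int]:
    """Mimic the return conventions of a v0.7 filter on a list of magnitudes."""
    if len(magpsf) < min_ndet:
        return Rejection.TOO_FEW_DETECTIONS  # reject, with a reason code
    if min(magpsf) > max_mag:
        return Rejection.TOO_FAINT           # reject, with a different code
    if min(magpsf) < bright_mag:
        return BRIGHT_GROUP                  # accept, run one group of T2s
    return True                              # accept, run all configured T2s
```

Naming the rejection codes in an IntEnum keeps them readable in the filter while still producing plain integers that can be queried later.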
T2 units¶
T2 units need to be registered in your project’s config.
New base classes¶
There are now 3 different kinds of T2 unit. If your T2 does something other than a light curve analysis, it may be a better fit for one of the new ones:
AbsLightCurveT2Unit operates on entire light curves, and runs every time a new photopoint or upper limit is added to a transient. This is equivalent to the old AbsT2Unit, but can be configured to operate on all photopoints, or on detections only.
AbsPointT2Unit operates on single data points. It can be configured to run on a subset of photopoints, e.g. to run catalog matching on only the first detection.
AbsStockT2Unit operates on the stock (transient) record itself. This can be used to perform some action when the transient is added to a channel.
There are also “tied” variants of these, such as AbsTiedLightCurveT2Unit, that can be used to build a directed acyclic graph of T2s. In other words, these T2s depend on the output of other T2s.
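To illustrate what a directed acyclic graph of T2s implies for execution order, the following sketch uses the standard-library graphlib module (Python 3.9+). The unit names T2TiedClassifier and T2TiedRanker are hypothetical, and the code says nothing about how Ampel actually schedules tied units; it only shows that dependencies must be computed first.

```python
from graphlib import TopologicalSorter

# Map each unit to the set of units whose output it needs.
# T2TiedClassifier and T2TiedRanker are hypothetical tied units.
dependencies = {
    "T2CatalogMatch": set(),
    "T2SNCosmo": set(),
    "T2TiedClassifier": {"T2CatalogMatch", "T2SNCosmo"},
    "T2TiedRanker": {"T2TiedClassifier"},
}

# static_order() yields every unit after all of its dependencies
order = list(TopologicalSorter(dependencies).static_order())
```

Because the graph must be acyclic, a circular dependency between tied units would make such an ordering impossible; TopologicalSorter reports that case by raising graphlib.CycleError.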
T2 unit configuration¶
Like T0 units, T2 units declare their configurations using annotations. See T0 unit configuration.
run()¶
T2 units now have a single configuration, so the run() method no longer takes a run_config argument. If your run() method previously looked like this [contrived] example:
def run(self, light_curve, run_config):
    count = len(light_curve.get_values("jd", upper_limits=False))
    if run_config["include_upper_limits"]:
        count += len(light_curve.get_values("jd", upper_limits=True))
    return {"length": count}
it should be replaced with:
def run(self, light_curve: LightCurve) -> T2UnitResult:
    count = len(light_curve.get_values("jd", of_upper_limits=False))
    # include_upper_limits is now a field declared on the unit itself
    if self.include_upper_limits:
        count += len(light_curve.get_values("jd", of_upper_limits=True))
    return {"length": count}
The PEP 484 annotations in the method signature are optional but highly encouraged. If these type hints are present, static type checkers like mypy will be able to spot mistakes like returning the wrong type, calling a method that does not exist or with the wrong arguments, etc.
T3 units¶
T3 units need to be registered in your project’s config.
T3 unit configuration¶
If your T3 authenticates with an external service like Slack or DropBox using a secret token, you should not check this token into your repository. Slack in particular scans all commits to public GitHub repositories and revokes any of its tokens it finds there. Instead, you can use the special Secret type hint to indicate that the value should be looked up in a separate secret store. For example, if you previously had:
from pydantic import BaseModel, BaseConfig
from ampel.base.abstract.AbsT3Unit import AbsT3Unit

class LooseLipsSinkShips(AbsT3Unit):

    class RunConfig(BaseModel):
        slack_token: str = "xoxb-216058338329-819573451732-Rjxt1zb9WpjhVZ6H6Y3ZUuHo"

    def __init__(self, logger, base_config=None, run_config=None, global_info=None):
        self.run_config = self.RunConfig() if run_config is None else run_config

    def add(self, views):
        token = self.run_config["slack_token"]
        ...
you should have this:
from typing import Dict, Tuple
from ampel.abstract.AbsT3Unit import AbsT3Unit
from ampel.model.Secret import Secret
from ampel.struct.JournalExtra import JournalExtra
from ampel.type import StockId
from ampel.view.TransientView import TransientView

class Skrytnost(AbsT3Unit):
    slack_token: Secret[str] = {"key": "my-slack-token"}  # type: ignore[assignment]

    def add(self, transients: Tuple[TransientView, ...]) -> Dict[StockId, JournalExtra]:
        token = self.slack_token.get()
        ...
Again, all type annotations in method signatures (and the associated imports) are optional, but encouraged. The default value of slack_token tells Ampel to look up the value under the name “my-slack-token” in its secret store. Your T3 instance will be configured with an object whose get() method returns the value (of the type indicated in [], or str if unspecified). This mechanism allows you to specify which token you want by default as a symbolic name rather than a value. The trailing comment instructs mypy to not complain about the assignment.
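The idea of configuring a unit with a symbolic name and resolving the value only when get() is called can be sketched as follows. ToySecret and the in-memory store are hypothetical stand-ins; this is not the implementation of ampel.model.Secret or of Ampel's secret store.

```python
from typing import Dict, Generic, TypeVar

T = TypeVar("T")


class ToySecret(Generic[T]):
    """Hypothetical stand-in: holds a key and resolves it on demand."""

    def __init__(self, key: str, store: Dict[str, T]) -> None:
        self.key = key
        self._store = store

    def get(self) -> T:
        # The value is looked up only when it is actually needed
        return self._store[self.key]

    def __repr__(self) -> str:
        # Show only the symbolic name, never the value itself
        return f"ToySecret(key={self.key!r})"


# A stand-in secret store; a real one would live outside the repository
store = {"my-slack-token": "not-a-real-token"}
slack_token = ToySecret("my-slack-token", store)
```

Since only the key appears in the configuration and in the object's repr, the token itself does not end up in the repository or in log output in this sketch.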
add()¶
Return a Dict[StockId, JournalExtra] instead of a list of JournalUpdate. For example, if you were previously doing something like this:
jupdates = []
for tran_view in views:
    jcontent = {'t3unit': self.name, 'reactDict': do_something(tran_view), 'success': success}
    jupdates.append(JournalUpdate(tran_id=tran_view.tran_id, ext=self.run_config.ext_journal, content=jcontent))
return jupdates
you can replace that with:
jupdates = {}
for tran_view in views:
    jupdates[tran_view.id] = JournalExtra(extra={'reactDict': do_something(tran_view), 'success': success})
return jupdates
For current ZTF transients, the ZTF name is the first element of the stock name, e.g.:
transient_view.stock["name"][0]
To be extra-pedantic (and pass all mypy checks), use:
assert view.stock and view.stock["name"] is not None
name = next(
    n for n in view.stock["name"]
    if isinstance(n, str) and n.startswith("ZTF")
)
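The same lookup can be tried out without Ampel installed. The helper ztf_name and the sample names below are hypothetical; unlike the snippet above, this variant returns None instead of raising StopIteration when no ZTF name is present.

```python
from typing import Optional, Sequence, Union


def ztf_name(names: Optional[Sequence[Union[int, str]]]) -> Optional[str]:
    """Return the first name starting with "ZTF", or None if there is none."""
    if names is None:
        return None
    return next(
        (n for n in names if isinstance(n, str) and n.startswith("ZTF")),
        None,  # default, so that next() does not raise StopIteration
    )


# Hypothetical stock names, for illustration only
found = ztf_name(["ZTF20aaaaaaa", 12345])
missing = ztf_name([12345, "other-name"])
```

Whether to raise or to return None when the name is absent depends on the unit; returning None is convenient when some transients may come from other sources.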