Source code for ampel.t3.T3Processor

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File:                Ampel-core/ampel/t3/T3Processor.py
# License:             BSD-3-Clause
# Author:              valery brinnel <firstname.lastname@gmail.com>
# Date:                26.02.2018
# Last Modified Date:  03.04.2023
# Last Modified By:    valery brinnel <firstname.lastname@gmail.com>

from typing import Any, Annotated

from ampel.types import ChannelId
from ampel.abstract.AbsEventUnit import AbsEventUnit
from ampel.abstract.AbsT3Supplier import AbsT3Supplier
from ampel.abstract.AbsT3Stager import AbsT3Stager
from ampel.struct.T3Store import T3Store
from ampel.model.UnitModel import UnitModel
from ampel.model.t3.T3IncludeDirective import T3IncludeDirective
from ampel.core.EventHandler import EventHandler
from ampel.log import AmpelLogger, LogFlag, SHOUT


[docs] class T3Processor(AbsEventUnit): # Require single channel for now (super classes allow multi-channel) channel: None | ChannelId = None include: None | T3IncludeDirective #: Unit must be a subclass of AbsT3Supplier supply: Annotated[UnitModel, AbsT3Supplier] #: Unit must be a subclass of AbsT3Stager stage: Annotated[UnitModel, AbsT3Stager] def post_init(self): if self.supply.unit not in self.context.config._config['unit']: raise ValueError(f"Unknown supply unit: {self.supply.unit}") if self.stage.unit not in self.context.config._config['unit']: raise ValueError(f"Unknown stager unit: {self.stage.unit}") def proceed(self, event_hdlr: EventHandler) -> None: event_hdlr.set_tier(3) logger = AmpelLogger.from_profile( self.context, self.log_profile, event_hdlr.get_run_id(), base_flag = LogFlag.T3 | LogFlag.CORE | self.base_log_flag, force_refresh = True ) try: # Feedback logger.log(SHOUT, f'Running {self.process_name}') t3s = T3Store() if self.include: if self.include.docs: pass # later if (x := self.include.session): sdict: dict[str, Any] = {} for model in [x] if isinstance(x, UnitModel) else x: rd = self.context.loader.new_context_unit( model = model, context = self.context, sub_type = AbsT3Supplier, event_hdlr = event_hdlr, logger = logger ).supply(t3s) if rd: sdict |= rd if sdict: t3s.add_session_info(sdict) supplier = self.context.loader.new_context_unit( model = self.supply, context = self.context, sub_type = AbsT3Supplier, logger = logger, event_hdlr = event_hdlr ) # Stager unit ############# stager = self.context.loader.new_context_unit( model = self.stage, context = self.context, sub_type = AbsT3Stager, logger = logger, event_hdlr = event_hdlr, channel = ( self.stage.config['channel'] # type: ignore if self.stage.config and self.stage.config.get('channel') # type: ignore[union-attr] else self.channel ) ) logger.info("Running stager", extra={'unit': self.stage.unit}) if (doc_gen := stager.stage(supplier.supply(t3s), t3s)): for t3d in doc_gen: if 'meta' not in t3d: raise ValueError("Invalid T3Document") t3d['meta']['traceid'] = {'t3processor': self._trace_id} if event_hdlr.job_sig: t3d['meta']['jobid'] = event_hdlr.job_sig self.context.db.get_collection('t3').insert_one(t3d) # type: ignore[arg-type] """ if t3s.resources: for v in t3s.resources.values(): event_hdlr.add_resource(v, overwrite=self.allow_resource_override) if t3s.aliases: for k, v in t3s.aliases.items(): event_hdlr.add_alias(k, v, overwrite=self.allow_alias_override) """ except Exception as e: event_hdlr.handle_error(e, logger) # Feedback logger.log(SHOUT, f'Done running {self.process_name}') logger.flush()