Source code for ampel.log.AmpelLogger

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File:                Ampel-core/ampel/log/AmpelLogger.py
# License:             BSD-3-Clause
# Author:              valery brinnel <firstname.lastname@gmail.com>
# Date:                27.09.2018
# Last Modified Date:  20.04.2022
# Last Modified By:    Jakob van Santen <jakob.van.santen@desy.de>

import logging, sys, traceback
from sys import _getframe
from os.path import basename
from typing import Any, TYPE_CHECKING
from ampel.types import ChannelId
from ampel.log.LightLogRecord import LightLogRecord
from ampel.log.LogFlag import LogFlag
from ampel.protocol.LoggingHandlerProtocol import LoggingHandlerProtocol, AggregatingLoggingHandlerProtocol
from ampel.log.handlers.AmpelStreamHandler import AmpelStreamHandler

if TYPE_CHECKING:
	from ampel.mongo.update.var.DBLoggingHandler import DBLoggingHandler

# Module-level aliases of the LogFlag severity members.
# AmpelLogger compares handler/logger levels against these names directly.
ERROR = LogFlag.ERROR
WARNING = LogFlag.WARNING
SHOUT = LogFlag.SHOUT
INFO = LogFlag.INFO
VERBOSE = LogFlag.VERBOSE
DEBUG = LogFlag.DEBUG

if TYPE_CHECKING:
	from ampel.core.AmpelContext import AmpelContext

class AmpelLogger:
	"""
	Logger that wraps messages into :class:`LightLogRecord` instances
	and dispatches them to a list of logging handlers.

	Instances can be created directly or obtained through :meth:`get_logger`,
	which keeps them in the class-level registry :attr:`loggers`.
	"""

	# Registry of loggers created through get_logger(), keyed by logger name
	loggers: dict[int | str, 'AmpelLogger'] = {}

	# Last auto-generated (integer) logger name
	_counter: int = 0

	# Verbosity hint derived from the logger level:
	# 0 if level >= INFO, 1 if level < INFO, 2 if level < VERBOSE
	verbose: int = 0


	@classmethod
	def get_logger(cls, name: None | int | str = None, force_refresh: bool = False, **kwargs) -> 'AmpelLogger':
		"""
		Creates or returns an instance of :obj:`AmpelLogger <ampel.log.AmpelLogger>`
		that is registered in static dict 'loggers' using the provided name as key.
		If a logger with the given name already exists, the existing logger instance is returned.
		If name is None (or otherwise falsy), a unique (int) name will be generated.

		:param name: registry key of the logger
		:param force_refresh: replace an already registered logger by a new instance
		:param ``**kwargs``: passed to constructor

		Typical use:

		.. sourcecode:: python

			logger = AmpelLogger.get_logger()
		"""

		if not name:
			cls._counter += 1
			name = cls._counter

		if name not in AmpelLogger.loggers or force_refresh:
			AmpelLogger.loggers[name] = AmpelLogger(name=name, **kwargs)

		return AmpelLogger.loggers[name]


	@staticmethod
	def from_profile(context: 'AmpelContext', profile: str, run_id: None | int = None, **kwargs) -> 'AmpelLogger':
		"""
		Builds a logger whose handlers are defined by the config entry 'logging.<profile>'.

		:param context: provides the config (and the db if a 'db' handler is configured)
		:param profile: name of the logging profile to look up
		:param run_id: required if the profile defines a 'db' handler
		:param ``**kwargs``: passed to :meth:`get_logger`
		:raises ValueError: if the profile requires a db handler and run_id is None
		"""

		handlers = context.config.get(f'logging.{profile}', dict, raise_exc=True)
		needs_db = "db" in handlers

		# Validate before get_logger() so that a failing call
		# does not leave an orphan logger in the registry
		if needs_db and run_id is None:
			raise ValueError("Parameter 'run_id' is required when log_profile requires db logging handler")

		logger = AmpelLogger.get_logger(console=False, **kwargs)

		if needs_db:
			# avoid circular import
			from ampel.mongo.update.var.DBLoggingHandler import DBLoggingHandler
			logger.addHandler(
				DBLoggingHandler(context.db, run_id, **handlers['db'])
			)

		if "console" in handlers:
			logger.addHandler(
				AmpelStreamHandler(**handlers['console'])
			)

		return logger


	@staticmethod
	def get_console_level(context: 'AmpelContext', profile: str) -> None | int:
		"""
		:returns: the 'level' configured for the console handler of the given profile,
		  INFO (as int) if the console handler defines no level,
		  None if the profile defines no console handler.
		"""

		handlers = context.config.get(f'logging.{profile}', dict, raise_exc=True)

		if "console" in handlers:
			if 'level' in handlers['console']:
				return handlers['console']['level']
			return LogFlag.INFO.__int__()

		return None


	@classmethod
	def has_verbose_console(cls, context: 'AmpelContext', profile: str) -> bool:
		"""
		:returns: True if the profile's console level is set (truthy) and lower than INFO
		"""

		if lvl := cls.get_console_level(context, profile):
			return lvl < INFO

		return False


	def __init__(self,
		name: int | str = 0,
		base_flag: None | LogFlag = None,
		handlers: None | list[LoggingHandlerProtocol | AggregatingLoggingHandlerProtocol] = None,
		channel: None | ChannelId | list[ChannelId] = None,
		# See AmpelStreamHandler annotations for more details
		console: None | bool | dict[str, Any] = True
	) -> None:
		"""
		:param name: logger name, forwarded to each created log record
		:param base_flag: flag OR-ed into the level of every record
		:param handlers: initial logging handlers
		:param channel: default channel(s), see :meth:`log`
		:param console: True adds a default AmpelStreamHandler, a dict is used
		  as constructor arguments of AmpelStreamHandler, a falsy value adds none
		"""

		self.name = name
		self.base_flag = base_flag.__int__() if base_flag else 0
		self.handlers = handlers or []
		self.channel = channel
		self.level = 0

		# Used by log() to detect calls coming from the wrappers defined in this file
		self.fname = _getframe().f_code.co_filename

		# Must exist before log() is called; addHandler() below
		# switches it on if the stream handler requests provenance
		self.provenance = False

		if console:
			self.addHandler(
				AmpelStreamHandler() if console is True else
				AmpelStreamHandler(**console) # type: ignore
			)

		self._auto_level()


	def _auto_level(self) -> None:
		""" Recomputes level (lowest handler level, 0 without handlers) and verbose """

		self.level = min((h.level for h in self.handlers), default=0)

		if self.level < INFO:
			self.verbose = 2 if self.level < VERBOSE else 1
		elif self.verbose != 0:
			self.verbose = 0


	def addHandler(self, handler: LoggingHandlerProtocol) -> None:
		"""
		Registers a handler. The logger level is only ever lowered here
		(never raised); :meth:`_auto_level` recomputes it from scratch.
		"""

		if handler.level < self.level:
			self.level = handler.level

		if isinstance(handler, AmpelStreamHandler) and handler.provenance:
			self.provenance = True

		if self.level < INFO:
			self.verbose = 2 if self.level < VERBOSE else 1

		self.handlers.append(handler)


	def removeHandler(self, handler: LoggingHandlerProtocol) -> None:
		""" Unregisters a handler and recomputes the logger level """
		self.handlers.remove(handler)
		self._auto_level()


	def get_db_logging_handler(self) -> 'None | DBLoggingHandler':
		""" :returns: the first registered DBLoggingHandler, None if there is none """

		# avoid circular import
		from ampel.mongo.update.var.DBLoggingHandler import DBLoggingHandler

		for el in self.handlers:
			if isinstance(el, DBLoggingHandler):
				return el

		return None


	def break_aggregation(self) -> None:
		""" Calls break_aggregation() on every aggregating or stream handler """
		for el in self.handlers:
			if isinstance(el, (AggregatingLoggingHandlerProtocol, AmpelStreamHandler)):
				el.break_aggregation()


	def error(self,
		msg: str | dict[str, Any],
		*args,
		exc_info: None | Exception = None,
		extra: None | dict[str, Any] = None,
	) -> None:
		""" Logs with level ERROR (no logger-level check is performed) """
		self.log(ERROR, msg, *args, exc_info=exc_info, extra=extra)


	def warn(self,
		msg: str | dict[str, Any],
		*args,
		extra: None | dict[str, Any] = None,
	) -> None:
		""" Logs with level WARNING if the logger level allows it """
		if self.level <= WARNING:
			self.log(WARNING, msg, *args, extra=extra)


	def info(self,
		msg: None | str | dict[str, Any],
		*args,
		extra: None | dict[str, Any] = None,
	) -> None:
		""" Logs with level INFO if the logger level allows it """
		if self.level <= INFO:
			self.log(INFO, msg, *args, extra=extra)


	def debug(self,
		msg: None | str | dict[str, Any],
		*args,
		extra: None | dict[str, Any] = None,
	) -> None:
		""" Logs with level DEBUG if the logger level allows it """
		if self.level <= DEBUG:
			self.log(DEBUG, msg, *args, extra=extra)


	def handle(self, record: LightLogRecord | logging.LogRecord) -> None:
		""" Forwards an existing record to every handler whose level admits it """
		for h in self.handlers:
			if record.levelno >= h.level:
				h.handle(record)


	def flush(self) -> None:
		""" Flushes all handlers """
		for h in self.handlers:
			h.flush()


	def log(self,
		lvl: int,
		msg: None | str | dict[str, Any],
		*args,
		exc_info: None | bool | Exception = None,
		extra: None | dict[str, Any] = None,
	) -> None:
		"""
		Creates a log record and passes it to the handlers.

		:param lvl: record level, OR-ed with the logger's base_flag
		:param msg: message; if it is a str and args are given, ``msg % args`` is applied
		:param exc_info: True/1 (exception currently being handled),
		  a ``sys.exc_info()``-style tuple or an exception instance.
		  If set, the formatted traceback is emitted line by line to all
		  handlers (regardless of their level), followed by the record itself.
		:param extra: optional dict. Keys 'stock', 'channel' and 'unit' are moved
		  to the corresponding record attributes (the logger's default channel is
		  used if 'channel' is missing), the remainder is set as record.extra.
		"""

		if args and isinstance(msg, str):
			msg = msg % args

		record = LightLogRecord(name=self.name, levelno=lvl | self.base_flag, msg=msg)

		if lvl > WARNING or self.provenance:
			frame = _getframe(1) # logger.log(...) was called directly
			if frame.f_code.co_filename == self.fname:
				frame = _getframe(2) # logger.info(...), logger.debug(...) was used
			record.__dict__['filename'] = basename(frame.f_code.co_filename)
			record.__dict__['lineno'] = frame.f_lineno

		if extra:
			# work on a copy: keys are popped below
			extra = dict(extra)
			if (stock := extra.pop("stock", None)):
				record.stock = stock
			if (channel := (extra.pop("channel", None) or self.channel)):
				record.channel = channel
			if (unit := (extra.pop("unit", None))):
				record.unit = unit
			record.extra = extra

		if exc_info:

			if isinstance(exc_info, tuple):
				# use the provided exception triple rather than
				# whatever exception happens to be handled right now
				lines = traceback.format_exception(*exc_info)
			elif isinstance(exc_info, BaseException):
				lines = traceback.format_exception(
					type(exc_info), exc_info, exc_info.__traceback__
				)
			elif exc_info == 1: # also matches exc_info=True
				lines = traceback.format_exception(*sys.exc_info())
			else:
				lines = []

			# Separator record preceding the traceback
			erec = AmpelLogger.fork_rec(record, "\n")
			for h in self.handlers:
				h.handle(erec)

			# One record per non-empty traceback line
			for el in lines:
				for l in el.split('\n'):
					if not l:
						continue
					erec = AmpelLogger.fork_rec(record, l)
					for h in self.handlers:
						h.handle(erec)

			# The actual message, underlined with dashes
			if record.msg:
				rec2 = AmpelLogger.fork_rec(record, "-" * len(record.msg))
				for h in self.handlers:
					h.handle(record)
					h.handle(rec2)

			return

		for h in self.handlers:
			if record.levelno >= h.level:
				h.handle(record)


	@staticmethod
	def fork_rec(orig: LightLogRecord, msg: str) -> LightLogRecord:
		"""
		:returns: a new record carrying a (shallow) copy of all attributes
		  of 'orig', except for msg which is set to the provided value
		"""
		new_rec = LightLogRecord(name=0, msg=None, levelno=0)
		new_rec.__dict__.update(orig.__dict__)
		new_rec.msg = msg
		return new_rec