#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: Ampel-core/ampel/core/AmpelRegister.py
# License: BSD-3-Clause
# Author: valery brinnel <firstname.lastname@gmail.com>
# Date: 16.05.2020
# Last Modified Date: 04.03.2021
# Last Modified By: valery brinnel <firstname.lastname@gmail.com>
import bson
from time import time
from os.path import isdir, isfile
from pathlib import Path
from struct import calcsize
from typing import BinaryIO, Literal, Any, TypedDict
from ampel.log.AmpelLogger import AmpelLogger, VERBOSE
from ampel.base.AmpelUnit import AmpelUnit
from ampel.util.hash import build_unsafe_dict_id
from ampel.util.register import read_header, write_header, \
get_inner_file_handle, get_outer_file_handle, rescale_header
# Header information of a register file, as handled by `load` / `close`
# (presumably the structure returned by ampel.util.register.read_header — verify there).
HeaderInfo = TypedDict(
    'HeaderInfo',
    {
        # number of bytes reserved for the header block (passed as `hsize` to write_header)
        'size': int,
        # length of the encoded header
        'len': int,
        # decoded (BSON) header content
        'payload': dict[str, Any],
    },
)
class AmpelRegister(AmpelUnit):
    """ # noqa: E101
    General notes:
    --------------
    An ampel register file is made of three parts: signature, header and content

    1) The file signature is encoded in the first 11 bytes.
       The signature contains the header's length and size (3 bytes each)

    2) A BSON encoded dict is saved in the next x bytes, referred to as the file's header.
       The header can be updated independently of the register's content.
       The size of the header is customizable during file creation.

    3) The register's content is usually a compressed structure (gzip, bzip2 or xz)
       which stores information in binary format. Compression can be turned off if so wished.
       (The size of a register containing 10 million alert ids [8 bytes] and filter
       return codes [1 byte] can be reduced from ~90MB to ~50MB using gzip with default settings).

    Properties:
    -----------
    - Registers can be re-opened and appended
    - Header content can be accessed or updated independently of the register's content.
    - Header updates are fast if enough space was reserved for updates in the first place.
    - Header size can be increased afterwards at the cost of having to rewrite the entire file once.
      This happens automatically when needed.
    - Logging read and write access to the register content into the register header is supported
    - Registers can be capped (by default based on the number of blocks, see `file_cap`;
      sub-classes can override `check_rename` to implement other criteria).
      Once the limit is reached, the full register is renamed and a new one is created.

    Note: the module ampel.util.register contains tools to manually change ampel registers, such as:
    `get_header_content`, `open_file_and_write_header` and `rescale_header`

    Target file:
    ------------
    :param path_base: the base folder path where to create/read register files.
    :param path_extra: an optional additional folder path (or paths) to be appended to `path_base`.
    :param path_full: when provided, `path_base` and `path_extra` are ignored and this value is used
      as the target folder: the file name returned by `get_file_name` is appended to it.
    :param file_handle: if a file handle to a register file is provided,
      it will be used and all path options will be ignored.
    :param file_cap: registers can be capped, typically based on a maximum number of blocks.
      When access to a full register is requested, the original file is renamed and a new register is created.
      During file renames, an index is appended (suffix) to the original file name.
      The current (newest) register file always comes without index suffix.
      This index is saved into the current register header.

      Example::

        ampel_register.bin.gz (current - newest)
        ampel_register.bin.gz.1 (first renamed register - oldest)
        ampel_register.bin.gz.2
        ampel_register.bin.gz.3 (second newest)

      Note 1: file renames occur during the opening procedure of registers, no check is performed
      during the filling of registers once a register is opened. A register can thus grow beyond
      the defined limit as long as a process keeps it open.

    Header:
    -------
    Limitations:
    - The maximum space reservable for the header is 2**24 bytes, i.e ~16MB.
    - integers must fit into a signed 64-bit integer (i.e. be smaller than 2**63). Should you need
      to save bigger numbers, please use the methods bindata_to_int, int_to_bindata from module ampel.util.bson

    :param new_header_size: either None, or an integer or a string (see details below).
      - None or 0: the header block size will equal the header encoded length. Choose this option if
        the header is not meant to be updated later. Otherwise, updates will only be possible if the header size
        does not grow (note that there is a margin allowed since headers exceeding the limit are automatically
        compressed using zlib and written to disk if the size condition is then fulfilled).
      - integer number: the header will be allocated the specified number of bytes (for example 4096).
      - a string: refers to a 'header margin' and must start with the character '+'.
        This option can save space in some circumstances. The space allocated for the header will equal
        the length of the initial header (including all provided options such as `header_extra`) to which
        the specified header margin will be added. For example, '+1024' means that 1024 bytes will be
        allocated additionally to the initial header length for future updates. If the initial header length
        is 100 bytes, then a header block of 1124 bytes will be created.

    :param header_extra: any extra to be included in the header under the key 'extra' (must be bson encodable)
    :param header_extra_base: any extra to be included in the header at root depth (must be bson encodable)
    :param header_update_anyway: if no update to the register is made, the default setting is that the
      header is not updated. This setting forces header updates. For example, you might want
      to save all run ids into the header whether or not they changed the content of the register.
    :param header_log_accesses: if True, timestamps will be recorded each time the register is opened/closed
      along with the amount of new blocks appended to the register.
      Note 1: parameter `new_header_size` must be set when using this value.
      Note 2: see docstring of parameter `header_update_anyway` that affects the behavior of this parameter.
      In the following example, `header_log_accesses` is responsible for creating/updating the key 'updated'::

        "ts": {
            "created": 1590506868.3880599,
            "updated": [
                [1590506868.3880599, 1590509029.389, 1200],
                [1590507152.079873, 1590507295.080478, 2300]
            ]
        }

    Errors:
    -------
    :raises: ValueError, FileNotFoundError
      - if none of `file_handle`, `path_full` or `path_base` is provided.
      - if read access to a non-existing file is requested
      - if the target file is not a register file (existing empty files are ok)
      - if the 'struct' parameter of a sub-class of this class differs from the value registered
        in the file's header (this behavior can be deactivated with parameter `on_exist_check`)
      - during file renames (in case capped registers are used) if the target file already exists
    """

    #: struct format (python `struct` module syntax) of one register block; must be defined by
    #: sub-classes. calcsize(struct) converts the number of written bytes into blocks.
    struct: str
    #: 0: quiet, 1: verbose messages, >1: additional debug messages
    verbose: int = 0
    logger: AmpelLogger

    #: save files in <path_base>/<file>
    path_base: None | str
    #: save files in <path_base>/<path_extra(s)>/<file>
    path_extra: None | list[str]
    #: prefix for each file
    file_prefix: None | str
    #: ignore path_base/path_extra and place the register file in this folder (see get_file_path)
    path_full: None | str
    #: rotate the register once header['blocks'] >= file_cap['blocks'] (see check_rename)
    file_cap: None | dict[Literal['blocks'], int]
    #: use existing file handle
    file_handle: None | BinaryIO
    #: compression scheme (None: no compression)
    compression: None | Literal['gz', 'bz2', 'xz'] = 'gz'
    #: compression level
    compression_level: None | int

    # General header options
    #: reserved header size, see class docstring
    new_header_size: None | int | str
    #: record open/close timestamps and appended block counts in header['ts']['updated']
    header_log_accesses: bool = False
    #: keep a running count of written blocks in header['blocks']
    header_count_blocks: bool = True
    #: stored in the header under the key 'extra'
    header_extra: None | dict[str, Any]
    #: stored in the header at root depth
    header_extra_base: None | dict[str, Any]
    #: update the header on close even if no content was appended
    header_update_anyway: bool = False

    # New header options
    header_creation_size: None | int

    # Which header key to check if file already exists
    on_exist_check: None | list[str | tuple[str, str]] = ['struct']
    on_exist_strict_check: bool = False


    def __init__(self, autoload: bool = True, **kwargs) -> None:
        """
        See class docstring.

        :param autoload: if True (default), `load` is called right away,
            i.e. the register file is opened during construction.
        :raises ValueError: if the (sub-)class does not define `struct`, or if
            `header_log_accesses` is requested without `new_header_size`.
        """
        super().__init__(**kwargs)

        if not hasattr(self, 'struct'):
            raise ValueError("Sub-classes of AmpelRegister must define static field 'struct'")

        if self.header_log_accesses and not self.new_header_size:
            raise ValueError("Parameter 'new_header_size' is required when using 'header_log_accesses'")

        if autoload:
            self.load()


    def load(self) -> None:
        """
        Open the register file, load (or create) its header and open the content handle.

        - The outer file handle is either `file_handle` or one opened on `get_file_path()`.
        - If a header exists, it is optionally checked (`on_exist_check`). The file is then
          either rotated (cap reached, see `check_rename`) or its header is adopted as `self.header`.
        - If no header exists (new/empty file, or file just rotated), a new one is generated and written.
        - The inner (content) handle is opened for writing. A signature of the header payload
          is stored so that `close` can detect header changes.

        :raises ValueError: if no file handle could be obtained for the computed file path
        """
        if self.file_handle:
            hinfo = read_header(self.file_handle, self.logger if self.verbose > 1 else None)
            self._outer_fh = self.file_handle
        else:
            f_path = self.get_file_path()
            hinfo, self._outer_fh = get_outer_file_handle(
                f_path, write=True, logger=self.logger if self.verbose > 1 else None
            )
            # Checked in this branch only: f_path is not defined when file_handle is used
            if self._outer_fh is None:
                raise ValueError(f"{f_path}: cannot get file handle")

        # File exists (and is not empty)
        if hinfo:

            if self.verbose > 1:
                self.logger.debug("Header loaded", extra=hinfo['payload'])

            if self.on_exist_check:
                self.check_header(hinfo['payload'])

            if self.file_cap and self.check_rename(hinfo['payload']):
                self._outer_fh = self.rename_file(self._outer_fh, hinfo['payload'])
                # Fresh file after rotation: a new header is generated below
                hinfo = None
            else:
                self.header = hinfo

                # Update file access
                if self.header_log_accesses:
                    self.register_file_access()

                # Hook for sub-classes
                self.onload_update_header()

        if hinfo is None:
            if self.verbose > 1:
                self.logger.debug("Generating new header")
            header_bytes = self.gen_new_header()
            write_header(
                self._outer_fh, header=header_bytes, hsize=self.header['size'],
                logger=self.logger if self.verbose else None
            )

        self._inner_fh = get_inner_file_handle(
            self._outer_fh, write=True,
            logger=self.logger if self.verbose > 0 else None
        )

        # Non-compressed file returns the EOF position when opened in mode 'ab'
        # compressed file return 0
        if self.compression is None:
            self._ftell = self._inner_fh.tell()

        self.header_sig = build_unsafe_dict_id(self.header['payload'])


    def check_rename(self, header: dict[str, Any]) -> bool:
        """
        Decide whether the existing register must be rotated (renamed) before use.

        Default implementation: True if header['blocks'] >= file_cap['blocks'].
        Override if needed (e.g. to support other `file_cap` criteria).

        :param header: payload of the existing register header
        :returns: True if the register should be renamed and a new one created
        """
        if not self.file_cap:
            return False

        if 'blocks' in self.file_cap:
            if header['blocks'] >= self.file_cap['blocks']:
                return True
            if self.verbose > 1:
                self.logger.debug("File rotation trigger not reached")
            return False

        self.logger.warn(f"Unknown 'file_cap' value: {self.file_cap}")
        return False


    def rename_file(self, fh: BinaryIO, header: dict[str, Any]) -> BinaryIO:
        """
        Rotate a full register.

        Closes `fh`, renames its file to ``<name>.<index>`` and returns a new handle
        opened on the original (now free) path.

        The index is ``header['findex'] + 1`` (1 if 'findex' is absent) and is stored in
        `self.file_index` (NOTE(review): presumably written into the new header by gen_new_header).

        :raises ValueError: if the rename target file already exists
        """
        fh.close()
        self.file_index = header.get('findex', 0) + 1

        from os import rename
        target_file_path = f"{fh.name}.{self.file_index}"

        # we might handle this rather than raising an error in the future
        if isfile(target_file_path):
            raise ValueError(f"File rotation failure: {target_file_path} already exists")

        rename(fh.name, target_file_path)
        if self.verbose > 0:
            self.logger.info(f"Current register renamed into {target_file_path}")

        return get_outer_file_handle(
            fh.name, write=True, logger=self.logger if self.verbose > 1 else None
        )[1]


    def get_file_path(self) -> str:
        """
        Build the path of the register file, creating missing parent folders.

        The folder is `path_full` when set, otherwise `path_base` (+ `path_extra` sub-folders).
        The file name is provided by `get_file_name`.

        NOTE(review): the name `path_full` suggests a complete file path, but it is used here
        as a folder to which the file name is appended. Confirm the intended semantics.

        :raises ValueError: if neither `path_full` nor `path_base` is set
        :raises OSError: if sub-directories cannot be created
        """
        if self.path_full:
            outdir = self.path_full
        else:
            if not self.path_base:
                raise ValueError("Parameter path_base is not set")
            outdir = self.path_base
            if self.path_extra:
                outdir += '/' + '/'.join(self.path_extra)

        if not isdir(outdir):
            # exist_ok: another process may create the folder between the check and mkdir
            Path(outdir).mkdir(parents=True, exist_ok=True)

        return f"{outdir}/{self.get_file_name()}"


    def get_file_name(self) -> str:
        """
        Return the register file name.

        The name has the form ``<file_prefix or 'ampel_register'>.bin[.<compression>]``,
        for example ``ampel_register.bin.gz``. Override if needed.
        """
        return '.'.join([
            self.file_prefix or 'ampel_register',
            f'bin.{self.compression}' if self.compression else 'bin'
        ])


    def register_file_access(self,
        header: None | dict[str, Any] = None,
        use_this_time: None | float = None,
        new_blocks: None | int = None,
    ) -> None:
        """
        Record an open/close event in header['ts']['updated'].

        Entries have the form ``[opened_ts, closed_ts, new_blocks]``:
        - register opened (``new_blocks is None``): ``[now, 0., 0]`` is appended
        - register closed: closing time and block count of the last entry are filled in

        :param header: use provided header rather than self.header['payload']
        :param use_this_time: use provided time rather than time.time()
        :param new_blocks: None (default) when register is opened, 0 or an integer when register is closed
        :raises KeyError: if the header lacks the expected 'ts' structure
        """
        # 'is not None' so that explicit falsy values (e.g. a timestamp of 0.0) are honored
        hdr = header if header is not None else self.header['payload']
        now = use_this_time if use_this_time is not None else time()

        # File opened
        if new_blocks is None:
            if 'updated' in hdr['ts']:
                hdr['ts']['updated'].append([now, 0., 0])
            else: # new file
                hdr['ts']['updated'] = [[now, 0., 0]]
            return

        # File closed: complete the entry created when the file was opened
        last = hdr['ts']['updated'][-1]
        last[1] = now
        last[2] = new_blocks


    def __del__(self):
        """ Close the register (and update its header if needed) when the instance is destroyed """
        if getattr(self, '_outer_fh', None):
            if self.verbose > 1:
                self.logger.debug("Destroying ampel register instance")
            self.close()


    def close(self, close_outer_fh: bool = True, update_header: bool = True) -> None:
        """
        Flush/close the content handle, update the header if needed, then close the file.

        The number of bytes appended since `load` is converted into blocks (calcsize(self.struct)).
        These blocks update header['blocks'] (`header_count_blocks`) and the access log
        (`header_log_accesses`). The header is only rewritten if content was appended or
        `header_update_anyway` is set. In that case, it is rewritten if blocks are counted,
        accesses are logged, or its payload changed since `load`. If write_header raises
        ValueError (header does not fit), the file is rewritten with a header block twice
        as large via `rescale_header`.

        Repeated calls are safe: the content handle is released by the first call, so a later
        call (e.g. `__del__` after close(close_outer_fh=False)) neither fails nor recounts blocks.

        :param close_outer_fh: whether principal file handle should be closed
        :param update_header: possible override of default settings
            (aimed for admins working with command line)
        """
        file_updated = 0

        # Important: zip file handle should be closed before header is updated
        inner_fh = getattr(self, '_inner_fh', None)
        if inner_fh is not None:
            inner_fh.flush()
            # Compressed handles start at position 0 when opened, uncompressed ones
            # at the former end of file (saved as self._ftell in load)
            if self.compression:
                file_updated = inner_fh.tell()
            else:
                file_updated = inner_fh.tell() - self._ftell

            # When no compression is used, inner_fh is the outer_fh
            if inner_fh is not self._outer_fh:
                if self.verbose > 1:
                    self.logger.debug("Closing inner file")
                inner_fh.close()

            # Released in every case so that a later close() call neither fails on
            # a closed handle nor counts the same bytes twice
            self._inner_fh = None  # type: ignore[assignment]

        if not getattr(self, '_outer_fh', None) or self._outer_fh.closed:
            self.logger.info("File handle already closed")
            return

        if self.verbose:
            # The handle's name is used instead of get_file_path(): the latter raises when only
            # `file_handle` was provided and may create directories as a side effect
            self.logger.log(VERBOSE, f"Closing {getattr(self._outer_fh, 'name', self._outer_fh)}")

        if hasattr(self, 'header') and update_header:

            if file_updated or self.header_update_anyway:

                # Block counting / access logging only for the call that released the content
                # handle: a repeated close() has nothing new to account for
                if inner_fh is not None and (self.header_count_blocks or self.header_log_accesses):
                    new_blocks = file_updated // calcsize(self.struct)
                    if self.header_count_blocks:
                        self.header['payload']['blocks'] += new_blocks
                    if self.header_log_accesses:
                        self.register_file_access(new_blocks=new_blocks)

                if (
                    self.header_log_accesses or self.header_count_blocks or
                    build_unsafe_dict_id(self.header['payload']) != self.header_sig
                ):
                    if self.verbose:
                        self.logger.log(VERBOSE, "Header has changed, triggering update")
                    try:
                        write_header(
                            self._outer_fh, header=self.header['payload'], hsize=self.header['size'],
                            flush=False, logger=self.logger if self.verbose else None
                        )
                    except ValueError:
                        # ValueError from write_header is treated as "header does not fit into the
                        # reserved space": the whole file is rewritten with a bigger header block
                        self.logger.warn("Header still too long, rescaling it")
                        self._outer_fh.flush()
                        self._outer_fh.close()
                        rescale_header(
                            self._outer_fh.name, new_size = self.header['size'] * 2, remove_old_file = True,
                            header = self.header['payload']
                        )
                        self._outer_fh = None  # type: ignore[assignment]

                elif self.verbose > 1:
                    self.logger.debug("Header was not updated")

            elif self.verbose > 1:
                self.logger.debug("File was not updated, skipping potential header updates")

        elif self.verbose > 1:
            self.logger.debug("Header update is disabled")

        if self._outer_fh:
            self._outer_fh.flush()

        if close_outer_fh:
            if self._outer_fh:
                self._outer_fh.close()
            self._outer_fh = None  # type: ignore[assignment]