# Source code for columnflow.ml

# coding: utf-8

"""
Definition of basic objects for describing and creating ML models.
"""

from __future__ import annotations

import abc
from collections import OrderedDict

import law
import order as od

from columnflow.columnar_util import Route
from columnflow.util import maybe_import, Derivable, DotDict, KeyValueMessage
from columnflow.types import TYPE_CHECKING, Any, Sequence

if TYPE_CHECKING:
    ak = maybe_import("awkward")


class MLModel(Derivable):
    """
    Minimal interface to ML models with connections to config objects (such as
    :py:class:`order.Config` or a :py:class:`order.Dataset`) and, on an optional basis, to tasks.

    Inheriting classes need to overwrite eight methods:

        - :py:meth:`sandbox`
        - :py:meth:`datasets`
        - :py:meth:`uses`
        - :py:meth:`produces`
        - :py:meth:`output`
        - :py:meth:`open_model`
        - :py:meth:`train`
        - :py:meth:`evaluate`

    See their documentation below for more info.

    There are several optional hooks that allow for a custom setup after config objects were
    assigned (:py:meth:`setup`), a fine-grained configuration of additional training requirements
    (:py:meth:`requires`), diverging training and evaluation phase spaces
    (:py:meth:`training_configs`, :py:meth:`training_calibrators`, :py:meth:`training_selector`,
    :py:meth:`training_producers`, :py:meth:`evaluation_producers`), or how hyper-parameters are
    string encoded for output declarations (:py:meth:`parameter_pairs`). The optional
    :py:meth:`preparation_producer` allows setting a producer that is run during the initial
    preparation of ML columns.

    .. py:classattribute:: single_config

        type: bool

        The default flag that marks whether this model only accepts a single config object in case
        no value is passed in the constructor. Converted into an instance attribute upon
        instantiation.

    .. py:classattribute:: folds

        type: int

        The default number of folds for the k-fold cross-validation in case no value is passed in
        the constructor. Converted into an instance attribute upon instantiation.

    .. py:classattribute:: store_name

        type: str, None

        The default name for storing input data in case no value is passed in the constructor. When
        *None*, the name of the model class is used instead. Converted into an instance attribute
        upon instantiation.

    .. py:attribute:: analysis_inst

        type: order.Analysis

        Reference to the :py:class:`order.Analysis` object.

    .. py:attribute:: parameters

        type: OrderedDict

        A dictionary mapping parameter names to arbitrary values, such as
        ``{"layers": 5, "units": 128}``.

    .. py:attribute:: used_datasets

        type: dict
        read-only

        Sets of :py:class:`order.Dataset` instances that are used by the model training, mapped to
        their corresponding :py:class:`order.Config` instances.

    .. py:attribute:: used_columns

        type: dict
        read-only

        Sets of column names or :py:class:`Route`'s that are used by this model, mapped to the
        :py:class:`order.Config` instances they belong to.

    .. py:attribute:: produced_columns

        type: dict
        read-only

        Sets of column names or :py:class:`Route`'s that are produced by this model, mapped to the
        :py:class:`order.Config` instances they belong to.
    """

    # default setting mark whether this model accepts only a single config
    single_config: bool = False

    # default number of folds
    folds: int = 2

    # default name for storing e.g. input data
    # falls back to cls_name if None
    store_name: str | None = None

    # flag denoting whether the preparation_producer is invoked before evaluate()
    preparation_producer_in_ml_evaluation: bool = True

    # names of attributes that are automatically extracted from init kwargs and
    # fall back to classmembers in case they are missing
    init_attributes: list[str] = [
        "single_config",
        "folds",
        "store_name",
        "preparation_producer_in_ml_evaluation",
    ]

    def __init__(
        self: MLModel,
        analysis_inst: od.Analysis,
        *,
        parameters: OrderedDict | None = None,
        **kwargs,
    ) -> None:
        """
        Creates a model bound to *analysis_inst* with optional hyper-*parameters*.

        Every name listed in :py:attr:`init_attributes` is looked up in *kwargs* and stored as an
        instance attribute, falling back to the class-level default when missing. When *kwargs*
        contains ``"configs"``, they are assigned right away and the :py:meth:`setup` hook is
        triggered.
        """
        super().__init__()

        # store attributes
        self.analysis_inst = analysis_inst
        self.parameters = OrderedDict(parameters or {})

        # set instance members based on registered init attributes
        for attr in self.init_attributes:
            # start from the class-level default
            value = getattr(self, attr)
            # prefer the value from kwargs; the sentinel is compared by identity so that values
            # with custom or element-wise comparison operators (e.g. arrays) are handled safely
            _value = kwargs.get(attr, law.no_value)
            if _value is not law.no_value:
                value = _value
            # set the instance-level attribute
            setattr(self, attr, value)

        # list of config instances
        self.config_insts = []
        if "configs" in kwargs:
            self._setup(kwargs["configs"])

    def __str__(self):
        """
        Returns a string representation of this model instance. The string is composed of the class
        name and, when parameters are set, the hashed representation of all parameters.
        """
        model_str = f"{self.cls_name}"
        if self.parameters_repr:
            model_str += f"__{self.parameters_repr}"
        return model_str

    @property
    def config_inst(self: MLModel) -> od.Config:
        """
        Returns the first entry of :py:attr:`config_insts`.

        :raises Exception: When :py:attr:`single_config` is enabled but the number of assigned
            config instances is not exactly one.
        """
        if self.single_config and len(self.config_insts) != 1:
            raise Exception(
                f"the config_inst property requires MLModel '{self.cls_name}' to have the "
                "single_config enabled to to contain exactly one config instance, but found "
                f"{len(self.config_insts)}",
            )

        # NOTE(review): with single_config disabled and no configs assigned, this raises a bare
        # IndexError; with several configs, the first one is returned silently
        return self.config_insts[0]

    def _assert_configs(self: MLModel, msg: str) -> None:
        """
        Raises an exception showing *msg* in case this model's :py:attr:`config_insts` is empty.
        """
        if not self.config_insts:
            raise Exception(f"MLModel '{self.cls_name}' has no config instances, {msg}")

    def _format_value(self: MLModel, value: Any) -> str:
        """
        Formats any parameter *value* to a readable string. Lists and tuples are formatted
        element-wise and joined by underscores, booleans are lower-cased, and floats below 0.01
        are written in scientific notation.
        """
        if isinstance(value, (list, tuple)):
            return "_".join(map(self._format_value, value))
        # bool must be checked on its own as str(True) would yield a capitalized "True"
        if isinstance(value, bool):
            return str(value).lower()
        if isinstance(value, float):
            # scientific notation when too small
            # NOTE(review): the comparison is not done on the absolute value, so negative numbers
            # and 0.0 use the scientific branch as well; kept as is since changing it would alter
            # the parameter hashes of existing models
            return f"{value}" if value >= 0.01 else f"{value:.2e}"
        # any other case
        return str(value)

    @property
    def parameters_repr(self: MLModel) -> str:
        """
        Returns a hash of the string representation of all significant parameters, or an empty
        string when no parameters are set. This is used to uniquely identify a model instance
        based on its parameters. The hash is remembered on first access.

        :raises Exception: In case the hash differs from the one computed on an earlier access.
        :returns: Hashed string representation of all parameters.
        """
        if not self.parameters:
            return ""

        parameters_repr = law.util.create_hash(self._join_parameter_pairs(only_significant=True))

        # the representation is used in output declarations and must remain stable
        if hasattr(self, "_parameters_repr") and self._parameters_repr != parameters_repr:
            raise Exception(
                f"parameters_repr changed from {self._parameters_repr} to {parameters_repr};"
                "this should not happen",
            )

        self._parameters_repr = parameters_repr
        return self._parameters_repr

    def _join_parameter_pairs(self: MLModel, only_significant: bool = True) -> str:
        """
        Returns a joined string representation of the pairs returned by :py:meth:`parameter_pairs`,
        with *only_significant* being forwarded to it. In this context, significant parameters are
        those that potentially lead to different results (e.g. network architecture parameters as
        opposed to some log level).
        """
        return "__".join(
            f"{name}_{self._format_value(value)}"
            for name, value in self.parameter_pairs(only_significant=only_significant)
        )

    def parameter_pairs(self: MLModel, only_significant: bool = False) -> list[tuple[str, Any]]:
        """
        Returns a sorted list of all parameter name-value tuples. In this context, significant
        parameters are those that potentially lead to different results (e.g. network architecture
        parameters as opposed to some log level). This default implementation does not evaluate
        *only_significant* and always returns all parameters.
        """
        return sorted(self.parameters.items())

    @property
    def accepts_scheduler_messages(self: MLModel) -> bool:
        """
        Whether the training or evaluation loop expects and works with messages sent from a central
        luigi scheduler through the active worker to the underlying task. See
        :py:meth:`get_scheduler_messages` for more info.
        """
        return True

    def get_scheduler_messages(self: MLModel, task: law.Task) -> DotDict[str, KeyValueMessage]:
        """
        Checks if the *task* obtained messages from a central luigi scheduler, parses them expecting
        key - value pairs, and returns them in an ordered :py:class:`DotDict`. All values are
        :py:class:`KeyValueMessage` objects (with ``key``, ``value`` and ``respond()`` members).

        Scheduler messages are only sent while the task is actively running, so it most likely only
        makes sense to expect and react to messages during training and evaluation loops.

        :param task: Task instance whose message queue is drained.
        :returns: Parsed messages, mapped to their keys. Messages that could not be parsed are
            dropped, and later messages replace earlier ones with the same key.
        """
        messages = DotDict()

        if task.accepts_messages and task.scheduler_messages:
            # the queue is owned by the task, not by the model
            while not task.scheduler_messages.empty():
                msg = KeyValueMessage.from_message(task.scheduler_messages.get())
                if msg:
                    messages[msg.key] = msg

        return messages

    def _set_configs(self: MLModel, configs: list[str | od.Config]) -> None:
        """
        Replaces the content of :py:attr:`config_insts` by *configs*. Entries that are not
        :py:class:`order.Config` instances are looked up in the :py:attr:`analysis_inst`.

        :raises Exception: When :py:attr:`single_config` is enabled but more than one config is
            passed.
        """
        # complain when only a single config is accepted
        if self.single_config and len(configs) > 1:
            raise Exception(
                f"MLModel '{self.cls_name}' only accepts a single config but received "
                f"{len(configs)}: {','.join(map(str, configs))}",
            )

        # remove existing config instances in-place so that the list object itself is preserved
        del self.config_insts[:]

        # add them one by one
        for config in configs:
            config_inst = (
                config
                if isinstance(config, od.Config)
                else self.analysis_inst.get_config(config)
            )
            self.config_insts.append(config_inst)

    def _setup(self: MLModel, configs: list[str | od.Config] | None = None) -> None:
        """
        Assigns *configs*, if any are given, and invokes the :py:meth:`setup` hook afterwards.
        """
        # setup configs
        if configs:
            self._set_configs(configs)

        # setup hook
        self.setup()

    @property
    def used_columns(self: MLModel) -> dict[od.Config, set[Route]]:
        """
        Resolves the columns returned by :py:meth:`uses` for every assigned config.

        :raises Exception: When no config instances are assigned.
        :returns: Sets of :py:class:`Route` objects, mapped to config instances.
        """
        self._assert_configs("cannot determined used columns")
        return {
            config_inst: set(map(Route, self.uses(config_inst)))
            for config_inst in self.config_insts
        }

    @property
    def produced_columns(self: MLModel) -> dict[od.Config, set[Route]]:
        """
        Resolves the columns returned by :py:meth:`produces` for every assigned config.

        :raises Exception: When no config instances are assigned.
        :returns: Sets of :py:class:`Route` objects, mapped to config instances.
        """
        self._assert_configs("cannot determined produced columns")
        return {
            config_inst: set(map(Route, self.produces(config_inst)))
            for config_inst in self.config_insts
        }

    @property
    def used_datasets(self: MLModel) -> dict[od.Config, set[od.Dataset]]:
        """
        Resolves the datasets returned by :py:meth:`datasets` for every assigned config.

        :raises Exception: When no config instances are assigned.
        :returns: Sets of datasets, mapped to config instances.
        """
        self._assert_configs("cannot determined used datasets")
        return {
            config_inst: set(self.datasets(config_inst))
            for config_inst in self.config_insts
        }

    def setup(self: MLModel) -> None:
        """
        Hook that is called after the model has been setup and its :py:attr:`config_insts` were
        assigned.
        """
        return

    def requires(self: MLModel, task: law.Task) -> Any:
        """
        Returns tasks that are required for the training to run and whose outputs are needed.
        """
        return {}

    def training_configs(
        self: MLModel,
        requested_configs: Sequence[str],
    ) -> list[str]:
        """
        Given a sequence of names of requested :py:class:`order.Config` objects,
        *requested_configs*, this method can alter and/or replace them to define a different (set
        of) config(s) for the preprocessing and training pipeline. This can be helpful in cases
        where training and evaluation phase spaces, as well as the required input datasets and/or
        columns are intended to diverge.

        :returns: List of config names; by default a copy of *requested_configs*.
        """
        return list(requested_configs)

    def training_calibrators(
        self: MLModel,
        analysis_inst: od.Analysis,
        requested_calibrators: Sequence[str],
    ) -> list[str]:
        """
        Given a sequence of *requested_calibrators* for a *analysis_inst*, this method can alter
        and/or replace them to define a different set of calibrators for the preprocessing and
        training pipeline. This can be helpful in cases where training and evaluation phase spaces,
        as well as the required input columns are intended to diverge.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.training_calibrators

        :param analysis_inst: Analysis instance to extract the *requested_calibrators* from
        :returns: List with str of the calibrators; by default a copy of *requested_calibrators*
        """
        return list(requested_calibrators)

    def training_selector(
        self: MLModel,
        analysis_inst: od.Analysis,
        requested_selector: str,
    ) -> str:
        """
        Given a *requested_selector* for a *analysis_inst*, this method can change it to define a
        different selector for the preprocessing and training pipeline. This can be helpful in cases
        where training and evaluation phase spaces, as well as the required input columns are
        intended to diverge.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.training_selector

        :param analysis_inst: Analysis instance to extract the *requested_selector* from
        :returns: Name of the selector; by default the unchanged *requested_selector*
        """
        return requested_selector

    def training_producers(
        self: MLModel,
        analysis_inst: od.Analysis,
        requested_producers: Sequence[str],
    ) -> list[str]:
        """
        Given a sequence of *requested_producers* for a *analysis_inst*, this method can alter
        and/or replace them to define a different set of producers for the preprocessing and
        training pipeline. This can be helpful in cases where training and evaluation phase spaces,
        as well as the required input columns are intended to diverge.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.training_producers

        :param analysis_inst: Analysis instance to extract the *requested_producers* from
        :returns: List with str of the producers; by default a copy of *requested_producers*
        """
        return list(requested_producers)

    def evaluation_producers(
        self: MLModel,
        analysis_inst: od.Analysis,
        requested_producers: Sequence[str],
    ) -> list[str]:
        """
        Given a sequence of *requested_producers* for a *analysis_inst*, this method can alter
        and/or replace them to define a different set of producers for the evaluation phase of the
        ML pipeline. This can be helpful in cases where the producers in the evaluation phase and
        subsequent tasks are intended to diverge.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.evaluation_producers

        :param analysis_inst: Analysis instance to extract the *requested_producers* from
        :returns: List with str of the producers; by default a copy of *requested_producers*
        """
        return list(requested_producers)

    def preparation_producer(
        self: MLModel,
        analysis_inst: od.Analysis,
    ) -> str | None:
        """
        This method allows setting a producer that can be called as part of the preparation of the
        ML input columns given a *analysis_inst*.

        :param analysis_inst: :py:class:`~order.Analysis` object for which the producer should run.
        :return: Name of a :py:class:`Producer` class or *None*.
        """
        return None

    @abc.abstractmethod
    def sandbox(self: MLModel, task: law.Task) -> str:
        """
        Given a *task*, returns the name of a sandbox that is needed to perform model training and
        evaluation.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.sandbox

        :param task: Task instance for which the sandbox is requested
        :returns: path to the requested sandbox, optionally prefixed by the executing shell command
            with trailing :: as separator
        """
        return

    @abc.abstractmethod
    def datasets(self: MLModel, config_inst: od.Config) -> set[od.Dataset]:
        """
        Returns a set of all required datasets for a certain *config_inst*. To be implemented in
        subclasses.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.datasets

        :param config_inst: Config instance to extract the datasets from
        :returns: Set with :py:class:`~order.dataset.Dataset` instances
        """
        return

    @abc.abstractmethod
    def uses(self: MLModel, config_inst: od.Config) -> set[Route]:
        """
        Returns a set of all required columns for a certain *config_inst*. To be implemented in
        subclasses.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.uses

        :param config_inst: Config instance for which the required columns are requested
        :returns: Set with str of required columns
        """
        return

    @abc.abstractmethod
    def produces(self: MLModel, config_inst: od.Config) -> set[Route]:
        """
        Returns a set of all produced columns for a certain *config_inst*. To be implemented in
        subclasses.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.produces

        :param config_inst: Config instance for which the produced columns are requested
        :returns: Set with str of produced columns
        """
        return

    @abc.abstractmethod
    def output(self: MLModel, task: law.Task) -> Any:
        """
        Returns a structure of output targets. To be implemented in subclasses.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.output

        :param task: Task instance used to extract task related information
        :returns: Instance of :py:class:`~law.DirectoryTarget`, containing the path to directory.
        """
        return

    @abc.abstractmethod
    def open_model(self: MLModel, target: Any) -> Any:
        """
        Implements the opening of a trained model from *target* (corresponding to the structure
        returned by :py:meth:`output`). To be implemented in subclasses.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.open_model

        :param target: Instance of :py:class:`~law.DirectoryTarget`, contains path to directory
            holding the machine learning model.
        :returns: Machine learning model instance
        """
        return

    @abc.abstractmethod
    def train(
        self: MLModel,
        task: law.Task,
        input: Any,
        output: Any,
    ) -> None:
        """
        Performs the creation and training of a model, being passed a *task* and its *input* and
        *output*. To be implemented in subclasses.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.train

        :param task: Task instance used to extract task related information
        :param input: List of instances of :py:class:`~law.DirectoryTarget`, containing the paths
            of all required *input* files
        :param output: Instance of :py:class:`~law.DirectoryTarget`, contain path to *target*
            directory of the task
        :returns: None
        """
        return

    @abc.abstractmethod
    def evaluate(
        self: MLModel,
        task: law.Task,
        events: ak.Array,
        models: list[Any],
        fold_indices: ak.Array,
        events_used_in_training: bool = False,
    ) -> ak.Array:
        """
        Performs the model evaluation for a *task* on a chunk of *events* and returns them. The list
        of *models* corresponds to the number of folds generated by this model, and the already
        evaluated *fold_indices* for this event chunk that might be used depending on
        *events_used_in_training*. To be implemented in subclasses.

        Example usage:

        .. literalinclude:: ../../user_guide/examples/ml_code.py
            :language: python
            :pyobject: TestModel.evaluate

        :param task: Task instance used to extract task related information
        :param events: Awkward Array containing the events to evaluate
        :param models: List containing trained models
        :param fold_indices: Awkward Array containing the indices of the folds used for training
        :param events_used_in_training: Boolean flag to indicate if events were used during
            training
        :returns: Awkward array containing events with additional columns
        """
        return