Source code for columnflow.selection

# coding: utf-8

"""
Object and event selection tools.
"""

from __future__ import annotations

import copy
import inspect

import law
import order as od

from columnflow.calibration import TaskArrayFunctionWithCalibratorRequirements
from columnflow.util import maybe_import, DotDict, DerivableMeta, UNSET
from columnflow.types import Callable, T, Sequence, UNSET_TYPE

ak = maybe_import("awkward")


class Selector(TaskArrayFunctionWithCalibratorRequirements):
    """
    Common base class of all selectors.

    Subclasses are usually not written by hand but created through the
    :py:meth:`selector` decorator, which attaches the decorated function as ``call_func``.
    """

    exposed = False

    # attributes that mirror the arguments accepted by the decorator
    mc_only: bool = False
    data_only: bool = False

    def __init__(self: Selector, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        # a selector that is not exposed and has no explicit call_force setting gets
        # call_force = True, which prevents its calls from being cached
        force_unspecified = self.call_force is None
        if force_unspecified and not self.exposed:
            self.call_force = True

    @classmethod
    def selector(
        cls,
        func: Callable | None = None,
        bases=(),
        mc_only: bool | UNSET_TYPE = UNSET,
        data_only: bool | UNSET_TYPE = UNSET,
        require_calibrators: Sequence[str] | set[str] | None | UNSET_TYPE = UNSET,
        **kwargs,
    ) -> DerivableMeta | Callable:
        """
        Decorator that derives a new :py:class:`~.Selector` subclass, optionally with additional
        *bases*, and registers the decorated function on it as ``call_func``.

        If *mc_only* (*data_only*) is *True*, the selector is skipped and not considered by other
        calibrators, selectors and producers when they are evaluated on a
        :py:class:`order.Dataset` (taken from the :py:attr:`dataset_inst` attribute) whose
        ``is_mc`` (``is_data``) attribute is *False*. Setting both flags, or combining either flag
        with a custom ``skip_func``, raises an exception during class derivation.

        All additional *kwargs* become class members of the new subclass. A ``cls_name`` entry, if
        given, is used as the name of the subclass instead of the name of *func*.

        :param func: Function to be wrapped and integrated into the new :py:class:`Selector`
            class. When omitted, the actual decorator is returned.
        :param bases: Additional bases for the new :py:class:`Selector`.
        :param mc_only: Boolean flag indicating that this :py:class:`Selector` should only run on
            Monte Carlo simulation and be skipped for real data.
        :param data_only: Boolean flag indicating that this :py:class:`Selector` should only run
            on real data and be skipped for Monte Carlo simulation.
        :param require_calibrators: Sequence of names of calibrators to add to the requirements.
        :return: New :py:class:`Selector` subclass, or the decorator creating it when *func* is
            not given.
        """
        def decorator(func: Callable) -> DerivableMeta:
            # assemble the members of the new class, the decorated function always wins
            members = {**kwargs, "call_func": func}

            # only forward decorator arguments that were explicitly given
            explicit = (
                ("mc_only", mc_only),
                ("data_only", data_only),
                ("require_calibrators", require_calibrators),
            )
            for attr, value in explicit:
                if value is not UNSET:
                    members[attr] = value

            # the module is looked up from the frame directly above this function,
            # so this call must stay at exactly this depth
            module = inspect.getmodule(inspect.stack()[1][0])

            # name of the new selector, defaulting to the name of the decorated function
            name = members.pop("cls_name", func.__name__)

            # hook to update the class dict during class derivation
            def update_cls_dict(cls_name, cls_dict, get_attr):
                only_mc = get_attr("mc_only")
                only_data = get_attr("data_only")

                # the two flags are mutually exclusive
                if only_mc and only_data:
                    raise Exception(f"selector {cls_name} received both mc_only and data_only")

                # a custom skip function cannot be combined with either flag
                if (only_mc or only_data) and cls_dict.get("skip_func"):
                    raise Exception(
                        f"selector {cls_name} received custom skip_func, but either mc_only or data_only are set",
                    )

                # nothing to add when a skip function is already part of the class dict
                if "skip_func" in cls_dict:
                    return cls_dict

                def skip_func(self, **kwargs) -> bool:
                    # at most one flag can be set here, so each one fully decides the outcome;
                    # the dataset is only accessed when a flag is actually set
                    if only_mc:
                        return not self.dataset_inst.is_mc
                    if only_data:
                        return not self.dataset_inst.is_data

                    # without any flag, never skip
                    return False

                cls_dict["skip_func"] = skip_func

                return cls_dict

            members["update_cls_dict"] = update_cls_dict

            # derive and return the subclass
            return cls.derive(name, bases=bases, cls_dict=members, module=module)

        # support both the bare and the parametrized decorator form
        if func:
            return decorator(func)
        return decorator
# shorthand selector = Selector.selector
class SelectionResult(od.AuxDataMixin):
    """
    Lightweight class that wraps selection decisions (e.g. event and object selection steps).

    Additionally, this class provides convenience methods to merge them or to dump them into an
    awkward array. Arbitrary, auxiliary information (additional arrays, or other objects) that
    should not be stored in dumped awkward arrays can be placed in the *aux* dictionary (see
    :py:class:`~order.mixins.AuxDataMixin`).

    The resulting structure looks like the following example:

    .. code-block:: python

        results = {
            # boolean selection mask for events
            "event": selected_events_mask,

            "steps": {
                # event selection decisions from certain steps
                "jet": array_of_event_masks,
                "muon": array_of_event_masks,
                ...,
            },

            "objects": {
                # object selection decisions or indices
                "Jet": {
                    "jet": array_of_jet_indices,
                    "bjet": array_of_bjet_indices,
                },
                "Muon": {
                    "muon": array_of_muon_indices,
                },
            },

            # additionally, you can also save auxiliary data, e.g.
            "aux": {
                # save the per-object jet selection masks
                "jet": array_of_jet_object_masks,
                # save number of jets
                "n_passed_jets": ak.num(array_of_jet_indices, axis=1),
                ...,
            },

            # other arbitrary top-level fields
            ...
        }

    Specific fields can be configured through *event*, *steps*, *objects* and *aux* keyword
    arguments. All additional keyword arguments are stored as top-level fields.

    The following example creates the structure above.

    .. code-block:: python

        # event selection decisions from certain steps
        steps = {
            "jet": array_of_event_masks,
            "muon": array_of_event_masks,
            ...
        }

        # combined event selection after all steps
        event_sel = reduce(and_, steps.values())

        res = SelectionResult(
            event=event_sel,
            steps=steps,
            # nested mappings of source collections to target collections with different indices
            objects={
                # collections to be created from the initial "Jet" collection: "jet" and "bjet"
                # define name of new field and provide indices of the corresponding objects
                "Jet": {
                    "jet": array_of_jet_indices,
                    "bjet": list_of_bjet_indices,
                },
                # collections to be created from the initial "Muon" collection: "muon"
                "Muon": {
                    "muon": array_of_selected_muon_indices,
                },
            },
            # others
            ...
        )
        res.to_ak()
    """

    def __init__(
        self: SelectionResult,
        event: ak.Array | None = None,
        steps: DotDict | dict | None = None,
        objects: DotDict | dict | None = None,
        aux: DotDict | dict | None = None,
        **other,
    ) -> None:
        super().__init__(aux=aux)

        # store fields, wrapping mappings so that they support attribute access
        self.event = event
        self.steps = DotDict.wrap(steps or {})
        self.objects = DotDict.wrap(objects or {})
        self.other = DotDict.wrap(other)

    def __iadd__(self: SelectionResult, other: SelectionResult | None) -> SelectionResult:
        """
        Adds the fields of an *other* instance in-place. When *None*, *this* instance is returned
        unchanged.

        Event masks are combined with a logical AND, *steps*, *other* and *aux* fields are updated
        on their top level, and *objects* are merged deeply.

        :param other: Instance of :py:class:`~.SelectionResult` to be added to current instance.
        :raises TypeError: If *other* is not a :py:class:`~.SelectionResult` instance.
        :return: This instance.
        """
        # do nothing if the other instance is none
        if other is None:
            return self

        # type check
        if not isinstance(other, SelectionResult):
            raise TypeError(f"cannot add '{other}' to {self.__class__.__name__} instance")

        # helper to copy a structure, re-wrapping awkward arrays without behavior
        # and deep-copying all other objects
        def deepcopy_without_behavior(struct: T) -> T:
            return law.util.map_struct(
                (lambda obj: (
                    ak.Array(obj, behavior={})
                    if isinstance(obj, ak.Array)
                    else copy.deepcopy(obj)
                )),
                struct,
                map_list=True,
                map_tuple=True,
                map_dict=True,
            )

        # logical AND between event masks
        if self.event is None:
            self.event = deepcopy_without_behavior(other.event)
        elif other.event is not None:
            self.event = self.event & other.event

        # update steps in-place
        self.steps.update(deepcopy_without_behavior(other.steps))

        # use deep merging for objects
        law.util.merge_dicts(
            self.objects,
            deepcopy_without_behavior(other.objects),
            inplace=True,
            deep=True,
        )

        # update other fields in-place
        self.other.update(deepcopy_without_behavior(other.other))

        # shallow update for aux
        self.aux.update(deepcopy_without_behavior(other.aux))

        return self

    def __add__(self: SelectionResult, other: SelectionResult | None) -> SelectionResult:
        """
        Returns a new instance with all fields of *this* and an *other* instance merged. When
        *None*, a copy of *this* instance is returned.

        :param other: Instance of :py:class:`~.SelectionResult` to be added to current instance.
        :raises TypeError: If *other* is not a :py:class:`~.SelectionResult` instance.
        :return: Copy of this instance after the "add" operation.
        """
        # start from an empty instance so that neither operand is modified
        inst = self.__class__()

        # add this instance
        inst += self

        # add the other instance if not none
        if other is not None:
            if not isinstance(other, SelectionResult):
                raise TypeError(f"cannot add '{other}' to {self.__class__.__name__} instance")
            inst += other

        return inst

    def to_ak(self: SelectionResult) -> ak.Array:
        """
        Converts the contained fields into a nested awkward array and returns it. The conversion
        is performed with multiple calls of :external+ak:py:func:`ak.zip`.

        :raises ValueError: If the main events mask contains a type other than bool.
        :raises KeyError: If the additional top-level fields in :py:attr:`other` have a field
            "event", "steps" or "objects" that might overwrite existing special fields.
        :return: :py:class:`~.SelectionResult` transformed into an awkward array.
        """
        # complain if the event mask consists of non-boolean values
        if self.event is not None and getattr(ak.type(self.event).content, "primitive", None) != "bool":
            # note: both fragments need the f prefix, it is not inherited across
            # implicitly concatenated literals
            raise ValueError(
                f"{self.__class__.__name__} event mask must be of type N * bool, "
                f"but got {ak.type(self.event)}",
            )

        # prepare objects to merge, skipping special fields that are not set
        to_merge = {}
        if self.event is not None:
            to_merge["event"] = self.event
        if self.steps:
            to_merge["steps"] = ak.zip(self.steps)
        if self.objects:
            to_merge["objects"] = ak.zip({
                src_name: ak.zip(dst_dict, depth_limit=1)  # limit due to ragged axis 1
                for src_name, dst_dict in self.objects.items()
            })

        # add other fields but verify they do not overwrite existing fields
        for key in self.other:
            if key in to_merge:
                raise KeyError(
                    f"additional top-level field '{key}' of {self.__class__.__name__} conflicts "
                    f"with existing special field '{key}'",
                )
        to_merge.update(self.other)

        return ak.zip(to_merge)