# coding: utf-8
"""
Object and event selection tools.
"""
from __future__ import annotations
import copy
import inspect
import law
import order as od
from columnflow.calibration import TaskArrayFunctionWithCalibratorRequirements
from columnflow.util import maybe_import, DotDict, DerivableMeta, UNSET
from columnflow.types import Callable, T, Sequence, UNSET_TYPE
ak = maybe_import("awkward")
class Selector(TaskArrayFunctionWithCalibratorRequirements):
    """
    Common base class of all selectors.
    """

    exposed = False

    # class-level defaults of the flags that can be configured through the decorator
    mc_only: bool = False
    data_only: bool = False

    def __init__(self: Selector, *args, **kwargs) -> None:
        """
        Passes all *args* and *kwargs* to the parent constructor and afterwards enables
        *call_force* in case it was left unspecified (*None*) and the selector is not exposed.
        """
        super().__init__(*args, **kwargs)

        # an unspecified call_force on a non-exposed selector is switched on,
        # which prevents calls from being cached
        force_unspecified = self.call_force is None
        if force_unspecified and not self.exposed:
            self.call_force = True

    @classmethod
    def selector(
        cls,
        func: Callable | None = None,
        bases=(),
        mc_only: bool | UNSET_TYPE = UNSET,
        data_only: bool | UNSET_TYPE = UNSET,
        require_calibrators: Sequence[str] | set[str] | None | UNSET_TYPE = UNSET,
        **kwargs,
    ) -> DerivableMeta | Callable:
        """
        Decorator that derives a new :py:class:`~.Selector` subclass, optionally with additional *bases*, and
        registers the decorated function on it as ``call_func``. It can be used both directly (``@selector``) and
        with arguments (``@selector(...)``).

        A selector with *mc_only* (*data_only*) set to *True* obtains a ``skip_func`` that returns *True* when
        the ``is_mc`` (``is_data``) attribute of its :py:attr:`dataset_inst` is *False*. Setting both flags, or
        combining one of them with a custom ``skip_func``, raises an exception during class derivation.

        All additional *kwargs* become class members of the new subclass. A ``cls_name`` entry is not added as a
        member but used as the name of the subclass, which otherwise defaults to the name of the decorated
        function.

        :param func: Function to be wrapped and integrated into new :py:class:`Selector` class.
        :param bases: Additional bases for the new :py:class:`Selector`.
        :param mc_only: Boolean flag indicating that this :py:class:`Selector` should only run on Monte Carlo
            simulation and skipped for real data.
        :param data_only: Boolean flag indicating that this :py:class:`Selector` should only run on real data and
            skipped for Monte Carlo simulation.
        :param require_calibrators: Sequence of names of calibrators to add to the requirements.
        :return: New :py:class:`Selector` subclass, or the actual decorator when *func* is not given.
        """
        def decorator(func: Callable) -> DerivableMeta:
            # determine the module of the frame that invoked this decorator
            # NOTE(review): in the direct "@selector" form, decorator() is invoked from selector() itself,
            # so the frame at index 1 belongs to this file rather than to the decorating module - confirm
            # that this is intended
            caller = inspect.stack()[1]
            module = inspect.getmodule(caller.frame)

            # assemble the members of the new class, the decorated function always wins over kwargs
            members = {**kwargs, "call_func": func}

            # forward only those flags that were explicitly configured
            configured = (
                ("mc_only", mc_only),
                ("data_only", data_only),
                ("require_calibrators", require_calibrators),
            )
            for member_name, member_value in configured:
                if member_value is not UNSET:
                    members[member_name] = member_value

            # the class name is either given explicitly or taken from the function
            name = members.pop("cls_name", func.__name__)

            # hook to update the class dict during class derivation
            def update_cls_dict(cls_name, cls_dict, get_attr):
                only_mc = get_attr("mc_only")
                only_data = get_attr("data_only")

                # the two flags exclude each other
                if only_mc and only_data:
                    raise Exception(f"selector {cls_name} received both mc_only and data_only")

                # a custom skip function cannot be combined with either flag
                has_flag = only_mc or only_data
                if has_flag and cls_dict.get("skip_func"):
                    raise Exception(
                        f"selector {cls_name} received custom skip_func, but either mc_only or data_only are set",
                    )

                # nothing to add when a skip_func entry already exists
                if "skip_func" in cls_dict:
                    return cls_dict

                def skip_func(self, **kwargs) -> bool:
                    # skip when the dataset type does not match the configured flag,
                    # dataset_inst is only accessed when a flag is actually set
                    return bool(
                        (only_mc and not self.dataset_inst.is_mc) or
                        (only_data and not self.dataset_inst.is_data),
                    )

                cls_dict["skip_func"] = skip_func

                return cls_dict

            members["update_cls_dict"] = update_cls_dict

            # create and return the subclass
            return cls.derive(name, bases=bases, cls_dict=members, module=module)

        # direct usage: decorate right away
        if func:
            return decorator(func)

        # usage with arguments: hand back the decorator itself
        return decorator
# module-level shorthand that makes the decorator usable as ``@selector``
# instead of ``@Selector.selector``
selector = Selector.selector
class SelectionResult(od.AuxDataMixin):
    """
    Lightweight class that wraps selection decisions (e.g. event and object selection steps).

    Additionally, this class provides convenience methods to merge them or to dump them into an
    awkward array. Arbitrary, auxiliary information (additional arrays, or other objects) that
    should not be stored in dumped awkward arrays can be placed in the *aux* dictionary (see
    :py:class:`~order.mixins.AuxDataMixin`).

    The resulting structure looks like the following example:

    .. code-block:: python

        results = {
            # boolean selection mask for events
            "event": selected_events_mask,

            "steps": {
                # event selection decisions from certain steps
                "jet": array_of_event_masks,
                "muon": array_of_event_masks,
                ...,
            },

            "objects": {
                # object selection decisions or indices
                "Jet": {
                    "jet": array_of_jet_indices,
                    "bjet": array_of_bjet_indices,
                },
                "Muon": {
                    "muon": array_of_muon_indices,
                },
            },

            # additionally, you can also save auxiliary data, e.g.
            "aux": {
                # save the per-object jet selection masks
                "jet": array_of_jet_object_masks,
                # save number of jets
                "n_passed_jets": ak.num(array_of_jet_indices, axis=1),
                ...,
            },

            # other arbitrary top-level fields
            ...
        }

    Specific fields can be configured through *event*, *steps*, *objects* and *aux* keyword
    arguments. All additional keyword arguments are stored as top-level fields.

    The following example creates the structure above.

    .. code-block:: python

        # combined event selection after all steps
        event_sel = reduce(and_, results.steps.values())

        res = SelectionResult(
            event=event_sel,
            steps={
                "jet": array_of_event_masks,
                "muon": array_of_event_masks,
                ...
            },
            # nested mappings of source collections to target collections with different indices
            objects={
                # collections to be created from the initial "Jet" collection: "jet" and "bjet"
                # define name of new field and provide indices of the corresponding objects
                "Jet": {
                    "jet": array_of_jet_indices,
                    "bjet": list_of_bjet_indices,
                },
                # collections to be created from the initial "Muon" collection: "muon"
                "Muon": {
                    "muon": array_of_selected_muon_indices,
                },
            },
            # others
            ...
        )
        res.to_ak()
    """

    def __init__(
        self: SelectionResult,
        event: ak.Array | None = None,
        steps: DotDict | dict | None = None,
        objects: DotDict | dict | None = None,
        aux: DotDict | dict | None = None,
        **other,
    ) -> None:
        """
        Stores the selection fields on the instance.

        :param event: Event selection mask, or *None* if not (yet) defined.
        :param steps: Mapping of step names to event masks. Wrapped into a :py:class:`DotDict`.
        :param objects: Nested mapping of source collection names to mappings of target collection names
            to object selection decisions or indices. Wrapped into a :py:class:`DotDict`.
        :param aux: Auxiliary data, forwarded to the :py:class:`~order.mixins.AuxDataMixin` constructor.
        :param other: Arbitrary additional top-level fields. Wrapped into a :py:class:`DotDict`.
        """
        super().__init__(aux=aux)

        # store fields
        self.event = event
        self.steps = DotDict.wrap(steps or {})
        self.objects = DotDict.wrap(objects or {})
        self.other = DotDict.wrap(other)

    def __iadd__(self: SelectionResult, other: SelectionResult | None) -> SelectionResult:
        """
        Adds the fields of an *other* instance in-place.

        Event masks are combined with a logical AND, *steps*, *other* and *aux* are updated on the first
        level, and *objects* are merged deeply. When *other* is *None*, *this* instance is returned unchanged.

        :param other: Instance of :py:class:`~.SelectionResult` to be added to current instance.
        :raises TypeError: If *other* is not a :py:class:`~.SelectionResult` instance.
        :return: This instance.
        """
        # do nothing if the other instance is none
        if other is None:
            return self

        # type check
        if not isinstance(other, SelectionResult):
            raise TypeError(f"cannot add '{other}' to {self.__class__.__name__} instance")

        # helper that copies a structure, re-wrapping awkward arrays with empty behavior and
        # deep-copying all other objects
        def deepcopy_without_behavior(struct: T) -> T:
            return law.util.map_struct(
                (lambda obj: (
                    ak.Array(obj, behavior={})
                    if isinstance(obj, ak.Array)
                    else copy.deepcopy(obj)
                )),
                struct,
                map_list=True,
                map_tuple=True,
                map_dict=True,
            )

        # logical AND between event masks, a missing mask on either side leaves the other one untouched
        if self.event is None:
            self.event = deepcopy_without_behavior(other.event)
        elif other.event is not None:
            self.event = self.event & other.event

        # update steps in-place
        self.steps.update(deepcopy_without_behavior(other.steps))

        # use deep merging for objects
        law.util.merge_dicts(
            self.objects,
            deepcopy_without_behavior(other.objects),
            inplace=True,
            deep=True,
        )

        # update other fields in-place
        self.other.update(deepcopy_without_behavior(other.other))

        # shallow update for aux
        self.aux.update(deepcopy_without_behavior(other.aux))

        return self

    def __add__(self: SelectionResult, other: SelectionResult | None) -> SelectionResult:
        """
        Returns a new instance with all fields of *this* and an *other* instance merged.

        When *other* is *None*, a copy of *this* instance is returned.

        :param other: Instance of :py:class:`~.SelectionResult` to be added to current instance.
        :raises TypeError: If *other* is not a :py:class:`~.SelectionResult` instance.
        :return: Copy of this instance after the "add" operation.
        """
        # validate the type first to avoid copying this instance when the operation fails anyway
        if other is not None and not isinstance(other, SelectionResult):
            raise TypeError(f"cannot add '{other}' to {self.__class__.__name__} instance")

        # start from an empty instance and add this one, which copies its fields
        inst = self.__class__()
        inst += self

        # add the other instance if not none
        if other is not None:
            inst += other

        return inst

    def to_ak(self: SelectionResult) -> ak.Array:
        """
        Converts the contained fields into a nested awkward array and returns it.

        The conversion is performed with multiple calls of :external+ak:py:func:`ak.zip`. Only fields that
        are set (non-*None* event mask, non-empty *steps* and *objects*) are added. The *aux* data is not
        part of the returned array.

        :raises ValueError: If the main events mask contains a type other than bool.
        :raises KeyError: If the additional top-level fields in :py:attr:`other` have a field
            "event", "steps" or "objects" that would overwrite one of these special fields being set.
        :return: :py:class:`~.SelectionResult` transformed into an awkward array.
        """
        # complain if the event mask consists of non-boolean values
        if self.event is not None and getattr(ak.type(self.event).content, "primitive", None) != "bool":
            raise ValueError(
                f"{self.__class__.__name__} event mask must be of type N * bool, "
                f"but got {ak.type(self.event)}",
            )

        # prepare objects to merge
        to_merge = {}
        if self.event is not None:
            to_merge["event"] = self.event
        if self.steps:
            to_merge["steps"] = ak.zip(self.steps)
        if self.objects:
            to_merge["objects"] = ak.zip({
                src_name: ak.zip(dst_dict, depth_limit=1)  # limit due to ragged axis 1
                for src_name, dst_dict in self.objects.items()
            })

        # add other fields but verify they do not overwrite existing fields
        for key in self.other:
            if key in to_merge:
                raise KeyError(
                    f"additional top-level field '{key}' of {self.__class__.__name__} conflicts "
                    f"with existing special field '{key}'",
                )
        to_merge.update(self.other)

        return ak.zip(to_merge)