# coding: utf-8
"""
Selector helpers for book keeping of selection and event weight statistics for
aggregation over datasets.
"""
from __future__ import annotations
import itertools
from functools import reduce
from collections import defaultdict
from operator import and_, getitem as getitem_
import law
from columnflow.selection import Selector, SelectionResult, selector
from columnflow.util import maybe_import, DotDict
from columnflow.types import Sequence, Callable, Any
np = maybe_import("numpy")
ak = maybe_import("awkward")
[docs]
@selector(
call_force=True,
)
def increment_stats(
self: Selector,
events: ak.Array,
results: SelectionResult,
stats: dict,
weight_map: dict[str, ak.Array | tuple[ak.Array, ak.Array]] | None = None,
group_map: dict[str, dict[str, ak.Array | Callable]] | None = None,
group_combinations: Sequence[tuple[str]] | None = None,
skip_func: Callable[[str, list[str]], bool] | None = None,
**kwargs,
) -> tuple[ak.Array, SelectionResult]:
"""
Unexposed selector that does not actually select objects but that instead increments selection
metrics in a given dictionary *stats* given a chunk of *events* and the corresponding selection
*results*.
A weight_map* can be defined to configure the actual fields to be added. The key of each entry
should either start with ``"num``, to state that it will refer to a plain number of events, or
``"sum"``, to state that the field describes the sum of a specific column (usualky weights).
Different types of values are accepted, depending on the type of "operation":
- ``"num"``: An event mask, or an *Ellipsis* to select all events.
- ``"sum"``: Either a column to sum over, or a 2-tuple containing the column to sum, and
an event mask to only sum over certain events.
Example:
.. code-block:: python
# weight map definition
weight_map = {
# "num" operations
"num_events": Ellipsis, # all events
"num_events_selected": results.event, # selected events only
# "sum" operations
"sum_mc_weight": events.mc_weight, # weights of all events
"sum_mc_weight_selected": (events.mc_weight, results.event), # weights of selected events
}
# usage within an exposed selector
# (where results are generated, and events and stats were passed by SelectEvents)
self[increment_stats_per_process](events, results, stats, weight_map=weight_map, **kwargs)
Each sum of weights can also be extracted for each unique element in a so-called group, such as
per process id, or per jet multiplicity bin. For this purpose, a *group_map* can be defined,
mapping the name of a group (e.g. ``"process"`` or ``"njet"``) to a dictionary with the fields
- ``"values"``, unique values to loop over,
- ``"mask_fn"``, a function that is supposed to return a mask given a single value, and
- ``"combinations_only"`` (optional), a boolean flag (*False* by default) that decides
whether this group is not to be evaluated on its own, but only as part of a combination
with other groups (see below).
Example:
.. code-block:: python
group_map = {
"process": {
"values": events.process_id,
"mask_fn": (lambda v: events.process_id == v),
},
"njet": {
"values": results.x.n_jets,
"mask_fn": (lambda v: results.x.n_jets == v),
},
}
Based on the *weight_map* in the example above, this will result in eight additional fields in
*stats*, e.g, ``"sum_mc_weight_per_process"``, ``"sum_mc_weight_selected_per_process"``,
``"sum_mc_weight_per_njet"``, ``"sum_mc_weight_selected_per_njet"``, etc. (same of "num"). Each
of these new fields will refer to a dictionary with keys corresponding to the unique values
defined in the *group_map* above.
In addition, combinations of groups can be configured using *group_combinations*. It accepts a
sequence of tuples whose elements should be names of groups in *group_names*. As the name
suggests, combinations of all possible values between groups are evaluated and stored in a
nested dictionary.
Example:
.. code-block:: python
group_combinations = [("process", "njet")]
In this case, *stats* will obtain additional fields, such as
``"sum_mc_weight_per_process_and_njet"`` and ``"sum_mc_weight_selected_per_process_and_njet"``,
referring to nested dictionaries whose structure depends on the exact order of group names per
tuple. To reduce the number of entries in the stats but still make use of this combinatorics
feature, a *skip_func* can be defined that receives the weight name and the names of the groups
of an entry. If the function returns *True*, the entry will be skipped.
"""
# defaults
if weight_map is None:
weight_map = {}
if group_map is None:
group_map = {}
if group_combinations is None:
group_combinations = []
if skip_func is None:
skip_func = lambda weight_name, group_names: False
# make values in group map unique
unique_group_values = {
group_name: np.unique(ak.flatten(group_data["values"], axis=None))
for group_name, group_data in group_map.items()
}
# treat groups as combinations of a single group
for group_name, group_data in list(group_map.items())[::-1]:
if group_data.get("combinations_only", False) or (group_name,) in group_combinations:
continue
group_combinations.insert(0, (group_name,))
# get and store the weights per entry in the map
for weight_name, obj in weight_map.items():
# check whether the weight is either a "num" or "sum" field
if weight_name.startswith("num"):
op = self.NUM
elif weight_name.startswith("sum"):
op = self.SUM
else:
raise Exception(
f"weight '{weight_name}' starting with unknown operation; should either start with "
"'num' or 'sum'",
)
# interpret obj based on the aoperation to be applied
weights = None
weight_mask = Ellipsis
if isinstance(obj, (tuple, list)):
if op == self.NUM:
raise Exception(
f"weight map entry '{weight_name}' should refer to a mask, "
f"but found a sequence: {obj}",
)
if len(obj) == 1:
weights = ak.values_astype(obj[0], np.float64)
elif len(obj) == 2:
weights, weight_mask = ak.values_astype(obj[0], np.float64), obj[1]
else:
raise Exception(f"cannot interpret as weights and optional mask: '{obj}'")
elif op == self.NUM:
weight_mask = obj
else: # SUM
weights = ak.values_astype(obj, np.float64)
# when mask is an Ellipsis, it cannot be AND joined to other masks, so convert to true mask
if weight_mask is Ellipsis:
weight_mask = np.ones(len(events), dtype=bool)
# apply the operation
if op == self.NUM:
stats[f"{weight_name}"] += int(ak.sum(weight_mask))
else: # SUM
stats[f"{weight_name}"] += float(ak.sum(weights[weight_mask]))
# per group combination
for group_names in group_combinations:
# optionally skip
if skip_func(weight_name, group_names):
continue
group_key = f"{weight_name}_per_" + "_and_".join(group_names)
# set the default structures
if group_key not in stats:
dtype = int if op == self.NUM else float
stats[group_key] = self.defaultdicts[dtype][len(group_names)]()
# set values
for values in itertools.product(*(unique_group_values[g] for g in group_names)):
# evaluate and join the masks
group_mask = reduce(
and_,
(group_map[g]["mask_fn"](v) for g, v in zip(group_names, values)),
)
# find the innermost dict to perform the in-place item assignment, then increment
str_values = list(map(str, values))
innermost_dict = reduce(getitem_, [stats[group_key]] + str_values[:-1])
if op == self.NUM:
innermost_dict[str_values[-1]] += int(ak.sum(weight_mask & group_mask))
else: # SUM
innermost_dict[str_values[-1]] += float(ak.sum(weights[weight_mask & group_mask]))
return events, results
@increment_stats.setup
def increment_stats_setup(
self: Selector,
task: law.Task,
reqs: dict[str, DotDict[str, Any]],
inputs: dict[str, Any],
reader_targets: law.util.InsertableDict,
**kwargs,
) -> None:
super(increment_stats, self).setup_func(
task=task,
reqs=reqs,
inputs=inputs,
reader_targets=reader_targets,
**kwargs,
)
# flags to descibe "number" and "sum" fields
self.NUM, self.SUM = range(2)
# store nested defaultdict's with a certain maximum nesting depth
self.defaultdicts = {
float: {1: (lambda: defaultdict(float))},
int: {1: (lambda: defaultdict(int))},
}
for i in range(2, 10 + 1):
# use a self-executing closure to avoid reliance inside the lambda on i in the loop body
self.defaultdicts[float][i] = (lambda i: (lambda: defaultdict(self.defaultdicts[float][i - 1])))(i)
self.defaultdicts[int][i] = (lambda i: (lambda: defaultdict(self.defaultdicts[int][i - 1])))(i)
[docs]
@selector(
uses={increment_stats},
produces={increment_stats},
call_force=True,
)
def increment_event_stats(
self: Selector,
events: ak.Array,
results: SelectionResult,
stats: dict,
**kwargs,
) -> tuple[ak.Array, SelectionResult]:
"""
Simplified version of :py:class:`increment_stats` that only increments the number of events and
the number of selected events.
"""
weight_map = {
"num_events": Ellipsis,
"num_events_selected": results.event,
}
return self[increment_stats](events, results, stats, weight_map=weight_map, **kwargs)