Selections#

Introduction#

In columnflow, selections are defined through the Selector class. This class allows for arbitrary selection criteria on event level as well as object level using masks. The results of the selection (which events or objects are to be conserved) are saved in an instance of the SelectionResult class. Similar to Producers, it is possible to create new columns in Selectors. In the original columnflow setup, Selectors are being run in the SelectEvents task.

Create an instance of the Selector class#

Similar to Producers, Selectors need to declare which columns are to be used (produced) by the Selector instance in order for them to be taken out of the parquet files (saved in the new parquet files). An example for this structure is given below (Similar to the Selector documentation.):


# import the Selector class and the selector method
from columnflow.selection import Selector, selector

# also import the SelectionResult class
from columnflow.selection import SelectionResult

# maybe import awkward in case this Selector is actually run, this needs to be set as columnflow
# would else give an error during setup, as these packages are not in the default sandbox
from columnflow.util import maybe_import
ak = maybe_import("awkward")


# now wrap any function with a selector
@selector(
    # define some additional information here, e.g.
    # what columns are needed for this Selector?
    uses={
        "Jet.pt", "Jet.eta",
    },
    # does this Selector produce any columns?
    produces=set(),

    # pass any other variable to the selector class
    some_auxiliary_variable=True,

    # ...
)
def jet_selection(self: Selector, events: ak.Array, **kwargs) -> tuple[ak.Array, SelectionResult]:
    # do something ...
    return events, SelectionResult()

The structure of the arguments for the returned SelectionResult instance are discussed below in SelectionResult.

Exposed and internal Selectors#

Selectors can be either available directly from the command line or only internally, through other selectors. To make a Selector available from the command line, it should be declared with the exposed=True argument. To call a fully functional Selector (in the following referred as Selector_int) from an other Selector (in the following referred to as Selector_ext), several steps are required:

  • If defined in an other file, Selector_int should be imported in the Selector_ext script,

  • The columns needed for Selector_int should be declared in the uses argument of Selector_ext (it is possible to simply write the name of the Selector_int in the uses set, the content of the uses set from Selector_int will be added to the uses set of Selector_ext, see below)

  • Selector_int must be run in Selector_ext, e.g. with the self[Selector_int](events, **kwargs) call.

An example of an exposed Selector_ext with the jet_selection Selector defined above as Selector_int, assuming the jet_selection exists in analysis/selection/jet.py is given below. It should be mentioned that a few details must be changed for this selector to work within the worklow, the full version can be found in the “Complete Example” section.


# import the Selector class and the selector method
from columnflow.selection import Selector, selector

# also import the SelectionResult class
from columnflow.selection import SelectionResult

# maybe import awkward in case this Selector is actually run, this needs to be set as columnflow
# would else give an error during setup, as these packages are not in the default sandbox
from columnflow.util import maybe_import
ak = maybe_import("awkward")

# import the used internal selector
from analysis.selection.jet import jet_selection


@selector(
    # some information for Selector
    # e.g., if we want to use some internal Selector, make
    # sure that you have all the relevant information
    uses={
        jet_selection,
    },
    produces={
        jet_selection,
    },

    # this is our top level Selector, so we need to make it reachable
    # for the SelectEvents task
    exposed=True,
)
def Selector_ext(self: Selector, events: ak.Array, **kwargs) -> tuple[ak.Array, SelectionResult]:
    results = SelectionResult()
    # do something here

    # e.g., call the internal Selector
    events, sub_result = self[jet_selection](events)
    results += sub_result

    return events, results

SelectionResult#

The result of a Selector is propagated through an instance of the SelectionResult class. The SelectionResult object is instantiated using a dictionary for each argument. There are four arguments that may be set, which contain:

  • Boolean masks to select the events to be kept in the analysis, which is saved under the steps argument. Several selection steps may be defined in a single Selector, each with a unique name being the key of the dictionary for the corresponding mask.

  • Several index masks for specific objects in a double dictionary structure, saved under the objects argument. The double dictionary structure in the objects defines the source column/field from which the indices are to be taken (first dimension of the dictionary) and the name of the new column/field to be created with only these objects (second dimension of the dictionary). If the name of the column/field to be created is the same as the name of an already existing column/field, the original column/field will be overwritten by the new one!

  • Additional informations to be used by other Selectors, saved under the aux argument.

  • A combined boolean mask of all steps used, which is saved under the event argument. An example with this argument will be shown in the section “Complete Example”. The final SelectionResult object to be returned by the exposed selector must have this field.

While the arguments in the aux dictionary are discarded after the ReduceEvents task and are only used for short-lived saving of internal information that might be needed by other Selectors, the steps and objects arguments are specifically used by the ReduceEvents task to apply the given masks to the nanoAOD files (potentially with additional columns). As described above, the steps argument is used to reduce the number of events to be processed further down the task tree according to the selections, while the objects argument is used to select which objects are to be kept for further processing and creates new columns/fields containing specific selected objects.

Below is an example of a fully written internal Selector with its SelectionResult object without event argument.


# import the Selector class and the selector method
from columnflow.selection import Selector, selector

# also import the SelectionResult class
from columnflow.selection import SelectionResult

# maybe import awkward in case this Selector is actually run, this needs to be set as columnflow
# would else give an error during setup, as these packages are not in the default sandbox
from columnflow.util import maybe_import
ak = maybe_import("awkward")


# now wrap any function with a selector
@selector(
    # define some additional information here, e.g.
    # what columns are needed for this Selector?
    uses={
        "Jet.pt", "Jet.eta",
    },
    # does this Selector produce any columns?
    produces=set(),

    # pass any other variable to the selector class
    some_auxiliary_variable=True,

    # ...
)
def jet_selection_with_result(self: Selector, events: ak.Array, **kwargs) -> tuple[ak.Array, SelectionResult]:
    # require an object of the Jet collection to have at least 20 GeV pt and at most 2.4 eta to be
    # considered a Jet in our analysis
    jet_mask = ((events.Jet.pt > 20.0) & (abs(events.Jet.eta) < 2.4))

    # require an object of the Jet collection to have at least 50 GeV pt and at most 2.4 eta
    jet_50pt_mask = ((events.Jet.pt > 50.0) & (abs(events.Jet.eta) < 2.4))

    # require an event to have at least two jets to be selected
    jet_sel = (ak.sum(jet_mask, axis=1) >= 2)

    # create the list of indices to be kept from the Jet collection using the jet_mask to create the
    # new Jet field containing only the selected Jet objects
    jet_indices = ak.local_index(events.Jet.pt)[jet_mask]

    # create the list of indices to be kept from the Jet collection using the jet_50pt_mask to create the
    # new Jet_50pt field containing only the selected Jet_50pt objects
    jet_50pt_indices = ak.local_index(events.Jet.pt)[jet_50pt_mask]

    return events, SelectionResult(
        steps={
            # boolean mask to create selection of the events with at least two jets, this will be
            # applied in the ReduceEvents task
            "jet": jet_sel,
        },
        objects={
            # in ReduceEvents, the Jet field will be replaced by the new Jet field containing only
            # selected jets, and a new field called Jet_50pt containing the jets with pt higher than
            # 50 GeV will be created
            "Jet": {
                "Jet": jet_indices,
                "Jet_50pt": jet_50pt_indices,
            },
        },
        aux={
            # jet mask that lead to the jet_indices
            "jet_mask": jet_mask,
        },
    )

Selection using several selection steps#

In order for the ReduceEvents task to apply the final event selection to all events, it is necessary to input the resulting boolean array in the event argument of the returned SelectionResult by the exposed Selector. When several selection steps do appear in the selection, it is necessary to combine all the masks from all the steps in order to obtain the final boolean array to be given to the event argument of the SelectionResult and for it to be applied to the events. This can be achieved in two steps:

  • Combining the results from the different selections to a single SelectionResult object:

results = SelectionResult()
results += jet_results
results += fatjet_results
  • Reducing the different steps to a single boolean array and give it to the event argument of the SelectionResult object.

# import the functions to combine the selection masks
from operator import and_
from functools import reduce

# combined event selection after all steps
event_sel = reduce(and_, results.steps.values())
results.event = event_sel

Selection stats#

In order to use the correct values for the weights to be applied to the Monte Carlo samples while plotting, it is necessary to save some information which would be lost after the ReduceEvents task. An example for that would be the sum of all the Monte Carlo weights in a simulation, which is needed for the normalization weights. In order to propagate this information to tasks further down the tree, the stats.json file is created. As it is a json file, it contains a dictionary with the key corresponding to the name of the information to be saved, while the value for the key is the information itself. The dictionary is created in the SelectEvents task and updated in place in a Selector. Depending on the weights to be used, various additional information might need to be saved in the stats object.

The keys "num_events", "num_events_selected", "sum_mc_weight", "sum_mc_weight_selected" get printed by the SelectEvents task along with the corresponding efficiency. If they are not set, the default value for floats will be printed instead.

Below is an example of such a Selector updating the stats dictionary in place. This dictionary will be saved in the stats.json file. For convenience, the weights were saved in a weight_map dictionary along with the mask before the sum of the weights was saved in the stats dictionary. In this example, the keys to be printed by the SelectEvents task and the sum of the Monte Carlo weights per process (needed for correct normalization of the number of Monte Carlo events in the plots) are saved.

from columnflow.selection import Selector, selector

# also import the SelectionResult class
from columnflow.selection import SelectionResult

# maybe import awkward in case this Selector is actually run, this needs to be set as columnflow
# would else give an error during setup, as these packages are not in the default sandbox
from columnflow.util import maybe_import
ak = maybe_import("awkward")
np = maybe_import("numpy")

from collections import defaultdict, OrderedDict


@selector(uses={"process_id", "mc_weight"})
def custom_increment_stats(
    self: Selector,
    events: ak.Array,
    results: SelectionResult,
    stats: dict,
    **kwargs,
) -> ak.Array:
    """
    Unexposed selector that does not actually select objects but instead increments selection
    *stats* in-place based on all input *events* and the final selection *mask*.
    """
    # get event masks
    event_mask = results.event

    # increment plain counts
    stats["num_events"] += len(events)
    stats["num_events_selected"] += ak.sum(event_mask, axis=0)

    # get a list of unique process ids present in the chunk
    unique_process_ids = np.unique(events.process_id)

    # create a map of entry names to (weight, mask) pairs that will be written to stats
    weight_map = OrderedDict()
    if self.dataset_inst.is_mc:
        # mc weight for all events
        weight_map["mc_weight"] = (events.mc_weight, Ellipsis)

        # mc weight for selected events
        weight_map["mc_weight_selected"] = (events.mc_weight, event_mask)

    # get and store the sum of weights in the stats dictionary
    for name, (weights, mask) in weight_map.items():
        joinable_mask = True if mask is Ellipsis else mask

        # sum of different weights in weight_map for all processes
        stats[f"sum_{name}"] += ak.sum(weights[mask])

        # sums per process id
        stats.setdefault(f"sum_{name}_per_process", defaultdict(float))
        for p in unique_process_ids:
            stats[f"sum_{name}_per_process"][int(p)] += ak.sum(
                weights[(events.process_id == p) & joinable_mask],
            )

    return events, results

Columnflow also provides a helper Selector called increment_stats, which calculates and inputs directly the number of events (the name of the feature should start with “num”) for given masks (Ellipsis should be given if no mask is to be used) or sum of specific columns (usually weights, the name of the feature should start by “sum”) for given columns and masks, using a “weight map”. These calculations can also be specified for subgroups of objects using a “group map”. An example of such a call using the number of jets and the processes as subgroups is given below, with results.event the event selection mask after all selections and having saved during the selection the number of valid jets in each event in the auxiliary field of the SelectionResult object under the name n_jets. This example stems from the cms_minimal example of the analysis templates present in the columnflow repository, more specifically in the example.py script under the selection directory. In this script, the increment_stats method from columnflow was imported (using from columnflow.selection.stats import increment_stats) and its uses columns were declared in the uses set of the Selector.


    # increment stats
    weight_map = {
        "num_events": Ellipsis,
        "num_events_selected": results.event,
    }
    group_map = {}
    if self.dataset_inst.is_mc:
        weight_map = {
            **weight_map,
            # mc weight for all events
            "sum_mc_weight": (events.mc_weight, Ellipsis),
            "sum_mc_weight_selected": (events.mc_weight, results.event),
        }
        group_map = {
            # per process
            "process": {
                "values": events.process_id,
                "mask_fn": (lambda v: events.process_id == v),
            },
            # per jet multiplicity
            "njet": {
                "values": results.x.n_jets,
                "mask_fn": (lambda v: results.x.n_jets == v),
            },
        }
    events, results = self[increment_stats](
        events,
        results,
        stats,
        weight_map=weight_map,
        group_map=group_map,
        **kwargs,
    )

Complete example#

Overall, creating an exposed Selector with several selections steps might look like this:

# coding: utf-8

# import the Selector class and the selector method
from columnflow.selection import Selector, selector

# also import the SelectionResult class
from columnflow.selection import SelectionResult
from columnflow.production.cms.mc_weight import mc_weight
from columnflow.production.processes import process_ids

# import the functions to combine the selection masks
from operator import and_
from functools import reduce

# maybe import awkward in case this Selector is actually run, this needs to be set as columnflow
# would else give an error during setup, as these packages are not in the default sandbox
from columnflow.util import maybe_import
ak = maybe_import("awkward")
np = maybe_import("numpy")

from collections import defaultdict, OrderedDict


# First, define an internal jet Selector to be used by the exposed Selector

@selector(
    # define some additional information here, e.g.
    # what columns are needed for this Selector?
    uses={
        "Jet.pt", "Jet.eta",
    },
    # does this Selector produce any columns?
    produces=set(),

    # pass any other variable to the selector class
    some_auxiliary_variable=True,
)
def jet_selection_with_result(self: Selector, events: ak.Array, **kwargs) -> tuple[ak.Array, SelectionResult]:
    # require an object of the Jet collection to have at least 20 GeV pt and at most 2.4 eta to be
    # considered a Jet in our analysis
    jet_mask = ((events.Jet.pt > 20.0) & (abs(events.Jet.eta) < 2.4))

    # require an object of the Jet collection to have at least 50 GeV pt and at most 2.4 eta
    jet_50pt_mask = ((events.Jet.pt > 50.0) & (abs(events.Jet.eta) < 2.4))

    # require an event to have at least two jets to be selected
    jet_sel = (ak.sum(jet_mask, axis=1) >= 2)

    # create the list of indices to be kept from the Jet collection using the jet_mask to create the
    # new Jet field containing only the selected Jet objects
    jet_indices = ak.local_index(events.Jet.pt)[jet_mask]

    # create the list of indices to be kept from the Jet collection using the jet_50pt_mask to create the
    # new Jet_50pt field containing only the selected Jet_50pt objects
    jet_50pt_indices = ak.local_index(events.Jet.pt)[jet_50pt_mask]

    return events, SelectionResult(
        steps={
            # boolean mask to create selection of the events with at least two jets, this will be
            # applied in the ReduceEvents task
            "jet": jet_sel,
        },
        objects={
            # in ReduceEvents, the Jet field will be replaced by the new Jet field containing only
            # selected jets, and a new field called Jet_50pt containing the jets with pt higher than
            # 50 GeV will be created
            "Jet": {
                "Jet": jet_indices,
                "Jet_50pt": jet_50pt_indices,
            },
        },
        aux={
            # jet mask that lead to the jet_indices
            "jet_mask": jet_mask,
        },
    )


# Next, define an internal fatjet Selector to be used by the exposed Selector

@selector(
    # define some additional information here, e.g.
    # what columns are needed for this Selector?
    uses={
        "FatJet.pt",
    },
    # does this Selector produce any columns?
    produces=set(),

    # ...
)
def fatjet_selection_with_result(self: Selector, events: ak.Array, **kwargs) -> tuple[ak.Array, SelectionResult]:
    # require an object of the FatJet collection to have at least 40 GeV pt to be
    # considered a FatJet in our analysis
    fatjet_mask = (events.FatJet.pt > 40.0)

    # require an event to have at least one AK8-jet (=FatJet) to be selected
    fatjet_sel = (ak.sum(fatjet_mask, axis=1) >= 1)

    # create the list of indices to be kept from the FatJet collection using the fatjet_mask to create the
    # new FatJet field containing only the selected FatJet objects
    fatjet_indices = ak.local_index(events.FatJet.pt)[fatjet_mask]

    return events, SelectionResult(
        steps={
            # boolean mask to create selection of the events with at least two jets, this will be
            # applied in the ReduceEvents task
            "fatjet": fatjet_sel,
        },
        objects={
            # in ReduceEvents, the FatJet field will be replaced by the new FatJet field containing only
            # selected fatjets
            "FatJet": {
                "FatJet": fatjet_indices,
            },
        },
    )


# Implement the task to update the stats object

@selector(uses={"process_id", "mc_weight"})
def custom_increment_stats(
    self: Selector,
    events: ak.Array,
    results: SelectionResult,
    stats: dict,
    **kwargs,
) -> ak.Array:
    """
    Unexposed selector that does not actually select objects but instead increments selection
    *stats* in-place based on all input *events* and the final selection *mask*.
    """
    # get event masks
    event_mask = results.event

    # increment plain counts
    stats["num_events"] += len(events)
    stats["num_events_selected"] += ak.sum(event_mask, axis=0)

    # get a list of unique process ids present in the chunk
    unique_process_ids = np.unique(events.process_id)

    # create a map of entry names to (weight, mask) pairs that will be written to stats
    weight_map = OrderedDict()
    if self.dataset_inst.is_mc:
        # mc weight for all events
        weight_map["mc_weight"] = (events.mc_weight, Ellipsis)

        # mc weight for selected events
        weight_map["mc_weight_selected"] = (events.mc_weight, event_mask)

    # get and store the sum of weights in the stats dictionary
    for name, (weights, mask) in weight_map.items():
        joinable_mask = True if mask is Ellipsis else mask

        # sum of different weights in weight_map for all processes
        stats[f"sum_{name}"] += ak.sum(weights[mask])

        # sums per process id
        stats.setdefault(f"sum_{name}_per_process", defaultdict(float))
        for p in unique_process_ids:
            stats[f"sum_{name}_per_process"][int(p)] += ak.sum(
                weights[(events.process_id == p) & joinable_mask],
            )

    return events, results


# Now create the exposed Selector using the three above defined Selectors

@selector(
    # some information for Selector
    # e.g., if we want to use some internal Selector, make
    # sure that you have all the relevant information
    uses={
        mc_weight, jet_selection_with_result, fatjet_selection_with_result, custom_increment_stats,
        process_ids,
    },
    produces={
        mc_weight, process_ids,
    },

    # this is our top level Selector, so we need to make it reachable
    # for the SelectEvents task
    exposed=True,
)
def Selector_ext(
    self: Selector,
    events: ak.Array,
    stats: defaultdict,
    **kwargs,
) -> tuple[ak.Array, SelectionResult]:

    results = SelectionResult()

    # add corrected mc weights to be used later for plotting and to calculate the sum saved in stats
    if self.dataset_inst.is_mc:
        events = self[mc_weight](events, **kwargs)

    # call the first internal selector, the jet selector, and save its result
    events, jet_results = self[jet_selection_with_result](events, **kwargs)
    results += jet_results

    # call the second internal selector, the fatjet selector, and save its result
    events, fatjet_results = self[fatjet_selection_with_result](events, **kwargs)
    results += fatjet_results

    # combined event selection after all steps
    event_sel = reduce(and_, results.steps.values())
    results.event = event_sel

    # create process ids, used by custom_increment_stats
    events = self[process_ids](events, **kwargs)

    # use increment stats selector to update dictionary to be saved in json format
    events, results = self[custom_increment_stats](events, results, stats, **kwargs)

    return events, results

Notes:

  • If you want to use an exposed Selector in a task call, and if this new Selector is created in a new file, you need to include this file in the law.cfg file under the selection_modules argument. A more detailed explanation of the law config file can be found in the Law config section.

  • The actual creation of the weights to be applied in the histogramms after the selection might be done in the ProduceColumns task, using the stats object created in this task if needed.

  • Other useful functions (e.g. for easier handling of columns) can be found in the Best practices section of this documentation.

Running the SelectEvents task#

The SelectEvents task runs a specific selection script and saves the created masks for event and objects selections in a parquet file, as well as the statistics of the selection in a json file.

While it is possible to see all the arguments and their explanation for this task using law run cf.SelectEvents --help, the only argument created specifically for this task is the --selector argument, through which the exposed Selector to be used can be chosen.

The masks created by this task are then used by the ReduceEvents task to reduce the number of events (see the steps argument for the SelectionResult) and create/update new columns/fields with only the selected objects (see the objects argument for the SelectionResult). The saved statistics are used e.g. for the weights needed for plotting.

It should not be forgotten that any column created in this task should be included in the keep_columns argument of the config file, as only columns explicitely required to be kept in this dictionary will be loaded and then saved in the reduced parquet file.

An example of how to run this task for an analysis with several datasets and configs is given below:

law run cf.SelectEvents --version name_of_your_version \
                        --config name_of_your_config \
                        --selector name_of_the_selector \
                        --dataset name_of_the_dataset_to_be_run

It is to be mentioned that this task is run after the CalibrateEvents task and therefore uses the default argument for the --calibrators if not specified otherwise.

Notes:

  • Running the exposed Selector from the “Complete Example” section would simply require you to give the name Selector_ext in the --selector argument.