columnflow.columnar_util

Helpers and utilities for working with columnar libraries.

Data:

mandatory_coffea_columns

Columns that are always required when opening a nano file with coffea.

EMPTY_INT

Empty-value definition in places where an integer number is expected but not present.

EMPTY_FLOAT

Empty-value definition in places where a float number is expected but not present.

eval_item

ItemEval singleton mimicking a function.

Classes:

ColumnCollection(value)

Enumeration containing flags that describe arbitrary collections of columns.

Route([route, tags])

Route objects describe the location of columns in nested arrays and are basically wrappers around a sequence of nested fields.

RouteFilter(keep_routes)

Shallow helper class that handles removal of routes in an awkward array that do not match those in keep_routes.

ArrayFunction([call_func, init_func, ...])

Base class for function wrappers that act on arrays and keep track of used as well as produced columns, as close as possible to the actual implementation.

TaskArrayFunction(*args[, requires_func, ...])

Subclass of ArrayFunction providing an interface to certain task features such as declaring dependent or produced shifts, task requirements, and defining a custom setup function.

ChunkedIOHandler(source[, source_type, ...])

Allows reading one or multiple files and iterating through chunks of their content with multi-threaded read operations.

Functions:

get_ak_routes(ak_array[, max_depth])

Extracts all routes pointing to columns of a potentially deeply nested awkward array ak_array and returns them in a list of Route instances.

has_ak_column(ak_array, route)

Returns whether an awkward array ak_array contains a nested field identified by a route.

set_ak_column(ak_array, route, value[, ...])

Inserts a new column into awkward array ak_array and returns a new view with the column added or overwritten.

remove_ak_column(ak_array, route[, ...])

Removes a route from an awkward array ak_array and returns a new view with the corresponding column removed.

add_ak_alias(ak_array, src_route, dst_route)

Adds an alias to an awkward array ak_array pointing the array at src_route to dst_route and returns a new view with the alias applied.

add_ak_aliases(ak_array, aliases, **kwargs)

Adds multiple aliases, given in a dictionary mapping destination columns to source columns, to an awkward array ak_array and returns a new view with the aliases applied.

update_ak_array(ak_array, *others[, ...])

Updates an awkward array ak_array with the content of multiple different arrays others and potentially (see below) returns a new view.

flatten_ak_array(ak_array[, routes, nano_format])

Flattens a nested awkward array ak_array into a dictionary that maps joined column names to single awkward arrays.

sort_ak_fields(ak_array[, sort_fn])

Recursively sorts all fields of an awkward array ak_array and returns a new view.

sorted_ak_to_parquet(ak_array, *args, **kwargs)

Sorts the fields in an awkward array ak_array recursively with sort_ak_fields() and saves it as a parquet file using awkward.to_parquet which receives all additional args and kwargs.

sorted_ak_to_root(ak_array, path[, tree_name])

Sorts the fields in an awkward array ak_array recursively with sort_ak_fields() and saves it as a root tree named tree_name to a file at path using uproot.

attach_behavior(ak_array, type_name[, ...])

Attaches behavior of type type_name to an awkward array ak_array and returns it.

layout_ak_array(data_array, layout_array)

Takes a data_array and structures its contents into the same structure as layout_array, with up to one level of nesting.

flat_np_view(ak_array[, axis])

Takes an ak_array and returns a fully flattened numpy view.

deferred_column([call_func])

Subclass factory to be used as a decorator wrapping a call_func that will be used as the new __call__ method.

optional_column(*routes)

Takes one or several objects routes whose type can be anything that is accepted by the Route constructor, and returns a single or a set of route objects being tagged "optional".

mandatory_coffea_columns = {'event', 'luminosityBlock', 'run'}#

Columns that are always required when opening a nano file with coffea.

class ColumnCollection(value)[source]#

Bases: Flag

Enumeration containing flags that describe arbitrary collections of columns.

Attributes:

MANDATORY_COFFEA

ALL_FROM_CALIBRATOR

ALL_FROM_CALIBRATORS

ALL_FROM_SELECTOR

ALL_FROM_PRODUCER

ALL_FROM_PRODUCERS

ALL_FROM_ML_EVALUATION

MANDATORY_COFFEA = 1#
ALL_FROM_CALIBRATOR = 2#
ALL_FROM_CALIBRATORS = 4#
ALL_FROM_SELECTOR = 8#
ALL_FROM_PRODUCER = 16#
ALL_FROM_PRODUCERS = 32#
ALL_FROM_ML_EVALUATION = 64#
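
Because ColumnCollection derives from Flag, its members can be combined with bitwise operators. The following is a minimal sketch, assuming only the values listed above: it re-declares a local stand-in named ColumnCollectionSketch (a hypothetical name, not imported from columnflow) to show how combination and membership tests behave for Flag enumerations in general.

```python
import enum


class ColumnCollectionSketch(enum.Flag):
    # local stand-in mirroring the documented values; not the columnflow class itself
    MANDATORY_COFFEA = 1
    ALL_FROM_CALIBRATOR = 2
    ALL_FROM_CALIBRATORS = 4
    ALL_FROM_SELECTOR = 8
    ALL_FROM_PRODUCER = 16
    ALL_FROM_PRODUCERS = 32
    ALL_FROM_ML_EVALUATION = 64


# combine two flags with the bitwise "or" operator
combined = ColumnCollectionSketch.MANDATORY_COFFEA | ColumnCollectionSketch.ALL_FROM_SELECTOR

# membership tests use the "in" operator
has_selector = ColumnCollectionSketch.ALL_FROM_SELECTOR in combined
has_producer = ColumnCollectionSketch.ALL_FROM_PRODUCER in combined
```
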
EMPTY_INT = -99999#

Empty-value definition in places where an integer number is expected but not present.

EMPTY_FLOAT = -99999.0#

Empty-value definition in places where a float number is expected but not present.
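
As an illustration of how such a placeholder might be used, the sketch below fills missing entries of a plain Python list with the documented value and masks them out again afterwards. The list leading_jet_pt and the use of None for missing entries are assumptions made for this example; with awkward arrays, an equivalent fill operation of that library would be used instead.

```python
EMPTY_FLOAT = -99999.0  # value documented above, re-declared locally for this sketch

# hypothetical per-event leading-jet pt values, None marks a missing jet
leading_jet_pt = [34.2, None, 17.3]

# replace missing entries by the placeholder
filled = [EMPTY_FLOAT if pt is None else pt for pt in leading_jet_pt]

# the placeholder can be masked out again later on
valid = [pt for pt in filled if pt != EMPTY_FLOAT]
```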

class Route(route=None, tags=None)[source]#

Bases: TagMixin

Route objects describe the location of columns in nested arrays and are basically wrappers around a sequence of nested fields. Additionally, they provide convenience methods for conversion into column names, either in dot or nano-style underscore format.

The constructor takes another route instance, a sequence of strings, or a string in dot format to initialize the fields. Most operators are overwritten to work with routes in a tuple-like fashion. Examples:

route = Route("Jet.pt")
# same as Route(("Jet", "pt"))

len(route)
# -> 2

route.fields
# -> ("Jet", "pt")

route.column
# -> "Jet.pt"

route.nano_column
# -> "Jet_pt"

route[-1]
# -> "pt"

route += "jec_up"
route.fields
# -> ("Jet", "pt", "jec_up")

route[1:]
# -> "pt.jec_up"
fields#

type: tuple read-only

The fields of this route.

column#

type: string read-only

The name of the corresponding column in dot format.

nano_column#

type: string read-only

The name of the corresponding column in nano-style underscore format.

string_column#

type: string read-only

The name of the corresponding column in dot format, but only consisting of string fields, i.e., without slicing or indexing fields.

string_nano_column#

type: string read-only

The name of the corresponding column in nano-style underscore format, but only consisting of string fields, i.e., without slicing or indexing fields.

Attributes:

DOT_SEP

NANO_SEP

fields

column

nano_column

string_column

string_nano_column

Methods:

slice_to_str(s)

Return type:

str

join(fields)

Joins a sequence of strings into a string in dot format and returns it.

join_nano(fields)

Joins a sequence of strings into a string in nano-style underscore format and returns it.

split(column)

Splits a string assumed to be in dot format and returns the fragments, potentially with selection, slice and advanced indexing expressions.

split_nano(column)

Splits a string assumed to be in nano-style underscore format and returns the fragments, potentially with selection, slice and advanced indexing expressions.

add(other)

Adds another route instance, or the fields extracted from either a sequence of strings or a string in dot format, to the fields of this instance.

pop([index])

Removes a field at index and returns it.

reverse()

Reverses the fields of this route in-place.

copy()

Returns a copy of this instance.

apply(ak_array[, null_value])

Returns a selection of ak_array using the fields in this route.

DOT_SEP = '.'#
NANO_SEP = '_'#
classmethod slice_to_str(s)[source]#
Return type:

str

classmethod join(fields)[source]#

Joins a sequence of strings into a string in dot format and returns it.

Return type:

str

classmethod join_nano(fields)[source]#

Joins a sequence of strings into a string in nano-style underscore format and returns it.

Return type:

str
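
The two join methods differ only in the separator. The sketch below mimics them with plain string operations for routes that consist of string fields only; the module-level functions join and join_nano are local stand-ins written for this example, not the classmethods themselves.

```python
DOT_SEP = "."
NANO_SEP = "_"


def join(fields):
    """Join fields into a column name in dot format."""
    return DOT_SEP.join(str(field) for field in fields)


def join_nano(fields):
    """Join fields into a column name in nano-style underscore format."""
    return NANO_SEP.join(str(field) for field in fields)


fields = ("Jet", "pt", "jec_up")
dot_name = join(fields)
nano_name = join_nano(fields)

# a naive split on the underscore yields more fragments than there were fields,
# since the field "jec_up" contains the separator itself
naive_parts = nano_name.split(NANO_SEP)
```

The last line illustrates why the dot format is the less ambiguous of the two representations when a name has to be converted back into fields.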

classmethod split(column)[source]#

Splits a string assumed to be in dot format and returns the fragments, potentially with selection, slice and advanced indexing expressions.

Parameters:

column (str) – Name of the column to be split

Raises:

ValueError – If column is malformed, specifically if brackets are not encountered in pairs (i.e. an opening bracket without a closing one and vice versa).

Return type:

tuple[str | int | slice | type(Ellipsis) | list | tuple]

Returns:

tuple of subcomponents extracted from column
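
The pairing condition behind the ValueError can be sketched as a simple depth counter. This is an illustration only: the helper names check_brackets and is_well_formed are hypothetical, and the assumption that only square brackets matter is made for this example rather than taken from the implementation.

```python
def check_brackets(column: str) -> None:
    """Raise a ValueError if square brackets in *column* are not paired."""
    depth = 0
    for char in column:
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
            if depth < 0:
                raise ValueError(f"closing bracket without opening one in '{column}'")
    if depth != 0:
        raise ValueError(f"opening bracket without closing one in '{column}'")


def is_well_formed(column: str) -> bool:
    """Return True if *column* passes the bracket check, False otherwise."""
    try:
        check_brackets(column)
    except ValueError:
        return False
    return True
```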

classmethod split_nano(column)[source]#

Splits a string assumed to be in nano-style underscore format and returns the fragments, potentially with selection, slice and advanced indexing expressions.

Parameters:

column (str) – Name of the column to be split

Raises:

ValueError – If column is malformed, specifically if brackets are not encountered in pairs (i.e. an opening bracket without a closing one and vice versa).

Return type:

tuple[str | int | slice | type(Ellipsis) | list | tuple]

Returns:

tuple of subcomponents extracted from column

property fields: tuple#
property column: str#
property nano_column: str#
property string_column: str#
property string_nano_column: str#
add(other)[source]#

Adds another route instance, or the fields extracted from either a sequence of strings or a string in dot format, to the fields of this instance. A ValueError is raised when other could not be interpreted.

Return type:

None

pop(index=-1)[source]#

Removes a field at index and returns it.

Return type:

str

reverse()[source]#

Reverses the fields of this route in-place.

Return type:

None

copy()[source]#

Returns a copy of this instance.

Return type:

Route

apply(ak_array, null_value=<object object>)[source]#

Returns a selection of ak_array using the fields in this route. When the route is empty, ak_array is returned unchanged. When null_value is set, it is used to fill up missing elements in the selection corresponding to this route. Example:

# select the 6th jet in each event
Route("Jet.pt[:, 5]").apply(events)
# -> might lead to "index out of range" errors for events with fewer jets

Route("Jet.pt[:, 5]").apply(events, -999)
# -> [
#     34.15625,
#     17.265625,
#     -999.0,  # 6th jet was missing here
#     19.40625,
#     ...
# ]
Return type:

Array
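
The effect of null_value in apply() can be mimicked with plain Python lists. The sketch below is not the awkward-array based implementation; the function pick and the sample data are hypothetical. It uses a sentinel object as the default, similar in spirit to the `<object object>` default shown in the signature above, so that any value (including None) can serve as the fill value.

```python
_NO_VALUE = object()  # sentinel, so that None remains a usable null_value


def pick(values_per_event, index, null_value=_NO_VALUE):
    """Pick element *index* (non-negative) from each event, optionally filling gaps."""
    picked = []
    for values in values_per_event:
        if index < len(values):
            picked.append(values[index])
        elif null_value is not _NO_VALUE:
            # the element is missing, use the fill value instead
            picked.append(null_value)
        else:
            raise IndexError(f"index {index} out of range for event with {len(values)} entries")
    return picked


# three events with two, one and two jets
jet_pts = [[34.2, 20.1], [17.3], [50.0, 19.4]]
second_jet_pt = pick(jet_pts, 1, null_value=-999.0)
```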

class RouteFilter(keep_routes)[source]#

Bases: object

Shallow helper class that handles removal of routes in an awkward array that do not match those in keep_routes. Each route can either be a Route instance, or anything that is accepted by its constructor. Example:

route_filter = RouteFilter(["Jet.pt", "Jet.eta"])
events = route_filter(events)

print(get_ak_routes(events))
# [
#    "Jet.pt",
#    "Jet.eta",
# ]
keep_routes#

type: list

The routes to keep.

remove_routes#

type: None, set

A set of Route instances that are removed, defined after the first call to this instance.
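
The filtering idea can be illustrated on nested dictionaries instead of awkward arrays. The function filter_routes below is a hypothetical stand-in that keeps exact matches only; it is a sketch of the concept and makes no claim about how RouteFilter treats patterns or whole collections.

```python
def filter_routes(record, keep_routes):
    """Return a copy of nested dict *record* containing only columns listed in *keep_routes*."""
    keep = {tuple(route.split(".")) for route in keep_routes}

    def _filter(node, prefix):
        kept = {}
        for key, value in node.items():
            path = prefix + (key,)
            if isinstance(value, dict):
                # descend into nested fields and drop them when nothing is kept
                sub = _filter(value, path)
                if sub:
                    kept[key] = sub
            elif path in keep:
                kept[key] = value
        return kept

    return _filter(record, ())


events = {
    "run": [1, 1],
    "Jet": {"pt": [34.2, 17.3], "eta": [0.4, -1.2], "phi": [2.1, 0.3]},
}
filtered = filter_routes(events, ["Jet.pt", "Jet.eta"])
```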

class ArrayFunction(call_func=law.util.no_value, init_func=law.util.no_value, skip_func=law.util.no_value, check_used_columns=None, check_produced_columns=None, instance_cache=None, log_runtime=None, deferred_init=True, **kwargs)[source]#

Bases: Derivable

Base class for function wrappers that act on arrays and keep track of used as well as produced columns, as close as possible to the actual implementation. ArrayFunction’s can express the dependence between one another (either via used or produced columns) (1) and they can invoke one another for the purpose of modularity (2). Knowledge of the columns to load (save) is especially useful when opening (writing) files and selecting the content to deserialize (serialize).

To understand the internals of both (1) and (2), it is imperative to distinguish between ArrayFunction subclasses and their instances, as shown in the example below.

class my_func(ArrayFunction):
    uses = {"Jet.pt"}
    produces = {"Jet.pt2"}

    def call_func(self, events):
        # store the squared pt in the column declared in produces
        events["Jet", "pt2"] = events.Jet.pt ** 2
        return events

class my_other_func(ArrayFunction):
    uses = {my_func}
    produces = {"Jet.pt4"}

    def call_func(self, events):
        # call the correct my_func instance
        events = self[my_func](events)

        events["Jet", "pt4"] = events.Jet.pt2 ** 2
        return events

# call my_other_func on a chunk of events
inst = my_other_func()
events = inst(events)

ArrayFunction’s declare dependence between one another through class-level sets uses and produces. This allows for the construction of an internal callstack. Once an ArrayFunction is instantiated, all dependent objects in this callstack are instantiated as well and stored internally mapped to their class. This is strictly required as ArrayFunctions, and most likely their subclasses, can have a state (a set of instance-level members that are allowed to differ between instances). The instance of a dependency can be accessed via item syntax (self[my_func] above).

Note

The above example uses explicit subclassing, which is unlikely to be used in practice. Instead, please consider using a decorator to wrap the main callable as done by the Calibrator, Selector and Producer interfaces.

uses and produces should be strings denoting a column in dot format or a Route instance, other ArrayFunction instances, or a sequence or set of the two. On instance-level, the full sets of used_columns and produced_columns are simply resolvable through attributes.

call_func defines the function being invoked when the instance is called. An additional initialization function can be wrapped through a decorator (similar to property setters) as shown in the example below. It constitutes a mechanism to update the uses and produces sets to declare dependencies in a more dynamic way.

@my_other_func.init
def my_other_func_init(self: ArrayFunction):
    self.uses.add(my_func)

# the above adds an initialization function to the already created class my_other_func,
# but the same could also be achieved by just declaring an instance method *init_func* as
# part of the class definition above

Another function that can be dynamically assigned (or simply implemented in the subclass) is skip_func. If set, it is assumed to be a function that returns a bool, deciding on whether to include this ArrayFunction in the dependencies of other instances. It can be registered through the skip() decorator, in the same way that init() registers init_func.

In the example above, my_other_func declares a dependence on my_func by listing it in uses. This means that my_other_func uses the columns that my_func also uses. The same logic applies for produces. To make this dependence more explicit, the entry could also be changed to my_func.USES which results in the same behavior, or my_func.PRODUCES to reverse it - my_other_func would define that it is using the columns that my_func produces. Omitting these flags is identical to using (e.g.) my_func.AUTO.
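
The recursive resolution described above can be sketched in a few lines of plain Python. This is a simplified illustration, not the ArrayFunction implementation: the base class Func, the function resolve and the representation of flagged dependencies as (class, attribute name) tuples are all assumptions made for this example.

```python
class Func:
    # minimal stand-in for a class with uses and produces sets
    uses = set()
    produces = set()


def resolve(cls, attr):
    """Recursively collect column names from the *attr* set ("uses" or "produces") of *cls*."""
    columns = set()
    for obj in getattr(cls, attr):
        if isinstance(obj, str):
            # plain column name
            columns.add(obj)
        elif isinstance(obj, tuple):
            # flagged dependency: explicitly states which set of the dependency to take
            dep, dep_attr = obj
            columns |= resolve(dep, dep_attr)
        else:
            # unflagged dependency: take the same set as the one being resolved
            columns |= resolve(obj, attr)
    return columns


class MyFunc(Func):
    uses = {"Jet.pt"}
    produces = {"Jet.pt2"}


class MyOtherFunc(Func):
    # unflagged: uses whatever MyFunc uses
    uses = {MyFunc}
    produces = {"Jet.pt4"}


class MyThirdFunc(Func):
    # flagged: uses whatever MyFunc produces
    uses = {(MyFunc, "produces")}


used_by_other = resolve(MyOtherFunc, "uses")
used_by_third = resolve(MyThirdFunc, "uses")
```

Here the unflagged entry plays the role of my_func.AUTO, while the tuple plays the role of my_func.PRODUCES.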

classattribute uses#

type: set

The set of used column names or other dependencies to recursively resolve the names of used columns.

classattribute produces#

type: set

The set of produced column names or other dependencies to recursively resolve the names of produced columns.

classattribute AUTO[source]#

type: ArrayFunction.IOFlag

Flag that can be used in nested dependencies between array functions to denote automatic resolution of column names.

classattribute USES[source]#

type: ArrayFunction.IOFlag

Flag that can be used in nested dependencies between array functions to denote column names in the uses set.

classattribute PRODUCES[source]#

type: ArrayFunction.IOFlag

Flag that can be used in nested dependencies between array functions to denote column names in the produces set.

classattribute check_used_columns#

type: bool

A flag that decides whether, during the actual call, the input array should be checked for the existence of all non-optional columns defined in uses. If a column is missing, an exception is raised. A column, represented by a Route object internally, is considered optional if it has a tag "optional" as, for instance, added by optional_column().

classattribute check_produced_columns#

type: bool

A flag that decides whether, after the actual call, the output array should be checked for the existence of all non-optional columns defined in produces. If a column is missing, an exception is raised. A column, represented by a Route object internally, is considered optional if it has a tag "optional" as, for instance, added by optional_column().
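
The two checks follow the same pattern: compare a set of required columns against the columns that are present and ignore those tagged as optional. The sketch below illustrates this with plain sets of column names; the helper names and the choice of ValueError are assumptions for this example, as the text above only states that an exception is raised.

```python
def missing_columns(required, available, optional=()):
    """Return the sorted names in *required* that are neither available nor optional."""
    return sorted(set(required) - set(available) - set(optional))


def check_columns(required, available, optional=()):
    """Raise a ValueError (an assumed exception type) when non-optional columns are missing."""
    missing = missing_columns(required, available, optional)
    if missing:
        raise ValueError(f"missing columns: {', '.join(missing)}")


available = {"Jet.pt", "Jet.eta"}
missing = missing_columns(
    {"Jet.pt", "Jet.phi", "Muon.pt"},
    available,
    optional={"Muon.pt"},
)
```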

uses_instances#

type: set

The set of used column names or instantiated dependencies to recursively resolve the names of used columns. Set during the deferred initialization.

produces_instances#

type: set

The set of produced column names or instantiated dependencies to recursively resolve the names of produced columns. Set during the deferred initialization.

deps#

type: dict

The callstack of dependencies, i.e., a dictionary mapping dependent classes to the instances to be used by this instance. Item access on this instance is forwarded to this object.

deps_kwargs#

type: dict

Optional keyword arguments mapped to dependent classes that are forwarded to their initialization.

call_func#

type: callable

The wrapped function to be called on arrays.

Attributes:

uses

produces

AUTO

USES

PRODUCES

check_used_columns

check_produced_columns

log_runtime

call_func

init_func

skip_func

used_columns

produced_columns

Classes:

IOFlag(value)

An enumeration.

IOFlagged(wrapped, io_flag)

DeferredColumn(*columns)

Methods:

init(func)

Decorator to wrap a function func that should be registered as init_func() which is used to initialize this instance dependent on specific task attributes.

skip(func)

Decorator to wrap a function func that should be registered as skip_func() which is used to decide whether this instance should be skipped during the building of dependency trees in other ArrayFunction's.

deferred_init([instance_cache])

Controls the deferred part of the initialization process.

create_dependencies(instance_cache[, ...])

Walks through all dependencies configured in the _dependency_sets and fills deps as well as separate sets, corresponding to the classes defined in _dependency_sets (e.g. uses -> uses_instances).

instantiate_dependency(cls, **kwargs)

Controls the instantiation of a dependency given by its cls and arbitrary kwargs.

get_dependencies()

Returns a set of instances of all dependencies.

uses = {}#
produces = {}#
class IOFlag(value)[source]#

Bases: Flag

An enumeration.

Attributes:

AUTO

USES

PRODUCES

AUTO = 1#
USES = 2#
PRODUCES = 4#
class IOFlagged(wrapped, io_flag)#

Bases: tuple

Attributes:

io_flag

Alias for field number 1

wrapped

Alias for field number 0

io_flag#

Alias for field number 1

wrapped#

Alias for field number 0

class DeferredColumn(*columns)[source]#

Bases: object

Methods:

deferred_column([call_func])

Subclass factory to be used as a decorator wrapping a call_func that will be used as the new __call__ method.

get()

Return type:

Any | set[Any]

classmethod deferred_column(call_func=None)[source]#

Subclass factory to be used as a decorator wrapping a call_func that will be used as the new __call__ method. Note that the use of super() inside the decorated method should always be done with arguments (i.e., class and instance).

Return type:

ArrayFunction.DeferredColumn | Callable

get()[source]#
Return type:

Any | set[Any]

AUTO = IOFlagged(wrapped=<class 'columnflow.columnar_util.ArrayFunction'>, io_flag=<IOFlag.AUTO: 1>)[source]#
USES = IOFlagged(wrapped=<class 'columnflow.columnar_util.ArrayFunction'>, io_flag=<IOFlag.USES: 2>)[source]#
PRODUCES = IOFlagged(wrapped=<class 'columnflow.columnar_util.ArrayFunction'>, io_flag=<IOFlag.PRODUCES: 4>)[source]#
classmethod init(func)[source]#

Decorator to wrap a function func that should be registered as init_func() which is used to initialize this instance dependent on specific task attributes. The function should not accept positional arguments.

The decorator does not return the wrapped function.

Return type:

None

classmethod skip(func)[source]#

Decorator to wrap a function func that should be registered as skip_func() which is used to decide whether this instance should be skipped during the building of dependency trees in other ArrayFunction’s.

The function should not accept positional arguments and return a boolean.

Return type:

None

check_used_columns = True#
check_produced_columns = True#
log_runtime = False#
call_func = None#
init_func = None#
skip_func = None#
deferred_init(instance_cache=None)[source]#

Controls the deferred part of the initialization process.

Return type:

dict

create_dependencies(instance_cache, only_update=False)[source]#

Walks through all dependencies configured in the _dependency_sets and fills deps as well as separate sets, corresponding to the classes defined in _dependency_sets (e.g. uses -> uses_instances).

instance_cache is a dictionary that serves as a cache to prevent the same classes from being instantiated multiple times.

Return type:

None
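
The role of such a cache can be sketched with a small class hierarchy in which a dependency is shared between two other classes. The class Node and its dependencies attribute are hypothetical and only illustrate the caching idea; they do not reproduce the actual create_dependencies() logic.

```python
class Node:
    # classes this node depends on, overwritten by subclasses
    dependencies = ()

    def __init__(self, instance_cache=None):
        # a shared cache maps classes to their single instance
        cache = {} if instance_cache is None else instance_cache
        self.deps = {}
        for dep_cls in self.dependencies:
            if dep_cls not in cache:
                # instantiate once and pass the cache on to nested dependencies
                cache[dep_cls] = dep_cls(instance_cache=cache)
            self.deps[dep_cls] = cache[dep_cls]


class A(Node):
    pass


class B(Node):
    dependencies = (A,)


class C(Node):
    dependencies = (A, B)


c = C()
# both C and its dependency B refer to the very same instance of A
shared = c.deps[A] is c.deps[B].deps[A]
```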

instantiate_dependency(cls, **kwargs)[source]#

Controls the instantiation of a dependency given by its cls and arbitrary kwargs. The latter update optional keyword arguments in self.deps_kwargs and are then forwarded to the instantiation.

Return type:

ArrayFunction

get_dependencies()[source]#

Returns a set of instances of all dependencies.

Return type:

set[ArrayFunction | Any]

property used_columns: set[columnflow.columnar_util.Route]#
property produced_columns: set[columnflow.columnar_util.Route]#
class TaskArrayFunction(*args, requires_func=law.util.no_value, setup_func=law.util.no_value, sandbox=law.util.no_value, call_force=law.util.no_value, pick_cached_result=law.util.no_value, inst_dict=None, **kwargs)[source]#

Bases: ArrayFunction

Subclass of ArrayFunction providing an interface to certain task features such as declaring dependent or produced shifts, task requirements, and defining a custom setup function. In addition, there is the option to update all these configurations based on task attributes.

shifts can be defined similarly to columns to use and/or produce in the ArrayFunction base class. It can be a sequence or set of shift names, or dependent TaskArrayFunction’s. Similar to used_columns and produced_columns, the all_shifts property returns a flat set of all shifts, potentially resolving information from dependencies registered in uses, produces and shifts itself.

As opposed to more basic ArrayFunction’s, instances of this class have a direct interface to tasks and can influence their behavior - and vice-versa. For this purpose, custom task requirements, and a setup of objects resulting from these requirements can be defined in a similar, programmatic way. Also, they might define an optional sandbox that is required to run this array function.

Example:

class my_func(TaskArrayFunction):
    uses = {"Jet.pt"}
    produces = {"Jet.pt_weighted"}

    def call_func(self, events):
        # self.weights is defined below
        events["Jet", "pt_weighted"] = events.Jet.pt * self.weights
        return events

# define requirements that (e.g.) compute the weights
@my_func.requires
def requires(self, reqs):
    # fill the requirements dict
    reqs["weights_task"] = SomeWeightsTask.req(self.task)
    reqs["columns_task"] = SomeColumnsTask.req(self.task)

# define the setup step that loads event weights from the required task
@my_func.setup
def setup(self, reqs, inputs, reader_targets):
    # load the weights once, inputs is corresponding to what we added to reqs above
    weights = inputs["weights_task"].load(formatter="json")

    # save them as an instance attribute
    self.weights = weights

    # fill the reader_targets to be added in an event chunk loop
    reader_targets["my_columns"] = inputs["columns_task"]["columns"]

# call my_func on a chunk of events
inst = my_func()
inst(events)

For a possible implementation, see columnflow.production.pileup.

Note

The above example uses explicit subclassing, mixed with decorator usage to extend the class. This is most certainly never used in practice. Instead, please either consider defining the class the normal way, or use a decorator to wrap the main callable first and by that creating the class as done by the Calibrator, Selector and Producer interfaces.

classattribute shifts#

type: set

The set of dependent or produced shifts, or other dependencies to recursively resolve the names of shifts.

shifts_instances#

type: set

The set of shift names or instantiated dependencies to recursively resolve the names of shifts. Set during the deferred initialization.

all_shifts#

type: set read-only

The resolved, flat set of dependent or produced shifts.

setup_func#

type: callable

The registered function performing the custom setup step, or None.

sandbox#

type: str, None

An optional string referring to a sandbox that is required to run this array function.

call_force#

type: None, bool

When a bool, this flag decides whether calls of this instance are cached. However, note that when the call_force flag passed to __call__() is specified, it has precedence over this attribute.

pick_cached_result[source]#

type: callable

A callable that is given a previously cached result, and all arguments and keyword arguments of the main call function, to pick values that should be returned and cached for the next invocation.

Attributes:

shifts

requires_func

setup_func

sandbox

call_force

all_shifts

Methods:

requires(func)

Decorator to wrap a function func that should be registered as requires_func() which is used to define additional task requirements.

setup(func)

Decorator to wrap a function func that should be registered as setup_func() which is used to perform a custom setup of objects.

pick_cached_result(cached_result, *args, ...)

Default implementation for picking a return value from a previously cached_result and all args and kwargs that were passed to the main call method.

instantiate_dependency(cls, **kwargs)

Controls the instantiation of a dependency given by its cls and arbitrary kwargs, updated by this instance's inst_dict.

run_requires([reqs, _cache])

Recursively runs the requires_func() of this instance and all dependencies.

run_setup(reqs, inputs[, reader_targets, _cache])

Recursively runs the setup_func() of this instance and all dependencies.

shifts = {}#
classmethod requires(func)[source]#

Decorator to wrap a function func that should be registered as requires_func() which is used to define additional task requirements. The function should accept one positional argument:

  • reqs, a dictionary into which requirements should be inserted.

The decorator does not return the wrapped function.

Return type:

None

Note

When the task invoking the requirement is a workflow, be aware that both the actual workflow instance as well as branch tasks might call the wrapped function. When the requirements should differ between them, make sure to use the BaseWorkflow.is_workflow() and BaseWorkflow.is_branch() methods to distinguish the cases.

classmethod setup(func)[source]#

Decorator to wrap a function func that should be registered as setup_func() which is used to perform a custom setup of objects. The function should accept three positional arguments:

  • reqs, a dictionary containing the required tasks as defined by the custom requires_func().

  • inputs, a dictionary containing the outputs created by the tasks in reqs.

  • reader_targets, an InsertableDict containing the targets to be included in an event chunk loop.

The decorator does not return the wrapped function.

Return type:

None

requires_func = None#
setup_func = None#
sandbox = None#
call_force = None#
static pick_cached_result(cached_result, *args, **kwargs)[source]#

Default implementation for picking a return value from a previously cached_result and all args and kwargs that were passed to the main call method.

Return type:

TypeVar(T)

instantiate_dependency(cls, **kwargs)[source]#

Controls the instantiation of a dependency given by its cls and arbitrary kwargs, updated by this instance's inst_dict.

Return type:

TaskArrayFunction

property all_shifts: set[str]#
run_requires(reqs=None, _cache=None)[source]#

Recursively runs the requires_func() of this instance and all dependencies. reqs defaults to an empty dictionary which should be filled to store the requirements.

Return type:

dict
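
A recursive traversal of this kind can be sketched as follows. The class Func, the order in which an instance and its dependencies are visited, and the use of a set as the cache are assumptions for this illustration; the sketch only shows how a shared reqs dictionary is filled and how a cache avoids visiting a shared dependency twice.

```python
class Func:
    def __init__(self, name, deps=(), requires_func=None):
        self.name = name
        self.deps = list(deps)
        self.requires_func = requires_func

    def run_requires(self, reqs=None, _cache=None):
        # reqs defaults to an empty dictionary that is filled in-place
        if reqs is None:
            reqs = {}
        # the cache remembers instances that were already handled
        if _cache is None:
            _cache = set()
        if self in _cache:
            return reqs
        _cache.add(self)

        # run the own function first, then descend into dependencies
        if self.requires_func is not None:
            self.requires_func(self, reqs)
        for dep in self.deps:
            dep.run_requires(reqs=reqs, _cache=_cache)
        return reqs


calls = []


def weights_requires(func, reqs):
    # record the call and register a placeholder requirement
    calls.append(func.name)
    reqs["weights_task"] = "SomeWeightsTask"


shared = Func("shared", requires_func=weights_requires)
a = Func("a", deps=[shared])
b = Func("b", deps=[shared, a])
reqs = b.run_requires()
```

Although shared is reachable through two paths, its function runs only once.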

run_setup(reqs, inputs, reader_targets=None, _cache=None)[source]#

Recursively runs the setup_func() of this instance and all dependencies. reqs corresponds to the requirements created by run_requires(), and inputs are their outputs. reader_targets defaults to an empty InsertableDict which should be filled to store targets of columnar data that are to be included in an event chunk loop.

Return type:

dict[str, law.FileSystemTarget]

class ChunkedIOHandler(source, source_type=None, chunk_size=100000, pool_size=2, open_options=None, read_options=None, read_columns=None, iter_message='handling chunk {pos.index}', debug=False)[source]#

Bases: object

Allows reading one or multiple files and iterating through chunks of their content with multi-threaded read operations. Chunks and their positions (denoted by start and stop markers, and the index of the chunk itself) are accessed by iterating through a handler instance. Also, this handler allows interactions with the internal queue handling tasks in multiple threads to effectively write output chunks within the same pool of threads.

The content to load is configurable through source, which can be a file path or an opened file object, and a source_type, which defines how the source should be opened and traversed for chunking. See the classmethods open_... and read_... below for implementation details and get_source_handler() for a list of currently supported sources.

Example:

# iterate through a single file
# (creating the handler and iterating through it in the same line)
for chunk, position in ChunkedIOHandler("data.root", source_type="coffea_root"):
    # chunk is a NanoAODEventsArray as returned by read_coffea_root
    jet_pts = chunk.Jet.pt
    print(f"jet pts of chunk {position.index}: {jet_pts}")

# iterate through multiple files simultaneously
# (also, now creating the handler first and then iterating through it)
with ChunkedIOHandler(
    ("data.root", "masks.parquet"),
    source_type=("coffea_root", "awkward_parquet"),
) as handler:
    for (chunk, masks), position in handler:
        # chunk is a NanoAODEventsArray as returned by read_coffea_root
        # masks is an awkward array as returned by read_awkward_parquet
        selected_jet_pts = chunk[masks].Jet.pt
        print(f"selected jet pts of chunk {position.index}: {selected_jet_pts}")

        # add a callback to the task queue, e.g. for saving a column
        handler.queue(ak.to_parquet, (selected_jet_pts, "some/path.parquet"))

The maximum size of the chunks and the number of threads to load them can be configured through chunk_size and pool_size. Chunks are fully loaded into memory before they are yielded to be used in the main thread.
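
How a total number of entries is divided into chunks of a maximum size can be sketched as below. The named tuple ChunkPos is a local stand-in; its fields index and entry_start follow the ChunkPosition signature listed on this page, while entry_stop and the function chunk_positions are assumptions made for this example.

```python
import math
from collections import namedtuple

# local stand-in; entry_stop is an assumed field name
ChunkPos = namedtuple("ChunkPos", ["index", "entry_start", "entry_stop"])


def chunk_positions(n_entries, chunk_size):
    """Split *n_entries* into consecutive chunks with at most *chunk_size* entries each."""
    n_chunks = math.ceil(n_entries / chunk_size)
    return [
        ChunkPos(i, i * chunk_size, min((i + 1) * chunk_size, n_entries))
        for i in range(n_chunks)
    ]


# 250k entries with the default chunk size of 100k result in three chunks
positions = chunk_positions(250_000, 100_000)
```

The last chunk is shorter than the others whenever the number of entries is not a multiple of the chunk size.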

In addition, open_options and read_options are forwarded to internal open and read implementations to further control and optimize IO. For instance, they can be configured to select only a subset of (nested) columns to read from disk. However, since this is highly dependent on the specific source type, read_columns can be defined as a set (!) of strings (in nano-style dot format) or Route objects and is added to either open_options or read_options internally (source type dependent).

If source refers to a single object, source_type, open_options, read_options and read_columns should be single values as well. Otherwise, if source is a sequence of sources, the other arguments can be sequences as well with the same length.

During iteration, before chunks are yielded, an optional message iter_message is printed when set, receiving the respective ChunkPosition as the field pos for formatting.
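
The placeholder syntax of the default iter_message corresponds to attribute access in Python format strings. The sketch below demonstrates this with a hypothetical named tuple Pos standing in for a ChunkPosition; it assumes that the handler formats the message with str.format, which the text above suggests but does not state explicitly.

```python
from collections import namedtuple

# stand-in for a chunk position that only carries the index
Pos = namedtuple("Pos", ["index"])

iter_message = "handling chunk {pos.index}"

# the position is passed as the field "pos"
message = iter_message.format(pos=Pos(index=3))
```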

Classes:

SourceHandler(type, open, close, read)

ChunkPosition(index, entry_start, ...)

ReadResult(chunk, chunk_pos)

Methods:

create_chunk_position(n_entries, chunk_size, ...)

Creates and returns a ChunkPosition object based on the total number of entries n_entries, the maximum chunk_size, and the index of the chunk chunk_index.

get_source_handler(source_type, source)

Takes a source_type (see list below) and gathers information about how to open, read content from, and close a specific source, and returns that information in a named SourceHandler tuple.

open_uproot_root(source[, open_options, ...])

Opens an uproot tree from a root file at source and returns a 2-tuple (tree, entries).

close_uproot_root(source_object)

Closes the file that contains the TTree source_object.

read_uproot_root(source_object, chunk_pos[, ...])

Given an uproot TTree source_object, returns an awkward array chunk referred to by chunk_pos.

open_coffea_root(source[, open_options, ...])

Opens an uproot file at source for subsequent processing with coffea and returns a 2-tuple (uproot file, tree entries).

close_coffea_root(source_object)

Closes a ROOT file referred to by source_object if it is a ReadOnlyDirectory.

read_coffea_root(source_object, chunk_pos[, ...])

Given a file location or opened uproot file source_object, returns an awkward array chunk referred to by chunk_pos, assuming nanoAOD structure.

open_coffea_parquet(source[, open_options, ...])

Given a parquet file located at source, returns a 2-tuple (source, entries).

close_coffea_parquet(source_object)

This is a placeholder method and has no effect.

read_coffea_parquet(source_object, chunk_pos)

Given the location of a parquet file source_object, returns an awkward array chunk referred to by chunk_pos, assuming nanoAOD structure.

open_awkward_parquet(source[, open_options, ...])

Opens a parquet file saved at source, loads the content as a dask awkward array, wrapped by a DaskArrayReader, and returns a 2-tuple (array, length).

close_awkward_parquet(source_object)

Closes the dask array wrapper referred to by source_object.

read_awkward_parquet(source_object, chunk_pos)

Given a DaskArrayReader source_object, returns the chunk referred to by chunk_pos as a full copy loaded into memory.

open()

Opens all previously registered sources and preloads all source objects to read content from later on.

close()

Closes all cached, opened files and deletes loaded source objects.

queue(*args, **kwargs)

Adds a new task to the internal task queue with all args and kwargs forwarded to TaskQueue.add().

Attributes:

n_chunks

Returns the number of chunks this instance will iterate over based on the number of entries n_entries and the configured chunk_size.

closed

Returns whether the instance is closed for reading.

class SourceHandler(type, open, close, read)#

Bases: tuple

Attributes:

close

Alias for field number 2

open

Alias for field number 1

read

Alias for field number 3

type

Alias for field number 0

close#

Alias for field number 2

open#

Alias for field number 1

read#

Alias for field number 3

type#

Alias for field number 0

class ChunkPosition(index, entry_start, entry_stop, max_chunk_size)#

Bases: tuple

Attributes:

entry_start

Alias for field number 1

entry_stop

Alias for field number 2

index

Alias for field number 0

max_chunk_size

Alias for field number 3

entry_start#

Alias for field number 1

entry_stop#

Alias for field number 2

index#

Alias for field number 0

max_chunk_size#

Alias for field number 3

class ReadResult(chunk, chunk_pos)#

Bases: tuple

Attributes:

chunk

Alias for field number 0

chunk_pos

Alias for field number 1

chunk#

Alias for field number 0

chunk_pos#

Alias for field number 1

classmethod create_chunk_position(n_entries, chunk_size, chunk_index)[source]#

Creates and returns a ChunkPosition object based on the total number of entries n_entries, the maximum chunk_size, and the index of the chunk chunk_index.

Return type:

ChunkPosition
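The arithmetic behind chunk positions can be sketched as follows. This is not the actual implementation: it assumes that chunks are contiguous blocks of chunk_size entries with a possibly shorter last chunk, and both sketch_chunk_position and the local ChunkPosition tuple are stand-ins defined only for this example.

```python
import math
from collections import namedtuple

# local stand-in mirroring the documented fields of ChunkPosition
ChunkPosition = namedtuple(
    "ChunkPosition",
    ["index", "entry_start", "entry_stop", "max_chunk_size"],
)


def sketch_chunk_position(n_entries, chunk_size, chunk_index):
    # assumption: contiguous chunks, the last one may hold fewer entries
    entry_start = chunk_index * chunk_size
    entry_stop = min(entry_start + chunk_size, n_entries)
    return ChunkPosition(chunk_index, entry_start, entry_stop, chunk_size)


n_entries, chunk_size = 250, 100
# number of chunks needed to cover all entries
n_chunks = math.ceil(n_entries / chunk_size)
positions = [sketch_chunk_position(n_entries, chunk_size, i) for i in range(n_chunks)]
```

With 250 entries and a chunk size of 100, the sketch yields three chunks, the last one covering entries 200 to 250.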

classmethod get_source_handler(source_type, source)[source]#

Takes a source_type (see list below) and gathers information about how to open, read content from, and close a specific source, and returns that information in a named SourceHandler tuple.

When source_type is None but an arbitrary source is set, the type is derived from that object, and an exception is raised in case no type can be inferred.

Currently supported source types are:

  • “uproot_root”

  • “coffea_root”

  • “coffea_parquet”

  • “awkward_parquet”

Return type:

SourceHandler

classmethod open_uproot_root(source, open_options=None, read_columns=None)[source]#

Opens an uproot tree from a root file at source and returns a 2-tuple (tree, entries). source can be the path of the file, an already opened, readable uproot file (assuming the tree is called “Events”), or a 2-tuple whose second item defines the name of the tree to be loaded. When a new file is opened, it receives open_options. Passing read_columns has no effect.

Return type:

tuple[uproot.TTree, int]

classmethod close_uproot_root(source_object)[source]#

Closes the file that contains the TTree source_object.

Return type:

None

classmethod read_uproot_root(source_object, chunk_pos, read_options=None, read_columns=None)[source]#

Given an uproot TTree source_object, returns an awkward array chunk referred to by chunk_pos. read_options are passed to uproot.TTree.arrays. read_columns are converted to strings and, if not already present, added as field filter_name to read_options.

Return type:

ak.Array
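How read_columns might end up in read_options can be sketched with plain dictionaries. The helper merge_read_columns is hypothetical, and the sketch assumes that "if not already present" means an existing filter_name entry is left untouched. Column names are kept as plain strings here, whereas the real method may convert them to the naming scheme of the source type.

```python
def merge_read_columns(read_options, read_columns, key="filter_name"):
    # copy so that the options passed by the caller are left untouched
    options = dict(read_options or {})
    # assumption: an existing entry takes precedence over read_columns
    if read_columns and key not in options:
        options[key] = sorted(str(column) for column in read_columns)
    return options


added = merge_read_columns({"entry_stop": 10}, {"Jet.pt", "event"})
kept = merge_read_columns({"filter_name": ["run"]}, {"Jet.pt"})
```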

classmethod open_coffea_root(source, open_options=None, read_columns=None)[source]#

Opens an uproot file at source for subsequent processing with coffea and returns a 2-tuple (uproot file, tree entries). source can be the path of the file, an already opened, readable uproot file (assuming the tree is called “Events”), or a 2-tuple whose second item defines the name of the tree to be loaded. open_options are forwarded to uproot.open if a new file is opened. Passing read_columns has no effect.

Return type:

tuple[uproot.ReadOnlyDirectory, int]

classmethod close_coffea_root(source_object)[source]#

Closes a ROOT file referred to by source_object if it is a ReadOnlyDirectory. In case a string is passed, this method does nothing.

Return type:

None

classmethod read_coffea_root(source_object, chunk_pos, read_options=None, read_columns=None)[source]#

Given a file location or opened uproot file source_object, returns an awkward array chunk referred to by chunk_pos, assuming nanoAOD structure. read_options are passed to coffea.nanoevents.NanoEventsFactory.from_root. read_columns are converted to strings and, if not already present, added as nested field iteritems_options.filter_name to read_options.

Return type:

coffea.nanoevents.methods.base.NanoEventsArray

classmethod open_coffea_parquet(source, open_options=None, read_columns=None)[source]#

Given a parquet file located at source, returns a 2-tuple (source, entries). Passing open_options or read_columns has no effect.

Return type:

tuple[str, int]

classmethod close_coffea_parquet(source_object)[source]#

This is a placeholder method and has no effect.

Return type:

None

classmethod read_coffea_parquet(source_object, chunk_pos, read_options=None, read_columns=None)[source]#

Given the location of a parquet file source_object, returns an awkward array chunk referred to by chunk_pos, assuming nanoAOD structure. read_options are passed to coffea.nanoevents.NanoEventsFactory.from_parquet. read_columns are converted to strings and, if not already present, added as nested field parquet_options.read_dictionary to read_options.

Return type:

coffea.nanoevents.methods.base.NanoEventsArray

classmethod open_awkward_parquet(source, open_options=None, read_columns=None)[source]#

Opens a parquet file saved at source, loads the content as a dask awkward array, wrapped by a DaskArrayReader, and returns a 2-tuple (array, length). open_options and chunk_size are forwarded to DaskArrayReader. read_columns are converted to strings and, if not already present, added as field columns to open_options.

Return type:

tuple[ak.Array, int]

classmethod close_awkward_parquet(source_object)[source]#

Closes the dask array wrapper referred to by source_object.

Return type:

None

classmethod read_awkward_parquet(source_object, chunk_pos, read_options=None, read_columns=None)[source]#

Given a DaskArrayReader source_object, returns the chunk referred to by chunk_pos as a full copy loaded into memory. Passing read_options or read_columns has no effect.

Return type:

ak.Array

property n_chunks: int#

Returns the number of chunks this instance will iterate over based on the number of entries n_entries and the configured chunk_size. In case n_entries was not initialized yet (via open()), an AttributeError is raised.

property closed: bool#

Returns whether the instance is closed for reading.

open()[source]#

Opens all previously registered sources and preloads all source objects to read content from later on. Nothing happens if this instance is already opened (i.e. not closed).

Return type:

None

close()[source]#

Closes all cached, opened files and deletes loaded source objects. Nothing happens if the instance is already closed for reading.

Return type:

None

queue(*args, **kwargs)[source]#

Adds a new task to the internal task queue with all args and kwargs forwarded to TaskQueue.add().

Return type:

None

eval_item(s: str) Any = <columnflow.columnar_util.ItemEval object>#

ItemEval singleton mimicking a function.

get_ak_routes(ak_array, max_depth=0)[source]#

Extracts all routes pointing to columns of a potentially deeply nested awkward array ak_array and returns them in a list of Route instances. Example:

# let arr be a nested array (e.g. from opening nano files via coffea)
# (note that route objects serialize to strings using dot format)

print(get_ak_routes(arr))
# [
#    "event",
#    "luminosityBlock",
#    "run",
#    "Jet.pt",
#    "Jet.mass",
#    ...
# ]

When positive, max_depth controls the maximum size of returned route tuples. When negative, routes are shortened by the passed amount of elements. In both cases, only unique routes are returned.

Return type:

list[Route]
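The effect of max_depth can be illustrated on a plain nested dictionary standing in for an awkward array. The function sketch_routes is hypothetical and returns dot-format strings instead of Route instances. Dropping routes that become empty after shortening is an assumption of this sketch.

```python
def sketch_routes(nested, max_depth=0):
    # collect full routes (tuples of field names) depth-first
    def walk(node, prefix):
        for name, child in node.items():
            if isinstance(child, dict):
                yield from walk(child, prefix + (name,))
            else:
                yield prefix + (name,)

    routes = []
    for route in walk(nested, ()):
        if max_depth:
            # positive: keep at most max_depth fields
            # negative: drop that many trailing fields
            route = route[:max_depth]
        # keep unique, non-empty routes in order of first occurrence
        if route and route not in routes:
            routes.append(route)
    return [".".join(route) for route in routes]


arr = {"event": [1], "Jet": {"pt": [[30.0]], "mass": [[5.0]]}}
full = sketch_routes(arr)
top_level = sketch_routes(arr, max_depth=1)
shortened = sketch_routes(arr, max_depth=-1)
```

In this sketch, both Jet.pt and Jet.mass collapse into the single route Jet when max_depth is 1 or -1, which is why uniqueness matters.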

has_ak_column(ak_array, route)[source]#

Returns whether an awkward array ak_array contains a nested field identified by a route. A route can be a Route instance, a tuple of strings where each string refers to a subfield, e.g. ("Jet", "pt"), or a string with dot format (e.g. "Jet.pt").

Return type:

bool
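The accepted route formats can be sketched on a nested dictionary in place of an awkward array. Both to_fields and sketch_has_column are hypothetical helpers for illustration. Route instances are not modeled here.

```python
def to_fields(route):
    # accept "Jet.pt" as well as ("Jet", "pt") and return a tuple of field names
    return tuple(route.split(".")) if isinstance(route, str) else tuple(route)


def sketch_has_column(nested, route):
    node = nested
    for field in to_fields(route):
        # stop as soon as a field along the route is missing
        if not isinstance(node, dict) or field not in node:
            return False
        node = node[field]
    return True


events = {"event": [1], "Jet": {"pt": [[30.0]]}}
found_str = sketch_has_column(events, "Jet.pt")
found_tuple = sketch_has_column(events, ("Jet", "pt"))
missing = sketch_has_column(events, "Jet.mass")
```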

set_ak_column(ak_array, route, value, value_type=None)[source]#

Inserts a new column into awkward array ak_array and returns a new view with the column added or overwritten.

The column can be defined through a route, i.e., a Route instance, a tuple of strings where each string refers to a subfield, e.g. ("Jet", "pt"), or a string with dot format (e.g. "Jet.pt"), and the column value itself. Intermediate, non-existing fields are automatically created. When a value_type is defined, value is cast to this type before it is inserted.

Example:

arr = ak.zip({"Jet": {"pt": [30], "eta": [2.5]}})

# set_ak_column returns a new view, so keep the result
arr = set_ak_column(arr, "Jet.mass", [40])
arr = set_ak_column(arr, "Muon.pt", [25])  # creates subfield "Muon" first

Return type:

ak.Array

Note

Issues can arise in cases where the route to add already exists and has a different type than the newly added value. For this reason, existing columns are removed first, creating a view to operate on.
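The creation of intermediate fields can be sketched with nested dictionaries. The helper sketch_set_column is hypothetical and only mimics the documented behavior. It assumes that value_type is applied to the inserted value element-wise, and it returns a copy rather than an awkward view.

```python
import copy


def sketch_set_column(nested, route, value, value_type=None):
    fields = tuple(route.split(".")) if isinstance(route, str) else tuple(route)
    result = copy.deepcopy(nested)  # the input stays unchanged
    node = result
    for field in fields[:-1]:
        # create missing intermediate fields on the way down
        node = node.setdefault(field, {})
    # assumption: the cast applies to the value that is inserted
    node[fields[-1]] = [value_type(v) for v in value] if value_type else value
    return result


events = {"Jet": {"pt": [30], "eta": [2.5]}}
with_mass = sketch_set_column(events, "Jet.mass", [40])
with_muon = sketch_set_column(with_mass, ("Muon", "pt"), [25], value_type=float)
```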

remove_ak_column(ak_array, route, remove_empty=True, silent=False)[source]#

Removes a route from an awkward array ak_array and returns a new view with the corresponding column removed. When route points to a nested field that would be empty after removal, the parent field is removed completely unless remove_empty is False.

Note that route can be a Route instance, a sequence of strings where each string refers to a subfield, e.g. ("Jet", "pt"), or a string with dot format (e.g. "Jet.pt"). Unless silent is True, a ValueError is raised when the route does not exist.

Return type:

ak.Array
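The interplay of remove_empty and silent can be sketched on nested dictionaries. The function sketch_remove_column is a hypothetical stand-in that returns a copy instead of an awkward view, and the error message is made up.

```python
import copy


def sketch_remove_column(nested, route, remove_empty=True, silent=False):
    fields = tuple(route.split(".")) if isinstance(route, str) else tuple(route)
    name = ".".join(fields)
    result = copy.deepcopy(nested)  # the input stays unchanged

    # walk down to the parent of the column, remembering the path
    parents = [result]
    for field in fields[:-1]:
        node = parents[-1].get(field)
        if not isinstance(node, dict):
            if silent:
                return result
            raise ValueError(f"no column found for route {name}")
        parents.append(node)

    if fields[-1] not in parents[-1]:
        if silent:
            return result
        raise ValueError(f"no column found for route {name}")
    del parents[-1][fields[-1]]

    # optionally drop parent fields that became empty, bottom-up
    if remove_empty:
        for depth in range(len(fields) - 1, 0, -1):
            if parents[depth]:
                break
            del parents[depth - 1][fields[depth - 1]]
    return result


events = {"event": [1], "Jet": {"pt": [[30.0]]}}
pruned = sketch_remove_column(events, "Jet.pt")
kept_parent = sketch_remove_column(events, "Jet.pt", remove_empty=False)
unchanged = sketch_remove_column(events, "Muon.pt", silent=True)
```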

add_ak_alias(ak_array, src_route, dst_route, remove_src=False, missing_strategy='original')[source]#

Adds an alias to an awkward array ak_array pointing the array at src_route to dst_route and returns a new view with the alias applied.

Note that existing columns referred to by dst_route might be overwritten. When remove_src is True, a view of the input array is returned with the column referred to by src_route missing. Both routes can be Route instances, a tuple of strings where each string refers to a subfield, e.g. ("Jet", "pt"), or a string with dot format (e.g. "Jet.pt").

In case src_route does not exist, missing_strategy is applied:

  • "original": If existing, dst_route remains unchanged.

  • "remove": If existing, dst_route is removed.

  • "raise": A ValueError is raised.

Return type:

ak.Array

Examples:

# example 1
events = ak.Array(...)  # contains Jet.pt and Jet.pt_jec_up

events = add_ak_alias(events, "Jet.pt_jec_up", "Jet.pt")
# -> events.Jet.pt will now be the same as Jet.pt_jec_up

events = add_ak_alias(events, "Jet.pt_jec_up", "Jet.pt", remove_src=True)
# -> again, events.Jet.pt will now be the same as Jet.pt_jec_up but the latter is removed


# example 2
events = ak.Array(...)  # contains only Jet.pt

events = add_ak_alias(events, "Jet.pt_jec_up", "Jet.pt")
# -> the source column "Jet.pt_jec_up" does not exist, so nothing happens

events = add_ak_alias(events, "Jet.pt_jec_up", "Jet.pt", missing_strategy="raise")
# -> an exception will be raised since the source column is missing

events = add_ak_alias(events, "Jet.pt_jec_up", "Jet.pt", missing_strategy="remove")
# -> the destination column "Jet.pt" will be removed as there is no source column to alias

add_ak_aliases(ak_array, aliases, **kwargs)[source]#

Adds multiple aliases, given in a dictionary mapping destination columns to source columns, to an awkward array ak_array and returns a new view with the aliases applied.

Each column in this dictionary can be referred to by a Route instance, a tuple of strings where each string refers to a subfield, e.g. ("Jet", "pt"), or a string with dot format (e.g. "Jet.pt").

All additional kwargs are forwarded to add_ak_alias().

Return type:

ak.Array

update_ak_array(ak_array, *others, overwrite_routes=True, add_routes=False, concat_routes=False)[source]#

Updates an awkward array ak_array with the content of multiple different arrays others and potentially (see below) returns a new view. Internally, get_ak_routes() is used to obtain the list of all routes pointing to potentially deeply nested arrays.

If two columns overlap during this update process, one of four cases applies, depending on the configuration:

  1. If concat_routes is either True or a list of routes containing the route in question, the columns are concatenated along the last axis. This requires their shapes to be compatible.

  2. If case 1 does not apply and add_routes is either True or a list of routes containing the route in question, the columns are added using the plus operator, forwarding the actual implementation to awkward.

  3. If cases 1 and 2 do not apply and overwrite_routes is either True or a list of routes containing the route in question, new columns (rightmost in others) overwrite existing ones. A new view is returned if this case occurs at least once.

  4. If none of the cases above apply, columns remain unchanged.

Return type:

ak.Array
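The precedence of the four cases can be written down as a small decision function. The names matches and resolve_overlap are hypothetical, and routes are represented by dot-format strings. Only the order of the checks is taken from the list above.

```python
def matches(option, route):
    # an option is either a bool or a collection of routes
    return option if isinstance(option, bool) else route in option


def resolve_overlap(route, overwrite_routes=True, add_routes=False, concat_routes=False):
    # the checks mirror cases 1 to 4, in that order
    if matches(concat_routes, route):
        return "concat"
    if matches(add_routes, route):
        return "add"
    if matches(overwrite_routes, route):
        return "overwrite"
    return "keep"


default_case = resolve_overlap("Jet.pt")
add_case = resolve_overlap("Jet.pt", add_routes=["Jet.pt"])
concat_case = resolve_overlap("Jet.pt", add_routes=True, concat_routes=True)
keep_case = resolve_overlap("Jet.pt", overwrite_routes=False)
```

With the default arguments, an overlapping column is overwritten. Concatenation wins whenever it is enabled for a route, regardless of the other two options.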

flatten_ak_array(ak_array, routes=None, nano_format=False)[source]#

Flattens a nested awkward array ak_array into a dictionary that maps joined column names to single awkward arrays. The returned dictionary might be used in conjunction with ak.Array to create a single array again.

get_ak_routes() is used internally to determine the nested structure of the array. Names of flat columns in the returned dictionary follow the standard dot format by default, and nano-style underscore format if nano_format is True. The columns to save can be defined via routes which can be a sequence or set of column names or a function receiving a column name and returning a bool.

Return type:

OrderedDict
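The two naming formats and the routes filter can be sketched on nested dictionaries. The function sketch_flatten is hypothetical, and it assumes that the filter receives column names in dot format, which the description above does not state explicitly.

```python
from collections import OrderedDict


def sketch_flatten(nested, routes=None, nano_format=False):
    flat = OrderedDict()

    def walk(node, prefix):
        for name, child in node.items():
            fields = prefix + (name,)
            if isinstance(child, dict):
                walk(child, fields)
                continue
            dotted = ".".join(fields)
            if routes is not None:
                # assumption: filters see names in dot format
                selected = routes(dotted) if callable(routes) else dotted in routes
                if not selected:
                    continue
            flat["_".join(fields) if nano_format else dotted] = child

    walk(nested, ())
    return flat


events = {"event": [1, 2], "Jet": {"pt": [[30.0], []], "eta": [[0.1], []]}}
dot_names = list(sketch_flatten(events))
nano_names = list(sketch_flatten(events, routes={"Jet.pt"}, nano_format=True))
jet_only = list(sketch_flatten(events, routes=lambda name: name.startswith("Jet.")))
```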

sort_ak_fields(ak_array, sort_fn=None)[source]#

Recursively sorts all fields of an awkward array ak_array and returns a new view. When a sort_fn is set, it is used internally for sorting field names.

Return type:

ak.Array
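Recursive field sorting can be sketched on nested dictionaries. The function sketch_sort_fields is hypothetical, and treating sort_fn as a key function for field names is an assumption, since the description above does not specify its exact role.

```python
def sketch_sort_fields(nested, sort_fn=None):
    # assumption: sort_fn acts as a key function for field names
    return {
        name: (
            sketch_sort_fields(nested[name], sort_fn)
            if isinstance(nested[name], dict)
            else nested[name]
        )
        for name in sorted(nested, key=sort_fn)
    }


events = {"run": [1], "Jet": {"pt": [[30.0]], "eta": [[0.1]]}, "event": [7]}
default_order = sketch_sort_fields(events)
lower_order = sketch_sort_fields(events, sort_fn=str.lower)
```

Python compares strings by code point, so uppercase names sort before lowercase ones unless a key such as str.lower is used.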

sorted_ak_to_parquet(ak_array, *args, **kwargs)[source]#

Sorts the fields in an awkward array ak_array recursively with sort_ak_fields() and saves it as a parquet file using awkward.to_parquet which receives all additional args and kwargs.

Return type:

None

Note

Since the order of fields in awkward arrays resulting from reading nano files might be arbitrary (depending on streamer info in the original root files), but formats such as parquet highly depend on the order for building internal table schemas, one should always make use of this function! Otherwise, operations like file merging might fail due to differently ordered schemas.

sorted_ak_to_root(ak_array, path, tree_name='events')[source]#

Sorts the fields in an awkward array ak_array recursively with sort_ak_fields() and saves it as a root tree named tree_name to a file at path using uproot.

Please note that optional types, denoted by e.g. "?float32", cannot be saved in root trees and are therefore converted to their non-optional equivalent using awkward.drop_none().

Parameters:
  • ak_array (Array) – The input array.

  • path (str) – The path of the root file to create.

  • tree_name (str, default: 'events') – The name of the tree to create inside the root file.

Return type:

None

attach_behavior(ak_array, type_name, behavior=None, keep_fields=None, skip_fields=None)[source]#

Attaches behavior of type type_name to an awkward array ak_array and returns it. type_name must be a key of a behavior dictionary which defaults to the “behavior” attribute of ak_array when present. Otherwise, a ValueError is raised.

By default, all subfields of ak_array are kept. For further control, keep_fields (skip_fields) can contain names or name patterns of fields that are kept (filtered). keep_fields has priority, i.e., when it is set, skip_fields is not considered.

Return type:

ak.Array
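The priority of keep_fields over skip_fields can be sketched as a field selection step. The helper select_fields is hypothetical, and glob-style matching via fnmatchcase is an assumption about what "name patterns" means here.

```python
from fnmatch import fnmatchcase


def select_fields(fields, keep_fields=None, skip_fields=None):
    # keep_fields has priority: when set, skip_fields is not considered
    if keep_fields:
        return [f for f in fields if any(fnmatchcase(f, p) for p in keep_fields)]
    if skip_fields:
        return [f for f in fields if not any(fnmatchcase(f, p) for p in skip_fields)]
    # by default, all fields are kept
    return list(fields)


fields = ["pt", "eta", "phi", "mass", "btagDeepB"]
kept = select_fields(fields, keep_fields=["pt", "btag*"])
skipped = select_fields(fields, skip_fields=["btag*"])
both = select_fields(fields, keep_fields=["pt"], skip_fields=["pt"])
```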

layout_ak_array(data_array, layout_array)[source]#

Takes a data_array and arranges its contents in the same structure as layout_array, with up to one level of nesting. In particular, this function can be used to create new awkward arrays from existing numpy arrays by forcing a known, potentially ragged shape onto them. Example:

a = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
b = ak.Array([[], [0, 0], [], [0, 0, 0]])

c = layout_ak_array(a, b)
# <Array [[], [1.0, 2.0], [], [3.0, 4.0, 5.0]] type='4 * var * float64'>

Return type:

ak.Array
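The reshaping idea can be sketched with plain lists, using only the row lengths of the layout. The function sketch_layout is a hypothetical stand-in that covers a single level of nesting and does not involve awkward or numpy.

```python
def sketch_layout(flat_values, layout):
    # only the row lengths of the layout matter, not its values
    counts = [len(row) for row in layout]
    if sum(counts) != len(flat_values):
        raise ValueError("number of values does not match the layout")
    result, start = [], 0
    for n in counts:
        result.append(list(flat_values[start:start + n]))
        start += n
    return result


a = [1.0, 2.0, 3.0, 4.0, 5.0]
b = [[], [0, 0], [], [0, 0, 0]]
c = sketch_layout(a, b)
```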

flat_np_view(ak_array, axis=1)[source]#

Takes an ak_array and returns a fully flattened numpy view. The flattening is applied along axis. See ak.flatten for more info.

Return type:

array

Note

Changes applied in-place to that view are transferred to the original ak_array only when axis is an integer. For this reason, passing axis=None raises an exception.

deferred_column(call_func=None)#

Subclass factory to be used as a decorator wrapping a call_func that will be used as the new __call__ method. Note that the use of super() inside the decorated method should always be done with arguments (i.e., class and instance).

Return type:

ArrayFunction.DeferredColumn | Callable

optional_column(*routes)[source]#

Takes one or several objects routes whose type can be anything that is accepted by the Route constructor, and returns a single route object or a set of route objects tagged "optional".

Return type:

Route | set[Route]