base

Contents

base#

Generic tools and base tasks that are defined along typical objects in an analysis.

Classes:

Requirements(*others, **kwargs)

General class for requirements of different tasks.

BaseTask(*args, **kwargs)

OutputLocation(value)

Output location flag.

AnalysisTask(*args, **kwargs)

ConfigTask(*args, **kwargs)

ShiftTask(*args, **kwargs)

DatasetTask(*args, **kwargs)

CommandTask(*args, **kwargs)

A task that provides convenience methods to work with shell commands, i.e., printing them on the command line and executing them with error handling.

Functions:

wrapper_factory(base_cls, require_cls, enable)

Factory function creating wrapper task classes, inheriting from base_cls and WrapperTask, that do nothing but require multiple instances of require_cls.

class Requirements(*others, **kwargs)[source]#

Bases: DotDict

General class for requirements of different tasks.

Can be initialized with other DotDict instances and additional keyword arguments kwargs, which are added.
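The merge behavior can be illustrated with a plain-Python sketch; the real class builds on law.util.DotDict, so the minimal MiniReqs class below is a hypothetical stand-in, not the actual implementation:

```python
# hypothetical stand-in for the DotDict-based Requirements class:
# merges any number of mappings, then keyword arguments, in order
class MiniReqs(dict):
    def __init__(self, *others, **kwargs):
        super().__init__()
        for other in others:
            self.update(other)
        self.update(kwargs)

    # attribute-style access, as provided by DotDict
    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)


base = MiniReqs(calibrate="CalibrateEvents")
reqs = MiniReqs(base, select="SelectEvents")
```

Later positional mappings and keyword arguments override earlier keys, which is what lets subclasses extend an inherited reqs container.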

class BaseTask(*args, **kwargs)[source]#

Bases: Task

Attributes:

task_namespace

This value can be overridden to set the namespace that will be used.

reqs

exclude_index

exclude_params_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

task_namespace = 'cf'#

This value can be overridden to set the namespace that will be used. (See Task.namespaces_families_and_ids.) If it is not specified and you try to read this value anyway, it will return garbage. Please use get_task_namespace() to read the namespace.

Note that setting this value with @property will not work, because this is a class level value.

reqs = {}#
exclude_index = False#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
class OutputLocation(value)[source]#

Bases: Enum

Output location flag.

Attributes:

config

local

wlcg

config = 'config'#
local = 'local'#
wlcg = 'wlcg'#
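How the flag might steer target creation can be sketched schematically; pick_target_kind below is a hypothetical helper for illustration, not part of the API:

```python
from enum import Enum


# mirror of the flag above
class OutputLocation(Enum):
    config = "config"
    local = "local"
    wlcg = "wlcg"


# hypothetical dispatch sketch: "config" defers the decision to the
# analysis config, the other two name the target kind directly
def pick_target_kind(location):
    loc = OutputLocation(location)
    if loc is OutputLocation.wlcg:
        return "remote"
    if loc is OutputLocation.local:
        return "local"
    return "deferred to config"
```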
class AnalysisTask(*args, **kwargs)[source]#

Bases: BaseTask, SandboxTask

Attributes:

analysis

version

allow_empty_sandbox

sandbox

message_cache_size

local_workflow_require_branches

default_store

default_wlcg_fs

default_output_location

exclude_index

exclude_params_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Classes:

output_collection_cls

alias of SiblingFileCollection

Methods:

modify_param_values(params)

Hook to modify command line arguments before instances of this class are created.

resolve_param_values(params)

rtype:

dict

get_analysis_inst(analysis)

rtype:

Analysis

req_params(inst, **kwargs)

Returns parameters that are jointly defined in this class and another task instance of some other class.

get_version_map(task)

rtype:

dict[str, str | Callable]

get_version_map_value(inst, params[, ...])

rtype:

str | None

get_known_shifts(config_inst, params)

Returns two sets of shifts in a tuple: shifts implemented by _this_ task, and dependent shifts implemented by upstream tasks.

get_array_function_kwargs([task])

rtype:

dict[str, Any]

get_calibrator_kwargs(*args, **kwargs)

rtype:

dict[str, Any]

get_selector_kwargs(*args, **kwargs)

rtype:

dict[str, Any]

get_producer_kwargs(*args, **kwargs)

rtype:

dict[str, Any]

get_weight_producer_kwargs(*args, **kwargs)

rtype:

dict[str, Any]

find_config_objects(names, container, object_cls)

Returns all names of objects of type object_cls known to a container (e.g.

resolve_config_default(task_params, param[, ...])

Resolves a given parameter value param, checks if it should be placed with a default value when empty, and in this case, does the actual default value resolution.

resolve_config_default_and_groups(...[, ...])

This method is similar to resolve_config_default() in that it checks if a parameter value param is empty and should be replaced with a default value.

store_parts()

Returns a law.util.InsertableDict whose values are used to create a store path.

local_path(*path[, store, fs])

Joins path fragments from store (defaulting to default_store), store_parts() and path and returns the joined path.

local_target(*path[, dir, store, fs])

Creates either a local file or directory target, depending on dir, forwarding all path fragments, store and fs to local_path() and all kwargs the respective target class.

wlcg_path(*path)

Joins path fragments from store_parts() and path and returns the joined path.

wlcg_target(*path[, dir, fs])

Creates either a remote WLCG file or directory target, depending on dir, forwarding all path fragments to wlcg_path() and all kwargs the respective target class.

target(*path[, location])

get_parquet_writer_opts([repeating_values])

Returns an option dictionary that can be passed as writer_opts to merge_parquet_task(), for instance, at the end of chunked processing steps that produce a single parquet file.

analysis = <luigi.parameter.Parameter object>#
version = <luigi.parameter.Parameter object>#
allow_empty_sandbox = True#
sandbox = None#
message_cache_size = 25#
local_workflow_require_branches = False#
output_collection_cls#

alias of SiblingFileCollection

Methods:

from_directory(directory, **kwargs)

default_store = '$CF_STORE_LOCAL'#
default_wlcg_fs = 'wlcg_fs'#
default_output_location = 'config'#
classmethod modify_param_values(params)[source]#

Hook to modify command line arguments before instances of this class are created.

Return type:

dict

classmethod resolve_param_values(params)[source]#
Return type:

dict

classmethod get_analysis_inst(analysis)[source]#
Return type:

Analysis

classmethod req_params(inst, **kwargs)[source]#

Returns parameters that are jointly defined in this class and another task instance of some other class. The parameters are used when calling Task.req(self).

Return type:

dict

classmethod get_version_map(task)[source]#
Return type:

dict[str, str | Callable]

classmethod get_version_map_value(inst, params, version_map=None)[source]#
Return type:

str | None

classmethod get_known_shifts(config_inst, params)[source]#

Returns two sets of shifts in a tuple: shifts implemented by _this_ task, and dependent shifts implemented by upstream tasks.

Return type:

tuple[set[str], set[str]]

classmethod get_array_function_kwargs(task=None, **params)[source]#
Return type:

dict[str, Any]

classmethod get_calibrator_kwargs(*args, **kwargs)[source]#
Return type:

dict[str, Any]

classmethod get_selector_kwargs(*args, **kwargs)[source]#
Return type:

dict[str, Any]

classmethod get_producer_kwargs(*args, **kwargs)[source]#
Return type:

dict[str, Any]

classmethod get_weight_producer_kwargs(*args, **kwargs)[source]#
Return type:

dict[str, Any]

classmethod find_config_objects(names, container, object_cls, object_groups=None, accept_patterns=True, deep=False)[source]#

Returns all names of objects of type object_cls known to a container (e.g. od.Analysis or od.Config) that match names. A name can also be a pattern to match if accept_patterns is True, or, when given, a key of the mapping object_groups that maps group names to object names. When deep is True, the lookup of objects in the container is recursive. Example:

find_config_objects(["st_tchannel_*"], config_inst, od.Dataset)
# -> ["st_tchannel_t", "st_tchannel_tbar"]
Return type:

list[str]

classmethod resolve_config_default(task_params, param, container='config_inst', default_str=None, multiple=False)[source]#

Resolves a given parameter value param, checks if it should be placed with a default value when empty, and in this case, does the actual default value resolution.

This resolution is triggered only in case param refers to RESOLVE_DEFAULT, a 1-tuple containing this attribute, or None. If so, the default is identified via the default_str from an order.AuxDataMixin container and points to an auxiliary value that can be either a string or a function. In the latter case, it is called with the task class, the container instance, and all task parameters. Note that when no container is given, param is returned unchanged.

When multiple is True, a tuple is returned. If multiple is False and the resolved parameter is an iterable, the first entry is returned.

Example:

def resolve_param_values(params):
    params["producer"] = AnalysisTask.resolve_config_default(
        params,
        params.get("producer"),
        container=params["config_inst"],
        default_str="default_producer",
        multiple=True,
    )

config_inst = od.Config(
    id=0,
    name="my_config",
    aux={"default_producer": ["my_producer_1", "my_producer_2"]},
)

params = {
    "config_inst": config_inst,
    "producer": RESOLVE_DEFAULT,
}
resolve_param_values(params)  # sets params["producer"] to ("my_producer_1", "my_producer_2")

params = {
    "config_inst": config_inst,
    "producer": "some_other_producer",
}
resolve_param_values(params)  # sets params["producer"] to "some_other_producer"

Example where the default points to a function:

def resolve_param_values(params):
    params["ml_model"] = AnalysisTask.resolve_config_default(
        params,
        params.get("ml_model"),
        container=params["config_inst"],
        default_str="default_ml_model",
        multiple=True,
    )

# a function that chooses the ml_model based on an attribute that is set in an inference_model
def default_ml_model(task_cls, container, task_params):
    default_ml_model = None

    # check if task is using an inference model
    if "inference_model" in task_params.keys():
        inference_model = task_params.get("inference_model", None)

        # if inference model is not set, assume it's the container default
        if inference_model in (None, "NO_STR"):
            inference_model = container.x.default_inference_model

        # get the default_ml_model from the inference_model_inst
        inference_model_inst = columnflow.inference.InferenceModel._subclasses[inference_model]
        default_ml_model = getattr(inference_model_inst, "ml_model_name", default_ml_model)

        return default_ml_model

    return default_ml_model

config_inst = od.Config(
    id=0,
    name="my_config",
    aux={"default_ml_model": default_ml_model},
)

@inference_model(ml_model_name="my_ml_model")
def my_inference_model(self):
    # some inference model implementation
    ...

params = {"config_inst": config_inst, "ml_model": None, "inference_model": "my_inference_model"}
resolve_param_values(params)  # sets params["ml_model"] to "my_ml_model"

params = {"config_inst": config_inst, "ml_model": "some_ml_model", "inference_model": "my_inference_model"}
resolve_param_values(params)  # sets params["ml_model"] to "some_ml_model"
Return type:

str | tuple | Any | None

classmethod resolve_config_default_and_groups(task_params, param, container='config_inst', default_str=None, groups_str=None)[source]#

This method is similar to resolve_config_default() in that it checks if a parameter value param is empty and should be replaced with a default value. See the referenced method for documentation on task_params, param, container and default_str.

What this method does in addition is check whether the values contained in param (after default value resolution) refer to a group of values identified via the groups_str from the order.AuxDataMixin container, which maps a string to a tuple of strings. If so, each value in param that refers to a group is expanded into the actual group values.

Example:

config_inst = od.Config(
    id=0,
    name="my_config",
    aux={
        "default_producer": ["features_1", "my_producer_group"],
        "producer_groups": {"my_producer_group": ["features_2", "features_3"]},
    },
)

params = {"producer": RESOLVE_DEFAULT}

AnalysisTask.resolve_config_default_and_groups(
    params,
    params.get("producer"),
    container=config_inst,
    default_str="default_producer",
    groups_str="producer_groups",
)
# -> ("features_1", "features_2", "features_3")
Return type:

tuple[str]
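The expansion step itself can be mimicked with a small, self-contained sketch; expand_groups is a hypothetical helper, not part of the API:

```python
# hypothetical helper illustrating group expansion: every value that
# names a group is replaced by that group's members, order preserved
def expand_groups(values, groups):
    expanded = []
    for value in values:
        expanded.extend(groups.get(value, [value]))
    return tuple(expanded)


result = expand_groups(
    ["features_1", "my_producer_group"],
    {"my_producer_group": ["features_2", "features_3"]},
)
```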

store_parts()[source]#

Returns a law.util.InsertableDict whose values are used to create a store path. For instance, the parts {"keyA": "a", "keyB": "b", 2: "c"} lead to the path “a/b/c”. The keys can be used by subclassing tasks to overwrite values.

Return type:

InsertableDict

Returns:

Dictionary with parts to create a path to store intermediary results.
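The documented example can be checked with a tiny sketch, using a plain dict in place of law.util.InsertableDict:

```python
# hypothetical one-liner mirroring the documented behavior: keys only
# identify parts (so subclasses can override them), values join into the path
parts = {"keyA": "a", "keyB": "b", 2: "c"}
path = "/".join(parts.values())
```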

local_path(*path, store=None, fs=None)[source]#

Joins path fragments from store (defaulting to default_store), store_parts() and path and returns the joined path. In case a fs is defined, it should refer to the config section of a local file system, and consequently, store is not prepended to the returned path as the resolution of absolute paths is handled by that file system.

local_target(*path, dir=False, store=None, fs=None, **kwargs)[source]#

Creates either a local file or directory target, depending on dir, forwarding all path fragments, store and fs to local_path() and all kwargs the respective target class.

wlcg_path(*path)[source]#

Joins path fragments from store_parts() and path and returns the joined path.

The full URI to the target is not considered as it is usually defined in [wlcg_fs] sections in the law config and hence subject to wlcg_target().

wlcg_target(*path, dir=False, fs=default_wlcg_fs, **kwargs)[source]#

Creates either a remote WLCG file or directory target, depending on dir, forwarding all path fragments to wlcg_path() and all kwargs the respective target class. When None, fs defaults to the default_wlcg_fs class level attribute.

target(*path, location=None, **kwargs)[source]#
get_parquet_writer_opts(repeating_values=False)[source]#

Returns an option dictionary that can be passed as writer_opts to merge_parquet_task(), for instance, at the end of chunked processing steps that produce a single parquet file. See ParquetWriter for valid options.

This method can be overwritten in subclasses to customize the exact behavior.

Parameters:

repeating_values (bool, default: False) – Whether the values to be written are predominantly repeating, in which case different compression and encoding strategies are followed.

Return type:

dict[str, Any]

Returns:

A dictionary with options that can be passed to parquet writer objects.
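As an illustration only (the exact options returned are an implementation detail), such a writer-options dictionary might be assembled as below; the option names follow pyarrow.parquet.ParquetWriter keyword arguments:

```python
# hypothetical sketch of writer options keyed on value repetition
def sketch_writer_opts(repeating_values=False):
    opts = {"compression": "zstd"}
    # dictionary encoding typically pays off when values repeat a lot
    opts["use_dictionary"] = bool(repeating_values)
    return opts
```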

exclude_index = False#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
class ConfigTask(*args, **kwargs)[source]#

Bases: AnalysisTask

Attributes:

config

exclude_index

exclude_params_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Methods:

resolve_param_values(params)

rtype:

dict

get_version_map(task)

get_array_function_kwargs([task])

store_parts()

Returns a law.util.InsertableDict whose values are used to create a store path.

find_keep_columns(collection)

Returns a set of Route objects describing columns that should be kept given a type of column collection.

config = <luigi.parameter.Parameter object>#
classmethod resolve_param_values(params)[source]#
Return type:

dict

classmethod get_version_map(task)[source]#
classmethod get_array_function_kwargs(task=None, **params)[source]#
store_parts()[source]#

Returns a law.util.InsertableDict whose values are used to create a store path. For instance, the parts {"keyA": "a", "keyB": "b", 2: "c"} lead to the path “a/b/c”. The keys can be used by subclassing tasks to overwrite values.

Returns:

Dictionary with parts to create a path to store intermediary results.

find_keep_columns(collection)[source]#

Returns a set of Route objects describing columns that should be kept given a type of column collection.

Parameters:

collection (ColumnCollection) – The collection to return.

Return type:

set[Route]

Returns:

A set of Route objects.

exclude_index = False#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
class ShiftTask(*args, **kwargs)[source]#

Bases: ConfigTask

Attributes:

shift

local_shift

exclude_params_index

exclude_params_req

exclude_params_sandbox

exclude_params_remote_workflow

allow_empty_shift

exclude_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req_get

exclude_params_req_set

Methods:

modify_param_values(params)

When "config" and "shift" are set, this method evaluates them to set the global shift.

resolve_param_values(params)

rtype:

dict

get_array_function_kwargs([task])

store_parts()

Returns a law.util.InsertableDict whose values are used to create a store path.

shift = <luigi.parameter.Parameter object>#
local_shift = <luigi.parameter.Parameter object>#
exclude_params_index = {'local_shift'}#
exclude_params_req = {'local_shift'}#
exclude_params_sandbox = {'local_shift', 'log_file', 'sandbox'}#
exclude_params_remote_workflow = {'local_shift'}#
allow_empty_shift = False#
classmethod modify_param_values(params)[source]#

When “config” and “shift” are set, this method evaluates them to set the global shift. For that, it takes the shifts stored in the config instance and compares it with those defined by this class.

classmethod resolve_param_values(params)[source]#
Return type:

dict

classmethod get_array_function_kwargs(task=None, **params)[source]#
store_parts()[source]#

Returns a law.util.InsertableDict whose values are used to create a store path. For instance, the parts {"keyA": "a", "keyB": "b", 2: "c"} lead to the path “a/b/c”. The keys can be used by subclassing tasks to overwrite values.

Returns:

Dictionary with parts to create a path to store intermediary results.

exclude_index = False#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
class DatasetTask(*args, **kwargs)[source]#

Bases: ShiftTask

Attributes:

dataset

file_merging

file_merging_factor

Returns the number of files that are handled in one branch.

exclude_index

exclude_params_index

exclude_params_remote_workflow

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Methods:

resolve_param_values(params)

get_known_shifts(config_inst, params)

Returns two sets of shifts in a tuple: shifts implemented by _this_ task, and dependent shifts implemented by upstream tasks.

get_array_function_kwargs([task])

store_parts()

Returns a law.util.InsertableDict whose values are used to create a store path.

create_branch_map()

Define the branch map for when this task is used as a workflow.

dataset = <luigi.parameter.Parameter object>#
file_merging = None#
classmethod resolve_param_values(params)[source]#
classmethod get_known_shifts(config_inst, params)[source]#

Returns two sets of shifts in a tuple: shifts implemented by _this_ task, and dependent shifts implemented by upstream tasks.

Return type:

tuple[set[str], set[str]]

classmethod get_array_function_kwargs(task=None, **params)[source]#
store_parts()[source]#

Returns a law.util.InsertableDict whose values are used to create a store path. For instance, the parts {"keyA": "a", "keyB": "b", 2: "c"} lead to the path “a/b/c”. The keys can be used by subclassing tasks to overwrite values.

Returns:

Dictionary with parts to create a path to store intermediary results.

property file_merging_factor#

Returns the number of files that are handled in one branch. Consecutive merging steps are not handled yet.

create_branch_map()[source]#

Define the branch map for when this task is used as a workflow. By default, use the merging information provided by file_merging_factor to return a dictionary which maps branches to one or more input file indices. E.g. 1 -> [3, 4, 5] would mean that branch 1 is simultaneously handling input file indices 3, 4 and 5.
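Assuming n input files and a merging factor f, the default mapping can be sketched as follows; make_branch_map is a hypothetical helper for illustration:

```python
# hypothetical sketch: split file indices 0..n-1 into consecutive
# chunks of size f, one chunk per workflow branch
def make_branch_map(n_files, merging_factor):
    return {
        branch: list(range(start, min(start + merging_factor, n_files)))
        for branch, start in enumerate(range(0, n_files, merging_factor))
    }
```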

exclude_index = False#
exclude_params_index = {'local_shift'}#
exclude_params_remote_workflow = {'local_shift'}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {'local_shift'}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'local_shift', 'log_file', 'sandbox'}#
class CommandTask(*args, **kwargs)[source]#

Bases: AnalysisTask

A task that provides convenience methods to work with shell commands, i.e., printing them on the command line and executing them with error handling.

Attributes:

print_command

custom_args

exclude_index

exclude_params_req

interactive_params

run_command_in_tmp

exclude_params_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Methods:

build_command()

touch_output_dirs()

run_command(cmd[, optional])

run(**kwargs)

The task run method, to be overridden in a subclass.

pre_run_command()

post_run_command()

print_command = <law.parameter.CSVParameter object>#
custom_args = <luigi.parameter.Parameter object>#
exclude_index = True#
exclude_params_req = {'custom_args'}#
interactive_params = ['print_deps', 'print_status', 'print_output', 'fetch_output', 'remove_output', 'print_command']#
run_command_in_tmp = False#
build_command()[source]#
touch_output_dirs()[source]#
run_command(cmd, optional=False, **kwargs)[source]#
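The error handling mentioned in the class description can be sketched with a minimal standalone helper (run_command_sketch is hypothetical, not the actual implementation):

```python
import subprocess


# hypothetical sketch: print a shell command, run it, and either
# raise on failure or tolerate it when marked optional
def run_command_sketch(cmd, optional=False):
    print(f"running: {cmd}")
    result = subprocess.run(cmd, shell=True)
    if result.returncode != 0 and not optional:
        raise RuntimeError(f"command failed with code {result.returncode}: {cmd}")
    return result.returncode
```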
run(**kwargs)[source]#

The task run method, to be overridden in a subclass.

See Task.run

pre_run_command()[source]#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
post_run_command()[source]#
wrapper_factory(base_cls, require_cls, enable, cls_name=None, attributes=None, docs=None)[source]#

Factory function creating wrapper task classes, inheriting from base_cls and WrapperTask, that do nothing but require multiple instances of require_cls. Unless cls_name is defined, the name of the created class defaults to the name of require_cls plus “Wrapper”. Additional attributes are added as class-level members when given.

The instances of require_cls to be required in the requires() method can be controlled by task parameters. These parameters can be enabled through the string sequence enable, which currently accepts:

  • configs, skip_configs

  • shifts, skip_shifts

  • datasets, skip_datasets

This makes it easy to build wrapper tasks that loop over (combinations of) parameters defined in the analysis or config, which would otherwise lead to mostly redundant code. Example:

class MyTask(DatasetTask):
    ...

MyTaskWrapper = wrapper_factory(
    base_cls=ConfigTask,
    require_cls=MyTask,
    enable=["datasets", "skip_datasets"],
)

# this allows to run (e.g.)
# law run MyTaskWrapper --datasets st_* --skip-datasets *_tbar

When building the requirements, the full combinatorics of parameters is considered. However, certain conditions apply depending on the enabled features. For instance, in order to use the "configs" feature (adding a parameter "--configs" to the created class, allowing one to loop over a list of config instances known to an analysis), require_cls must be at least a ConfigTask accepting "--config" (mind the singular form), whereas base_cls explicitly must not.

Parameters:
  • base_cls (law.task.base.Task) – Base class for this wrapper

  • require_cls (AnalysisTask) – Task class to be wrapped

  • enable (Sequence[str]) – Enable these parameters to control the wrapped Task class instance. Currently allowed parameters are: “configs”, “skip_configs”, “shifts”, “skip_shifts”, “datasets”, “skip_datasets”

  • cls_name (str | None, default: None) – Name of the created wrapper class. If None, defaults to the name of require_cls plus "Wrapper"

  • attributes (dict | None, default: None) – Add these attributes as class-level members of the new WrapperTask class

  • docs (str | None, default: None) – Manually set the documentation string __doc__ of the new WrapperTask class instance

Raises:
  • ValueError – If a parameter provided with enable is not in the list of known parameters

  • TypeError – If any parameter in enable is incompatible with the WrapperTask class instance or the inheritance structure of corresponding classes

  • ValueError – when configs are enabled but not found in the analysis instance

  • ValueError – when shifts are enabled but not found in the analysis config instance

  • ValueError – when datasets are enabled but not found in the analysis config instance

Return type:

law.task.base.Register

Returns:

The new WrapperTask for the Task class require_cls