base

base#

Generic tools and base tasks that are defined along typical objects in an analysis.

Classes:

Requirements(*others, **kwargs)

Container for task-level requirements of different tasks.

OutputLocation(value)

Output location flag.

TaskShifts([local, upstream])

Container for local and upstream shifts at a point in the task graph.

BaseTask(*args, **kwargs)

AnalysisTask(*args, **kwargs)

ConfigTask(*args, **kwargs)

ShiftTask(*args, **kwargs)

DatasetTask(*args, **kwargs)

CommandTask(*args, **kwargs)

A task that provides convenience methods to work with shell commands, i.e., printing them on the command line and executing them with error handling.

Functions:

wrapper_factory(base_cls, require_cls, enable)

Factory function creating wrapper task classes, inheriting from base_cls and WrapperTask, that do nothing but require multiple instances of require_cls.

class Requirements(*others, **kwargs)[source]#

Bases: DotDict

Container for task-level requirements of different tasks.

Can be initialized with other Requirements instances and additional keyword arguments kwargs, which are added.
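The merge-on-construction behavior described above can be sketched as follows. This is a minimal illustration, not the actual implementation: RequirementsSketch is a hypothetical stand-in built on a plain dict (the real class derives from DotDict), the task names are placeholders, and the assumption that keyword arguments are applied last is not confirmed by the text above.

```python
class RequirementsSketch(dict):
    """Hypothetical stand-in for Requirements, built on a plain dict."""

    def __init__(self, *others, **kwargs):
        super().__init__()
        # entries of the other containers are merged first, in the order given
        for other in others:
            self.update(other)
        # assumption: keyword arguments are added last
        self.update(kwargs)


base_reqs = RequirementsSketch(CalibrateEvents="CalibrateEventsTask")
reqs = RequirementsSketch(base_reqs, SelectEvents="SelectEventsTask")
print(sorted(reqs))  # ['CalibrateEvents', 'SelectEvents']
```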

class OutputLocation(value)[source]#

Bases: Enum

Output location flag.

Attributes:

config = 'config'#
local = 'local'#
wlcg = 'wlcg'#
wlcg_mirrored = 'wlcg_mirrored'#
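As a small illustration of how such a flag can be used, the sketch below mirrors the four members in a hypothetical OutputLocationSketch enum and looks one up by its string value. Only the member names and values are taken from the listing above; everything else is an assumption.

```python
from enum import Enum


class OutputLocationSketch(Enum):
    """Hypothetical mirror of the OutputLocation members listed above."""

    config = "config"
    local = "local"
    wlcg = "wlcg"
    wlcg_mirrored = "wlcg_mirrored"


# an Enum member can be looked up by its value, e.g. coming from a string setting
location = OutputLocationSketch("wlcg")
print(location is OutputLocationSketch.wlcg)  # True
```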
class TaskShifts(local=<factory>, upstream=<factory>)[source]#

Bases: object

Container for local and upstream shifts at a point in the task graph.

Attributes:

local: set[str]#
upstream: set[str]#
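The signature with two factory defaults suggests a dataclass-like container. The following sketch shows that pattern under this assumption; TaskShiftsSketch and the shift names are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class TaskShiftsSketch:
    """Hypothetical stand-in with the two fields documented above."""

    local: set[str] = field(default_factory=set)
    upstream: set[str] = field(default_factory=set)


shifts = TaskShiftsSketch()
shifts.local.add("jec_up")  # shift implemented by the task itself
shifts.upstream |= {"jec_up", "jer_down"}  # shifts implemented by upstream tasks

# each instance gets its own sets thanks to the factories
other = TaskShiftsSketch()
print(other.local == set(), shifts.local)  # True {'jec_up'}
```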
class BaseTask(*args, **kwargs)[source]#

Bases: Task

Attributes:

Methods:

task_namespace = 'cf'#

This value can be overridden to set the namespace that will be used. (See Task.namespaces_famlies_and_ids) If it’s not specified and you try to read this value anyway, it will return garbage. Please use get_task_namespace() to read the namespace.

Note that setting this value with @property will not work, because this is a class level value.

reqs = {}#
get_params_dict()[source]#
Return type:

dict[str, Any]

exclude_index = False#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
class AnalysisTask(*args, **kwargs)[source]#

Bases: BaseTask, SandboxTask

Attributes:

Classes:

Methods:

modify_param_values(params)

Hook to modify command line arguments before instances of this class are created.

resolve_param_values(params)

get_analysis_inst(analysis)

req_params(inst, **kwargs)

Returns parameters that are jointly defined in this class and another task instance of some other class.

get_default_version(inst, params)

Determines the default version for instances of this task class when created through req() from another task inst given parameters params.

get_config_lookup_keys(inst_or_params)

Returns a dictionary with keys that can be used to lookup state specific values in a config or dictionary, such as default task versions or output locations.

get_array_function_dict(params)

find_config_objects(names, container, object_cls)

Returns all names of objects of type object_cls known to a container (e.g. od.Analysis or od.Config) that match names.

resolve_config_default(*, param, ...[, ...])

Resolves a given parameter value param, checks if it should be replaced with a default value when empty, and in this case, does the actual default value resolution.

resolve_config_default_and_groups(*, param, ...)

This method is similar to resolve_config_default() in that it checks if a parameter value param is empty and should be replaced with a default value.

build_repr(objects, *[, sep, prepend_count, ...])

Generic method to construct a string representation given a single object or a sequence of objects.

cached_value(key, func)

Upon first invocation, the function func is called and its return value is stored under key in _cached_values.

reset_sandbox(sandbox)

Resets the sandbox to a new sandbox value.

store_parts()

Returns a law.util.InsertableDict whose values are used to create a store path.

local_path(*path[, store_parts_modifier])

Joins path fragments from store (defaulting to default_store), store_parts() and path and returns the joined path.

local_target(*path[, store_parts_modifier])

Creates either a local file or directory target, depending on dir, forwarding all path fragments, store and fs to local_path() and all kwargs to the respective target class.

wlcg_path(*path[, store_parts_modifier])

Joins path fragments from store_parts() and path and returns the joined path.

wlcg_target(*path[, store_parts_modifier])

Creates either a remote WLCG file or directory target, depending on dir, forwarding all path fragments to wlcg_path() and all kwargs to the respective target class.

target(*path, **kwargs)

get_parquet_writer_opts([repeating_values])

Returns an option dictionary that can be passed as writer_opts to merge_parquet_task(), for instance, at the end of chunked processing steps that produce a single parquet file.

analysis = <luigi.parameter.Parameter object>#
version = <luigi.parameter.Parameter object>#
notify_slack = <law.contrib.slack.parameter.NotifySlackParameter object>#
notify_mattermost = <law.contrib.mattermost.parameter.NotifyMattermostParameter object>#
notify_custom = <law.parameter.NotifyCustomParameter object>#
allow_empty_sandbox = True#
sandbox = None#
message_cache_size = 25#
local_workflow_require_branches = False#
output_collection_cls#

alias of SiblingFileCollection

default_store = '$CF_STORE_LOCAL'#
default_wlcg_fs = 'wlcg_fs'#
default_output_location = 'config'#
exclude_params_index = {'user'}#
exclude_params_req = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_repr = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_branch = {'user'}#
exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
classmethod modify_param_values(params)[source]#

Hook to modify command line arguments before instances of this class are created.

Return type:

dict[str, Any]

classmethod resolve_param_values(params)[source]#
Return type:

dict[str, Any]

classmethod get_analysis_inst(analysis)[source]#
Return type:

Analysis

classmethod req_params(inst, **kwargs)[source]#

Returns parameters that are jointly defined in this class and another task instance of some other class. The parameters are used when calling Task.req(self).

Return type:

dict[str, Any]

classmethod get_default_version(inst, params)[source]#

Determines the default version for instances of this task class when created through req() from another task inst given parameters params.

Parameters:
  • inst (AnalysisTask) – The task instance from which this task should be created via req().

  • params (dict[str, Any]) – The parameters that are passed to the task instance.

Return type:

str | None

Returns:

The default version, or None if no default version can be defined.

classmethod get_config_lookup_keys(inst_or_params)[source]#

Returns a dictionary with keys that can be used to lookup state specific values in a config or dictionary, such as default task versions or output locations.

Parameters:

inst_or_params (AnalysisTask | dict[str, Any]) – The task instance or its parameters.

Return type:

law.util.InsertableDict

Returns:

A dictionary with keys that can be used for nested lookup.

classmethod get_array_function_dict(params)[source]#
Return type:

dict[str, Any]

classmethod find_config_objects(names, container, object_cls, groups_str=None, accept_patterns=True, deep=False, strict=False, multi_strategy='first')[source]#

Returns all names of objects of type object_cls known to a container (e.g. od.Analysis or od.Config) that match names. A name can also be a pattern to match if accept_patterns is True, or, when given, the key of a mapping named groups_str in the container auxiliary data that maps group names to object names.

When deep is True the lookup of objects in the container is recursive. When strict is True, an error is raised if no matches are found for any of the names.

container can also refer to a sequence of container objects. If this is the case, the default object retrieval is performed for all of them and the resulting values can be handled with five different strategies, controlled via multi_strategy:

  • "first": The first resolved name is returned.

  • "same": The resolved names are forced to be identical and an exception is raised if they differ. The first resolved value is returned.

  • "union": The set union of all resolved names is returned in a list.

  • "intersection": The set intersection of all resolved names is returned in a list.

  • "all": The resolved values are returned in a dictionary mapped to their respective container.

Example:

find_config_objects(names=["st_tchannel_*"], container=config_inst, object_cls=od.Dataset)
# -> ["st_tchannel_t", "st_tchannel_tbar"]
Return type:

list[str] | dict[od.UniqueObject, list[str]]

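The pattern matching and the five multi_strategy options can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the actual implementation: match_names and combine are hypothetical helpers, containers are represented by plain strings, patterns are matched with fnmatch, "first" is taken to mean the result of the first container, and the ordering of the "union" result is a choice made here.

```python
from fnmatch import fnmatch


def match_names(names, known):
    """Return the known names matching any requested name or pattern, in order."""
    return [k for k in known if any(fnmatch(k, n) for n in names)]


def combine(per_container, multi_strategy="first"):
    """Combine per-container results (dict: container -> list of names)."""
    results = list(per_container.values())
    if multi_strategy == "first":
        return results[0]
    if multi_strategy == "same":
        if any(r != results[0] for r in results[1:]):
            raise ValueError("resolved names differ between containers")
        return results[0]
    if multi_strategy == "union":
        # preserve the first-seen order while dropping duplicates
        return list(dict.fromkeys(n for r in results for n in r))
    if multi_strategy == "intersection":
        return [n for n in results[0] if all(n in r for r in results[1:])]
    if multi_strategy == "all":
        return per_container
    raise ValueError(f"unknown multi_strategy '{multi_strategy}'")


# hypothetical dataset names known to two configs
known = {
    "config_a": ["st_tchannel_t", "st_tchannel_tbar", "tt_sl"],
    "config_b": ["st_tchannel_t", "tt_sl"],
}
resolved = {c: match_names(["st_tchannel_*"], names) for c, names in known.items()}
print(combine(resolved, "union"))         # ['st_tchannel_t', 'st_tchannel_tbar']
print(combine(resolved, "intersection"))  # ['st_tchannel_t']
```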
classmethod resolve_config_default(*, param, task_params, container, default_str=None, multi_strategy='first')[source]#

Resolves a given parameter value param, checks if it should be replaced with a default value when empty, and in this case, does the actual default value resolution.

This resolution is triggered only in case param refers to RESOLVE_DEFAULT, a 1-tuple containing this attribute, or None. If so, the default is identified via the default_str from an order.AuxDataMixin container and points to an auxiliary that can be either a string or a function. In the latter case, it is called with the task class, the container instance, and all task parameters. Note that when no container is given, param is returned unchanged.

container can also refer to a sequence of order.AuxDataMixin objects. If this is the case, the default resolution is performed for all of them and the resulting values can be handled with five different strategies, controlled via multi_strategy:

  • "first": The first resolved value is returned.

  • "same": The resolved values are forced to be identical and an exception is raised if they differ. The first resolved value is returned.

  • "union": The set union of all resolved values is returned in a list.

  • "intersection": The set intersection of all resolved values is returned in a list.

  • "all": The resolved values are returned in a dictionary mapped to their respective container.

Example:

# assuming this is your config
config_inst = od.Config(
    id=1,
    name="my_config",
    aux={
        "default_selector": "my_selector",
    },
)

# and these are the task parameters
params = {
    "config_inst": config_inst,
}

AnalysisTask.resolve_config_default(
    param=RESOLVE_DEFAULT,
    task_params=params,
    container=config_inst,  # <-- same as passing the "config_inst" key of params
    default_str="default_selector",
)
# -> "my_selector"

Example where the default points to a function:

def default_selector(task_cls, config_inst, task_params) -> str:
    # determine the selector based on dynamic conditions
    return "my_other_selector"

config_inst = od.Config(
    id=1,
    name="my_config",
    aux={
        "default_selector": default_selector,  # <-- function
    },
)

AnalysisTask.resolve_config_default(
    param=RESOLVE_DEFAULT,
    task_params=params,
    container=config_inst,
    default_str="default_selector",
)
# -> "my_other_selector"
Return type:

Any | list[Any] | dict[od.AuxDataMixin, Any]

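The decision logic described above (resolve only when param is empty, then use either a string or a function as the default) can be sketched without any framework. This is a simplified illustration: RESOLVE_DEFAULT_SKETCH and resolve_default are hypothetical names, the auxiliary data is modeled as a plain dict, and the multi-container handling is omitted.

```python
RESOLVE_DEFAULT_SKETCH = object()  # hypothetical stand-in for RESOLVE_DEFAULT


def resolve_default(param, aux, default_str, task_cls=None, container=None, task_params=None):
    """Return param unless it is empty, else look up the default in aux."""
    is_empty = (
        param is None
        or param is RESOLVE_DEFAULT_SKETCH
        or param == (RESOLVE_DEFAULT_SKETCH,)
    )
    if not is_empty:
        return param
    default = aux.get(default_str)
    # a callable default receives the task class, the container and all parameters
    if callable(default):
        return default(task_cls, container, task_params)
    return default


aux = {"default_selector": "my_selector"}
print(resolve_default(None, aux, "default_selector"))      # my_selector
print(resolve_default("custom", aux, "default_selector"))  # custom

aux_func = {"default_selector": lambda task_cls, container, params: "my_other_selector"}
print(resolve_default((RESOLVE_DEFAULT_SKETCH,), aux_func, "default_selector"))  # my_other_selector
```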
classmethod resolve_config_default_and_groups(*, param, task_params, container, groups_str, default_str=None, multi_strategy='first', debug=False)[source]#

This method is similar to resolve_config_default() in that it checks if a parameter value param is empty and should be replaced with a default value. All arguments except for groups_str are forwarded to resolve_config_default().

In addition, this method checks whether the values contained in param (after default value resolution) refer to groups of values, identified via groups_str in the order.AuxDataMixin container as a mapping from a group name to a tuple of strings. If so, each value in param that refers to a group is replaced by the actual group values.

Example:

# assuming this is your config
config_inst = od.Config(
    id=1,
    name="my_config",
    aux={
        "default_producer": "my_producers",
        "producer_groups": {
            "my_producers": ["producer_1", "producer_2"],
            "my_other_producers": ["my_producers", "producer_3", "producer_4"],
        },
    },
)

# and these are the task parameters
params = {
    "config_inst": config_inst,
}

AnalysisTask.resolve_config_default_and_groups(
    param=RESOLVE_DEFAULT,
    task_params=params,
    container=config_inst,  # <-- same as passing the "config_inst" key of params
    default_str="default_producer",
    groups_str="producer_groups",
)
# -> ["producer_1", "producer_2"]

Example showing recursive group expansion:

# assuming config_inst and params are the same as above

AnalysisTask.resolve_config_default_and_groups(
    param="my_other_producers",  # <-- points to a group that contains another group
    task_params=params,
    container=config_inst,
    default_str="default_producer",  # <-- not used as param is set explicitly
    groups_str="producer_groups",
)
# -> ["producer_1", "producer_2", "producer_3", "producer_4"]
Return type:

Any | list[Any] | dict[od.AuxDataMixin, Any]
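The recursive group expansion shown in the second example can be sketched as a small standalone function. expand_groups is a hypothetical helper; the removal of duplicates and the absence of a guard against cyclic group definitions are simplifications made here, not statements about the actual implementation.

```python
def expand_groups(values, groups):
    """Recursively replace group names in values by the group members."""
    expanded = []
    for value in values:
        if value in groups:
            # a group may itself contain other group names (no cycle protection here)
            expanded.extend(expand_groups(groups[value], groups))
        else:
            expanded.append(value)
    # assumption: drop duplicates while keeping the first-seen order
    return list(dict.fromkeys(expanded))


producer_groups = {
    "my_producers": ["producer_1", "producer_2"],
    "my_other_producers": ["my_producers", "producer_3", "producer_4"],
}
print(expand_groups(["my_other_producers"], producer_groups))
# ['producer_1', 'producer_2', 'producer_3', 'producer_4']
```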

classmethod build_repr(objects, *, sep='__', prepend_count=False, max_len=-1, max_count=3, hash_len=10)[source]#

Generic method to construct a string representation given a single object or a sequence of objects.

Parameters:
  • objects (Any | Sequence[Any]) – The object or objects to be represented.

  • sep (str, default: '__') – The separator used to join the objects.

  • prepend_count (bool, default: False) – When True, the number of objects is prepended to the string, followed by sep.

  • max_len (int, default: -1) – The maximum length of the string. If exceeded, the string is truncated and hashed.

  • max_count (int, default: 3) – The maximum number of objects to include in the string. Additional objects are hashed, but only if the resulting representation length does not exceed max_len. If so, the overall truncation and hashing is applied instead.

  • hash_len (int, default: 10) – The length of the hash that is appended to the string when it is truncated.

Return type:

str

Returns:

The string representation.
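To make the interplay of the parameters more concrete, here is one possible way to combine joining, counting and hashing. It is a rough sketch only: build_repr_sketch is hypothetical, the hash function (SHA-256) and the exact layout of truncated strings are assumptions, and the real method may differ in how it decides between per-object and overall hashing.

```python
import hashlib


def build_repr_sketch(objects, *, sep="__", prepend_count=False,
                      max_len=-1, max_count=3, hash_len=10):
    """Join objects into a string and hash what exceeds max_count or max_len."""
    # treat strings and non-iterables as a single object
    if isinstance(objects, (str, bytes)) or not hasattr(objects, "__iter__"):
        objects = [objects]
    parts = [str(obj) for obj in objects]

    def short_hash(text):
        return hashlib.sha256(text.encode("utf-8")).hexdigest()[:hash_len]

    shown = parts
    if 0 <= max_count < len(parts):
        # keep the first max_count objects and hash the remaining ones
        shown = parts[:max_count] + [short_hash(sep.join(parts[max_count:]))]
    if prepend_count:
        shown = [str(len(parts))] + shown
    text = sep.join(shown)
    if 0 <= max_len < len(text):
        # overall truncation: cut the string and append a hash of the full string
        keep = max(max_len - hash_len - len(sep), 0)
        text = text[:keep] + sep + short_hash(text)
    return text


print(build_repr_sketch(["a", "b"]))                      # a__b
print(build_repr_sketch(["a", "b"], prepend_count=True))  # 2__a__b
print(build_repr_sketch(list("abcde")))                   # a__b__c__ followed by a 10-character hash
```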

cached_value(key, func)[source]#

Upon first invocation, the function func is called and its return value is stored under key in _cached_values. Subsequent calls with the same key return the cached value.

Parameters:
  • key (str) – The key under which the value is stored.

  • func (Callable[[], TypeVar(T)]) – The function that is called to generate the value.

Return type:

TypeVar(T)

Returns:

The cached value.
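The caching behavior described above can be sketched with a small standalone class. CachingSketch and expensive are hypothetical; only the _cached_values attribute name and the call signature are taken from the description.

```python
from typing import Any, Callable, Dict, TypeVar

T = TypeVar("T")


class CachingSketch:
    """Hypothetical object mimicking the cached_value() behavior described above."""

    def __init__(self) -> None:
        self._cached_values: Dict[str, Any] = {}

    def cached_value(self, key: str, func: Callable[[], T]) -> T:
        # call func only if nothing is stored under key yet
        if key not in self._cached_values:
            self._cached_values[key] = func()
        return self._cached_values[key]


calls = []


def expensive() -> int:
    # record each invocation to show that the function runs only once
    calls.append(1)
    return 42


obj = CachingSketch()
first = obj.cached_value("answer", expensive)
second = obj.cached_value("answer", expensive)
print(first, second, len(calls))  # 42 42 1
```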

reset_sandbox(sandbox)[source]#

Resets the sandbox to a new sandbox value.

Return type:

None

store_parts()[source]#

Returns a law.util.InsertableDict whose values are used to create a store path. For instance, the parts {"keyA": "a", "keyB": "b", 2: "c"} lead to the path “a/b/c”. The keys can be used by subclassing tasks to overwrite values.

Return type:

InsertableDict

Returns:

Dictionary with parts that will be translated into an output directory path.
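The translation from parts to a path can be sketched with a plain dict standing in for law.util.InsertableDict. The sketch assumes that only the values contribute to the path, in insertion order, and the overwritten value "b_custom" is purely illustrative.

```python
# hypothetical parts; a plain dict is used instead of law.util.InsertableDict
parts = {"keyA": "a", "keyB": "b", 2: "c"}

# a subclass could overwrite a value through its key before the path is built
parts["keyB"] = "b_custom"

# only the values are joined, in insertion order
path = "/".join(str(value) for value in parts.values())
print(path)  # a/b_custom/c
```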

local_path(*path, store_parts_modifier=None, **kwargs)[source]#

Joins path fragments from store (defaulting to default_store), store_parts() and path and returns the joined path. In case a fs is defined, it should refer to the config section of a local file system, and consequently, store is not prepended to the returned path as the resolution of absolute paths is handled by that file system.

Return type:

str

local_target(*path, store_parts_modifier=None, **kwargs)[source]#

Creates either a local file or directory target, depending on dir, forwarding all path fragments, store and fs to local_path() and all kwargs to the respective target class.

Return type:

law.LocalTarget

wlcg_path(*path, store_parts_modifier=None)[source]#

Joins path fragments from store_parts() and path and returns the joined path.

The full URI to the target is not considered as it is usually defined in [wlcg_fs] sections in the law config and hence subject to wlcg_target().

Return type:

str

wlcg_target(*path, store_parts_modifier=None, **kwargs)[source]#

Creates either a remote WLCG file or directory target, depending on dir, forwarding all path fragments to wlcg_path() and all kwargs to the respective target class. When None, fs defaults to the default_wlcg_fs class level attribute.

Return type:

law.wlcg.WLCGTarget

target(*path, **kwargs)[source]#
Return type:

law.LocalTarget | law.wlcg.WLCGTarget | law.MirroredTarget

get_parquet_writer_opts(repeating_values=False)[source]#

Returns an option dictionary that can be passed as writer_opts to merge_parquet_task(), for instance, at the end of chunked processing steps that produce a single parquet file. See ParquetWriter for valid options.

This method can be overwritten in subclasses to customize the exact behavior.

Parameters:

repeating_values (bool, default: False) – Whether the values to be written have predominantly repeating values, in which case different compression and encoding strategies are followed.

Return type:

dict[str, Any]

Returns:

A dictionary with options that can be passed to parquet writer objects.

exclude_index = False#
exclude_params_repr_empty = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
class ConfigTask(*args, **kwargs)[source]#

Bases: AnalysisTask

Attributes:

Methods:

modify_task_attributes()

Hook that is called by law's task register meta class right after subclass creation to update class-level attributes.

has_single_config()

Returns whether the class is configured to use a single config.

ensure_single_config(value, *[, attr])

Ensures that the single_config flag of this task is set to value by raising an exception if it is not.

config_mode()

Returns a string representation of this task's config mode.

resolve_param_values(params)

resolve_instances(params, shifts)

Build the array function instances.

resolve_param_values_pre_init(params)

Resolve parameters before the array function instances have been initialized.

resolve_param_values_post_init(params)

Resolve parameters after the array function instances have been initialized.

resolve_shifts(params)

Resolve shifts

get_known_shifts(params, shifts)

Adjusts the local and upstream fields of the shifts object to include shifts implemented by this task, and dependent shifts that are implemented by upstream tasks.

req_params(inst, *args, **kwargs)

Returns parameters that are jointly defined in this class and another task instance of some other class.

broadcast_to_configs(value, name, n_config_insts)

get_config_lookup_keys(inst_or_params)

Returns a dictionary with keys that can be used to lookup state specific values in a config or dictionary, such as default task versions or output locations.

get_array_function_dict(params)

store_parts()

Returns a law.util.InsertableDict whose values are used to create a store path.

find_keep_columns(collection)

Returns a set of Route objects describing columns that should be kept given a type of column collection.

config = <luigi.parameter.Parameter object>#
configs = <law.parameter.CSVParameter object>#
known_shifts = <luigi.parameter.Parameter object>#
exclude_params_req = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_sandbox = {'known_shifts', 'log_file', 'sandbox'}#
exclude_params_remote_workflow = {'known_shifts'}#
exclude_params_index = {'known_shifts', 'user'}#
exclude_params_repr = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
config_store_anchor = 'config'#
classmethod modify_task_attributes()[source]#

Hook that is called by law’s task register meta class right after subclass creation to update class-level attributes.

Return type:

None

abstract property single_config: bool#
classmethod has_single_config()[source]#

Returns whether the class is configured to use a single config.

Raises:

AttributeError – When the class does not specify the single_config attribute.

Return type:

bool

Returns:

True if the class uses a single config, False otherwise.

classmethod ensure_single_config(value, *, attr=None)[source]#

Ensures that the single_config flag of this task is set to value by raising an exception if it is not. This method is typically used to guard the access to attributes. If so, attr is used in the exception message to reflect this.

Parameters:
  • value (bool) – The value to compare the flag with.

  • attr (str | None, default: None) – The attribute that triggered the check.

Return type:

None

classmethod config_mode()[source]#

Returns a string representation of this task’s config mode.

Return type:

str

Returns:

“single” if the task has a single config, “multi” otherwise.
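The interplay of single_config, ensure_single_config() and config_mode() can be sketched as follows. SingleConfigSketch is hypothetical, the flag is modeled as a plain class attribute, and both the exception type (RuntimeError) and the message wording are assumptions, since the text above only states that an exception is raised.

```python
class SingleConfigSketch:
    """Hypothetical class exposing the single_config flag as a plain attribute."""

    single_config = True

    @classmethod
    def ensure_single_config(cls, value, *, attr=None):
        if cls.single_config == value:
            return
        # mention the guarded attribute in the message when given
        target = f"attribute '{attr}'" if attr else "this operation"
        mode = "single" if value else "multi"
        raise RuntimeError(f"{target} requires {cls.__name__} to run in {mode}-config mode")

    @classmethod
    def config_mode(cls):
        return "single" if cls.single_config else "multi"


print(SingleConfigSketch.config_mode())  # single
SingleConfigSketch.ensure_single_config(True)  # passes silently

try:
    SingleConfigSketch.ensure_single_config(False, attr="config_insts")
except RuntimeError as exc:
    print(exc)
```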

classmethod resolve_param_values(params)[source]#
Return type:

dict[str, Any]

classmethod resolve_instances(params, shifts)[source]#

Build the array function instances. For single-config/dataset tasks, resolve_instances is implemented by mixin classes such as the ProducersMixin. For multi-config tasks, resolve_instances from the upstream task is called for each config instance. If the resolve_instances function needs to be called for other combinations of parameters (e.g. per dataset), it can be overwritten by the task class.

Parameters:
  • params (dict[str, Any]) – Dictionary of task parameters.

  • shifts (TaskShifts) – Collection of local and global shifts.

Return type:

dict[str, Any]

Returns:

Updated dictionary of task parameters.

classmethod resolve_param_values_pre_init(params)[source]#

Resolve parameters before the array function instances have been initialized.

Parameters:

params (dict[str, Any]) – Dictionary of task parameters.

Return type:

dict[str, Any]

Returns:

Updated dictionary of task parameters.

classmethod resolve_param_values_post_init(params)[source]#

Resolve parameters after the array function instances have been initialized.

Parameters:

params (dict[str, Any]) – Dictionary of task parameters.

Return type:

dict[str, Any]

Returns:

Updated dictionary of task parameters.

classmethod resolve_shifts(params)[source]#

Resolve shifts

Parameters:

params (dict[str, Any]) – Dictionary of task parameters.

Return type:

dict[str, Any]

Returns:

Updated dictionary of task parameters.

classmethod get_known_shifts(params, shifts)[source]#

Adjusts the local and upstream fields of the shifts object to include shifts implemented by this task, and dependent shifts that are implemented by upstream tasks.

Parameters:
  • params (dict[str, Any]) – Dictionary of task parameters.

  • shifts (TaskShifts) – TaskShifts object to adjust.

Return type:

None

resolution_task_cls = None#
classmethod req_params(inst, *args, **kwargs)[source]#

Returns parameters that are jointly defined in this class and another task instance of some other class. The parameters are used when calling Task.req(self).

Return type:

dict[str, Any]

classmethod broadcast_to_configs(value, name, n_config_insts)[source]#
Return type:

tuple[Any]

classmethod get_config_lookup_keys(inst_or_params)[source]#

Returns a dictionary with keys that can be used to lookup state specific values in a config or dictionary, such as default task versions or output locations.

Parameters:

inst_or_params (ConfigTask | dict[str, Any]) – The task instance or its parameters.

Return type:

law.util.InsertableDict

Returns:

A dictionary with keys that can be used for nested lookup.

classmethod get_array_function_dict(params)[source]#
Return type:

dict[str, Any]

property config_repr: str#
store_parts()[source]#

Returns a law.util.InsertableDict whose values are used to create a store path. For instance, the parts {"keyA": "a", "keyB": "b", 2: "c"} lead to the path “a/b/c”. The keys can be used by subclassing tasks to overwrite values.

Return type:

InsertableDict

Returns:

Dictionary with parts that will be translated into an output directory path.

find_keep_columns(collection)[source]#

Returns a set of Route objects describing columns that should be kept given a type of column collection.

Parameters:

collection (ColumnCollection) – The collection to return.

Return type:

set[Route]

Returns:

A set of Route objects.

exclude_index = False#
exclude_params_branch = {'user'}#
exclude_params_repr_empty = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
class ShiftTask(*args, **kwargs)[source]#

Bases: ConfigTask

Attributes:

Methods:

resolve_shifts(params)

Resolve shifts

get_config_lookup_keys(inst_or_params)

Returns a dictionary with keys that can be used to lookup state specific values in a config or dictionary, such as default task versions or output locations.

store_parts()

Returns a law.util.InsertableDict whose values are used to create a store path.

shift = <luigi.parameter.Parameter object>#
local_shift = <luigi.parameter.Parameter object>#
exclude_params_index = {'known_shifts', 'local_shift', 'user'}#
exclude_params_req = {'known_shifts', 'local_shift', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_sandbox = {'known_shifts', 'local_shift', 'log_file', 'sandbox'}#
exclude_params_remote_workflow = {'known_shifts', 'local_shift'}#
allow_empty_shift = False#
classmethod resolve_shifts(params)[source]#

Resolve shifts

Parameters:

params (dict[str, Any]) – Dictionary of task parameters.

Return type:

dict[str, Any]

Returns:

Updated dictionary of task parameters.

classmethod get_config_lookup_keys(inst_or_params)[source]#

Returns a dictionary with keys that can be used to lookup state specific values in a config or dictionary, such as default task versions or output locations.

Parameters:

inst_or_params (ShiftTask | dict[str, Any]) – The task instance or its parameters.

Return type:

law.util.InsertableDict

Returns:

A dictionary with keys that can be used for nested lookup.

store_parts()[source]#

Returns a law.util.InsertableDict whose values are used to create a store path. For instance, the parts {"keyA": "a", "keyB": "b", 2: "c"} lead to the path “a/b/c”. The keys can be used by subclassing tasks to overwrite values.

Return type:

InsertableDict

Returns:

Dictionary with parts that will be translated into an output directory path.

exclude_index = False#
exclude_params_branch = {'user'}#
exclude_params_repr = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_repr_empty = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
class DatasetTask(*args, **kwargs)[source]#

Bases: ShiftTask

Attributes:

Methods:

resolve_param_values_pre_init(params)

Resolve parameters before the array function instances have been initialized.

get_known_shifts(params, shifts)

Adjusts the local and upstream fields of the shifts object to include shifts implemented by this task, and dependent shifts that are implemented by upstream tasks.

get_config_lookup_keys(inst_or_params)

Returns a dictionary with keys that can be used to lookup state specific values in a config or dictionary, such as default task versions or output locations.

get_array_function_dict(params)

store_parts()

Returns a law.util.InsertableDict whose values are used to create a store path.

create_branch_map()

Define the branch map for when this task is used as a workflow.

single_config = True#
dataset = <luigi.parameter.Parameter object>#
file_merging = None#
classmethod resolve_param_values_pre_init(params)[source]#

Resolve parameters before the array function instances have been initialized.

Parameters:

params (dict[str, Any]) – Dictionary of task parameters.

Return type:

dict[str, Any]

Returns:

Updated dictionary of task parameters.

classmethod get_known_shifts(params, shifts)[source]#

Adjusts the local and upstream fields of the shifts object to include shifts implemented by this task, and dependent shifts that are implemented by upstream tasks.

Parameters:
  • params (dict[str, Any]) – Dictionary of task parameters.

  • shifts (TaskShifts) – TaskShifts object to adjust.

Return type:

None

classmethod get_config_lookup_keys(inst_or_params)[source]#

Returns a dictionary with keys that can be used to lookup state specific values in a config or dictionary, such as default task versions or output locations.

Parameters:

inst_or_params (DatasetTask | dict[str, Any]) – The task instance or its parameters.

Return type:

law.util.InsertableDict

Returns:

A dictionary with keys that can be used for nested lookup.

classmethod get_array_function_dict(params)[source]#
Return type:

dict[str, Any]

store_parts()[source]#

Returns a law.util.InsertableDict whose values are used to create a store path. For instance, the parts {"keyA": "a", "keyB": "b", 2: "c"} lead to the path “a/b/c”. The keys can be used by subclassing tasks to overwrite values.

Return type:

InsertableDict

Returns:

Dictionary with parts that will be translated into an output directory path.

property file_merging_factor: int#

Returns the number of files that are handled in one branch. When the file_merging attribute is set to a positive integer, this value is returned. Otherwise, if the value is zero, the original number of files is used instead.

Consecutive merging steps are not handled yet.

create_branch_map()[source]#

Define the branch map for when this task is used as a workflow. By default, use the merging information provided by file_merging_factor to return a dictionary which maps branches to one or more input file indices. E.g. 1 -> [3, 4, 5] would mean that branch 1 is simultaneously handling input file indices 3, 4 and 5.
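The mapping from branches to input file indices can be sketched as a simple chunking step. create_branch_map_sketch is a hypothetical helper that assumes consecutive file indices are grouped by a fixed merging factor; the actual method may build the map differently.

```python
def create_branch_map_sketch(n_files, merging_factor):
    """Map branch numbers to lists of consecutive input file indices."""
    indices = list(range(n_files))
    chunks = [indices[i:i + merging_factor] for i in range(0, n_files, merging_factor)]
    return dict(enumerate(chunks))


print(create_branch_map_sketch(8, 3))
# {0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7]}
```

With a merging factor of 3, branch 1 handles the input file indices 3, 4 and 5, as in the description above.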

configs = None#
exclude_index = False#
exclude_params_branch = {'user'}#
exclude_params_index = {'known_shifts', 'local_shift', 'user'}#
exclude_params_remote_workflow = {'known_shifts', 'local_shift'}#
exclude_params_repr = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_repr_empty = {}#
exclude_params_req = {'known_shifts', 'local_shift', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'known_shifts', 'local_shift', 'log_file', 'sandbox'}#
exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
class CommandTask(*args, **kwargs)[source]#

Bases: AnalysisTask

A task that provides convenience methods to work with shell commands, i.e., printing them on the command line and executing them with error handling.

Attributes:

Methods:

build_command()

touch_output_dirs()

run_command(cmd[, optional])

run(**kwargs)

The task run method, to be overridden in a subclass.

pre_run_command()

post_run_command()

print_command = <law.parameter.CSVParameter object>#
custom_args = <luigi.parameter.Parameter object>#
exclude_index = True#
exclude_params_req = {'custom_args', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
interactive_params = ['print_deps', 'print_status', 'print_output', 'fetch_output', 'remove_output', 'print_command']#
run_command_in_tmp = False#
build_command()[source]#
Return type:

str | list[str]

touch_output_dirs()[source]#
Return type:

None

run_command(cmd, optional=False, **kwargs)[source]#
Return type:

subprocess.Popen

exclude_params_branch = {'user'}#
exclude_params_index = {'user'}#
exclude_params_repr = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_repr_empty = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
run(**kwargs)[source]#

The task run method, to be overridden in a subclass.

See Task.run

pre_run_command()[source]#
Return type:

None

post_run_command()[source]#
Return type:

None

wrapper_factory(base_cls, require_cls, enable, cls_name=None, attributes=None, docs=None)[source]#

Factory function creating wrapper task classes, inheriting from base_cls and WrapperTask, that do nothing but require multiple instances of require_cls. Unless cls_name is defined, the name of the created class defaults to the name of require_cls plus “Wrapper”. Additional attributes are added as class-level members when given.

The instances of require_cls to be required in the requires() method can be controlled by task parameters. These parameters can be enabled through the string sequence enable, which currently accepts:

  • configs, skip_configs

  • shifts, skip_shifts

  • datasets, skip_datasets

This makes it easy to build wrapper tasks that loop over (combinations of) parameters that are either defined in the analysis or config, which would otherwise lead to mostly redundant code.

Example:

class MyTask(DatasetTask):
    ...

MyTaskWrapper = wrapper_factory(
    base_cls=ConfigTask,
    require_cls=MyTask,
    enable=["datasets", "skip_datasets"],
)

# this allows to run (e.g.)
# law run MyTaskWrapper --datasets st_* --skip-datasets *_tbar

When building the requirements, the full combinatorics of parameters is considered. However, certain conditions apply depending on enabled features. For instance, in order to use the “configs” feature (adding a parameter “--configs” to the created class, which allows looping over a list of config instances known to an analysis), require_cls must be at least a ConfigTask accepting “--config” (mind the singular form), whereas base_cls must explicitly not accept it.
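The selection via patterns and skip patterns, followed by the full combinatorics of the remaining values, can be sketched independently of law. select_names is a hypothetical helper, matching is done with fnmatch as an assumption, and the config and dataset names are placeholders.

```python
from fnmatch import fnmatch
from itertools import product


def select_names(known, patterns, skip_patterns=()):
    """Keep names matching any pattern, then drop those matching a skip pattern."""
    selected = [n for n in known if any(fnmatch(n, p) for p in patterns)]
    return [n for n in selected if not any(fnmatch(n, p) for p in skip_patterns)]


known_configs = ["config_2017", "config_2018"]
known_datasets = ["st_tchannel_t", "st_tchannel_tbar", "tt_sl"]

# corresponds to "--datasets st_* --skip-datasets *_tbar"
datasets = select_names(known_datasets, ["st_*"], ["*_tbar"])

# one requirement per combination of the looped parameters
requirements = list(product(known_configs, datasets))
print(requirements)
# [('config_2017', 'st_tchannel_t'), ('config_2018', 'st_tchannel_t')]
```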

Parameters:
  • base_cls (law.task.base.Task) – Base class for this wrapper

  • require_cls (AnalysisTask) – Task class to be wrapped

  • enable (Sequence[str]) – Enable these parameters to control the wrapped Task class instance. Currently allowed parameters are: “configs”, “skip_configs”, “shifts”, “skip_shifts”, “datasets”, “skip_datasets”

  • cls_name (str | None, default: None) – Name of the created wrapper class. If None, defaults to the name of require_cls + “Wrapper”

  • attributes (dict | None, default: None) – Add these attributes as class-level members of the new WrapperTask class

  • docs (str | None, default: None) – Manually set the documentation string __doc__ of the new WrapperTask class instance

Raises:
  • ValueError – If a parameter provided with enable is not in the list of known parameters

  • TypeError – If any parameter in enable is incompatible with the WrapperTask class instance or the inheritance structure of corresponding classes

  • ValueError – If configs are enabled but not found in the analysis config instance

  • ValueError – If shifts are enabled but not found in the analysis config instance

  • ValueError – If datasets are enabled but not found in the analysis config instance

Return type:

law.task.base.Register

Returns:

The new WrapperTask for the Task class require_cls