external

Tasks dealing with external data.

Classes:

GetDatasetLFNs(*args, **kwargs)

Task to get list of logical file names (LFNs).

GetDatasetLFNsWrapper(*args, **kwargs)

ExternalFile(location[, subpaths, version])

Container object that defines an external file resource understood by tasks such as tasks.external.BundleExternalFiles.

BundleExternalFiles(*args, **kwargs)

Task to collect external files.

BundleExternalFilesWrapper(*args, **kwargs)

class GetDatasetLFNs(*args, **kwargs)[source]#

Bases: DatasetTask, TransferLocalFile

Task to get list of logical file names (LFNs).

Attributes:

Methods:

resolve_param_values(params)

Resolve parameter values params from command line and propagate them to this set of parameters.

single_output()

Creates a remote target file for the final .json file containing the list of LFNs.

run()

Run function for this task.

get_dataset_lfns_dasgoclient(dataset_inst, ...)

Get the LFN information with dasgoclient.

iter_nano_files(task[, fs, lfn_indices, ...])

Generator function that reduces the boilerplate code for looping over the files referred to by lfn_indices, using the LFNs obtained by this task. This task must be complete for the function to succeed.

replicas = <luigi.parameter.IntParameter object>#
validate = <law.parameter.OptionalBoolParameter object>#
version = None#

Version parameter - deactivated for GetDatasetLFNs

classmethod resolve_param_values(params)[source]#

Resolve parameter values params from command line and propagate them to this set of parameters.

Parameters:

params (DotDict) – Parameters provided at command line level.

Return type:

DotDict

Returns:

Updated parameter values.

property sandbox: str#

Defines sandbox for this task.

Returns:

Path to shell script that sets up the requested sandbox.

single_output()[source]#

Creates a remote target file for the final .json file containing the list of LFNs.

Return type:

FileSystemFileTarget

Returns:

Law remote target with the initialized output name

run()[source]#

Run function for this task.

Raises:

ValueError – If the number of loaded LFNs does not match the number of LFNs specified in this dataset_info_inst.

get_dataset_lfns_dasgoclient(dataset_inst, shift_inst, dataset_key)[source]#

Get the LFN information with dasgoclient.

Parameters:
  • dataset_inst (Dataset) – Current dataset instance, currently not used.

  • shift_inst (Shift) – Current shift instance, currently not used.

  • dataset_key (str) – DAS key identifier for the current dataset.

Raises:

Exception – If query with dasgoclient fails.

Return type:

list[str]

Returns:

The list of LFNs corresponding to the dataset with the identifier dataset_key.
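
As a rough illustration of the query described above, the following sketch builds a dasgoclient call for a dataset key and parses one LFN per output line. It is not the task's actual implementation: the helper name query_lfns, the injectable runner argument, the exact query string and flags, and the ".root" filter are all assumptions made for this example.

```python
import subprocess
from typing import Callable, List, Optional, Sequence


def _run_cli(cmd: Sequence[str]) -> str:
    # Run the command and raise CalledProcessError on a non-zero exit code.
    return subprocess.run(cmd, capture_output=True, text=True, check=True).stdout


def query_lfns(
    dataset_key: str,
    runner: Optional[Callable[[Sequence[str]], str]] = None,
) -> List[str]:
    """Return the LFNs reported for dataset_key (hypothetical helper)."""
    # Assumed query form: list all files of the dataset without a record limit.
    cmd = ["dasgoclient", "-query", f"file dataset={dataset_key}", "-limit", "0"]
    output = (runner or _run_cli)(cmd)
    # Expect one LFN per line; drop empty lines and anything that is not a ROOT file.
    lines = (line.strip() for line in output.splitlines())
    return [line for line in lines if line.endswith(".root")]
```

Passing a custom runner makes the parsing testable on machines where dasgoclient is not installed; without it, the helper calls the real command line tool.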

iter_nano_files(task, fs=None, lfn_indices=None, eager_lookup=1, skip_fallback=False)[source]#

Generator function that reduces the boilerplate code for looping over the files referred to by lfn_indices, using the LFNs obtained by this task. This task must be complete for the function to succeed.

When lfn_indices are not given, task must be a branch of a DatasetTask workflow whose branch value is used instead.

Parameters:
  • task (AnalysisTask | DatasetTask) – Current task that needs to access the nanoAOD files

  • fs (str | Sequence[str] | None, default: None) – Name of the local or remote file system where the LFNs are located, defaults to None

  • lfn_indices (list[int] | None, default: None) – List of indices of LFNs that are processed by this task instance, defaults to None

  • eager_lookup (bool | int, default: 1) – Look at the next fs if stat takes too long, defaults to 1

  • skip_fallback (bool, default: False) – Skip the fallback mechanism to fetch the LFN, defaults to False

Raises:
  • TypeError – If task is not of type BaseWorkflow or not a task analyzing a single branch in the task tree

  • Exception – If current task is not complete as indicated with self.complete()

  • ValueError – If no fs is provided at call and none can be found in either the config instance or the law config.

  • Exception – If a given LFN cannot be found at any fs

Yield:

A file target that points to an LFN.

Return type:

None
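
The lookup order described above (try each file system in turn, fail if an LFN is found nowhere) can be sketched in plain Python. This is only a simplified model under stated assumptions: the function iter_files, the exists callback and the yielded tuple shape are hypothetical, and the timing-based eager_lookup behaviour and the fallback mechanism are not modelled.

```python
from typing import Callable, Iterator, Sequence, Tuple


def iter_files(
    lfns: Sequence[str],
    lfn_indices: Sequence[int],
    fs_names: Sequence[str],
    exists: Callable[[str, str], bool],
) -> Iterator[Tuple[int, str, str]]:
    """Yield (lfn_index, fs_name, lfn) for the first file system holding each LFN."""
    for idx in lfn_indices:
        lfn = lfns[idx]
        for fs in fs_names:
            if exists(fs, lfn):
                yield idx, fs, lfn
                break
        else:
            # Mirrors the documented failure mode: the LFN is not found at any fs.
            raise Exception(f"LFN {lfn} not found at any of {list(fs_names)}")
```

In the real task the yielded object is a file target rather than a tuple of strings; the tuple here only keeps the sketch free of external dependencies.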

configs = None#
exclude_index = False#
exclude_params_branch = {'user'}#
exclude_params_index = {'known_shifts', 'local_shift', 'user'}#
exclude_params_remote_workflow = {'known_shifts', 'local_shift'}#
exclude_params_repr = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_repr_empty = {'source_path'}#
exclude_params_req = {'known_shifts', 'local_shift', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'known_shifts', 'local_shift', 'log_file', 'sandbox'}#
exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
class GetDatasetLFNsWrapper(*args, **kwargs)#

Bases: AnalysisTask, WrapperTask

Attributes:

Methods:

requires()

The Tasks that this Task depends on.

update_wrapper_params(params)

configs = <law.parameter.CSVParameter object>#
datasets = <law.parameter.CSVParameter object>#
exclude_index = False#
exclude_params_branch = {'user'}#
exclude_params_index = {'user'}#
exclude_params_repr = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_repr_empty = {'skip_configs', 'skip_datasets', 'skip_shifts'}#
exclude_params_req = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_req_get = {}#
exclude_params_req_set = {'configs', 'datasets', 'shifts', 'skip_configs', 'skip_datasets', 'skip_shifts', 'wrap_once'}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
replicas = <luigi.parameter.IntParameter object>#
requires() dict#

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

Return type:

dict
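
To make the role of the configs, datasets and skip_* parameters more concrete, here is a minimal sketch of how a wrapper could expand them into a dict of requirements. It is an assumption-laden illustration, not the wrapper's actual code: the function expand_wrapper_params, the fnmatch-style skip patterns and the shape of the dict values are all hypothetical.

```python
from fnmatch import fnmatch
from itertools import product


def expand_wrapper_params(configs, datasets, skip_configs=(), skip_datasets=()):
    """Build one entry per (config, dataset) pair that is not skipped."""

    def keep(name, skip_patterns):
        # A name is kept when it matches none of the skip patterns.
        return not any(fnmatch(name, pattern) for pattern in skip_patterns)

    kept_configs = [c for c in configs if keep(c, skip_configs)]
    kept_datasets = [d for d in datasets if keep(d, skip_datasets)]

    # In a real wrapper the values would be task instances; plain dicts stand in here.
    return {
        (c, d): {"config": c, "dataset": d}
        for c, d in product(kept_configs, kept_datasets)
    }
```

For example, expand_wrapper_params(["c1"], ["tt_sl", "st_tw"], skip_datasets=["st_*"]) keeps only the ("c1", "tt_sl") combination.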

shifts = <law.parameter.CSVParameter object>#
skip_configs = <law.parameter.CSVParameter object>#
skip_datasets = <law.parameter.CSVParameter object>#
skip_shifts = <law.parameter.CSVParameter object>#
source_path = <luigi.parameter.Parameter object>#
task_namespace = 'cf'#

This value can be overridden to set the namespace that will be used. (See Task.namespaces_famlies_and_ids) If it’s not specified and you try to read this value anyway, it will return garbage. Please use get_task_namespace() to read the namespace.

Note that setting this value with @property will not work, because this is a class level value.

update_wrapper_params(params)#
validate = <law.parameter.OptionalBoolParameter object>#
version = None#
class ExternalFile(location, subpaths=<factory>, version='v1')[source]#

Bases: object

Container object that defines an external file resource understood by tasks such as tasks.external.BundleExternalFiles. Example:

# refer to a simple file location
ExternalFile(location="path/to/file", version="v1")

# refer to a directory or archive that contains multiple files
ExternalFile(location="some/archive.tgz", subpaths={"file_name": "file/in/archive"}, version="v1")

Attributes:

Methods:

new(resource)

Factory method to create a new instance of ExternalFile with backwards-compatible parsing of simple strings and tuples.

location: str#
subpaths: dict[str, str]#
version: str = 'v1'#
single: bool = False#
single_key: ClassVar[str] = '_single_key'#
classmethod new(resource)[source]#

Factory method to create a new instance of ExternalFile with backwards-compatible parsing of simple strings and tuples.

Return type:

ExternalFile
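
The backwards-compatible parsing mentioned above could look roughly like the following stand-in dataclass. The class ExternalFileSketch is hypothetical and only mirrors the documented fields; in particular, treating a two-element tuple as (location, version) and raising TypeError for other inputs are assumptions, not confirmed behaviour of ExternalFile.new.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class ExternalFileSketch:
    """Stand-in with the documented fields; not the real ExternalFile class."""

    location: str
    subpaths: Dict[str, str] = field(default_factory=dict)
    version: str = "v1"

    @classmethod
    def new(cls, resource) -> "ExternalFileSketch":
        # Existing instances are passed through unchanged.
        if isinstance(resource, cls):
            return resource
        # A plain string is interpreted as the location.
        if isinstance(resource, str):
            return cls(location=resource)
        # Assumed tuple layout: (location,) or (location, version).
        if isinstance(resource, (tuple, list)) and len(resource) in (1, 2):
            if len(resource) == 1:
                return cls(location=resource[0])
            return cls(location=resource[0], version=resource[1])
        raise TypeError(f"cannot interpret {resource!r} as an external file")
```

With such a factory, older configs that list plain strings or tuples can be normalized once, so that downstream code only has to handle a single type.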

class BundleExternalFiles(*args, **kwargs)[source]#

Bases: ConfigTask, TransferLocalFile

Task to collect external files.

This task is intended to download source files for other tasks, such as files containing corrections for objects, the “golden” json files, source files for the calculation of pileup weights, and others.

All information about the relevant external files is extracted from the given config_inst, which must contain an auxiliary field external_files like the following (all entries are optional and user-defined):

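# cfg is the current config instance;
# SOURCE_URL, year and corr_postfix are assumed to be defined by the surrounding config code
cfg.x.external_files = DotDict.wrap({
    # jet energy corrections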
    "jet_jerc": ExternalFile(f"{SOURCE_URL}/POG/JME/{year}{corr_postfix}_UL/jet_jerc.json.gz", version="v1"),

    # tau energy correction and scale factors
    "tau_sf": ExternalFile(f"{SOURCE_URL}/POG/TAU/{year}{corr_postfix}_UL/tau.json.gz", version="v1"),

    # electron scale factors
    "electron_sf": ExternalFile(f"{SOURCE_URL}/POG/EGM/{year}{corr_postfix}_UL/electron.json.gz", version="v1"),
})

All entries should be ExternalFile instances.

Attributes:

Methods:

create_unique_basename(path)

get_files_collection([output])

single_output()

output()

The output that this Task produces.

trace_transfer_output(output)

run()

The task run method, to be overridden in a subclass.

single_config = True#
replicas = <luigi.parameter.IntParameter object>#
recreate = <luigi.parameter.BoolParameter object>#
user = <luigi.parameter.Parameter object>#
version = None#
classmethod create_unique_basename(path)[source]#

Return type:

str | dict[str, str]
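
Since this method is undocumented here, the following is only one plausible way to derive a unique basename: prefix the file name with a short hash of the full path, so that files with identical names from different locations do not collide. The function unique_basename, the use of SHA-1 and the hash length are assumptions for illustration; the dict return variant listed above is not modelled.

```python
import hashlib
import os


def unique_basename(path: str, hash_len: int = 10) -> str:
    """Prefix the basename with a short hash of the full path (illustrative only)."""
    digest = hashlib.sha1(path.encode("utf-8")).hexdigest()[:hash_len]
    return f"{digest}_{os.path.basename(path)}"
```

The result is deterministic for a given path, which keeps the bundled file names stable across repeated runs.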

property files_hash: str#
property file_names: DotDict#
get_files_collection(output=None)[source]#

Return type:

SiblingFileCollection

property files: DotDict#
property files_dir: LocalDirectoryTarget#
single_output()[source]#
output()[source]#

The output that this Task produces.

The output of the Task determines whether the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note

If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

trace_transfer_output(output)[source]#
run()[source]#

The task run method, to be overridden in a subclass.

See Task.run

configs = None#
exclude_index = False#
exclude_params_branch = {'user'}#
exclude_params_index = {'known_shifts', 'user'}#
exclude_params_remote_workflow = {'known_shifts'}#
exclude_params_repr = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_repr_empty = {'source_path'}#
exclude_params_req = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'known_shifts', 'log_file', 'sandbox'}#
exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
class BundleExternalFilesWrapper(*args, **kwargs)#

Bases: AnalysisTask, WrapperTask

Attributes:

Methods:

requires()

The Tasks that this Task depends on.

update_wrapper_params(params)

configs = <law.parameter.CSVParameter object>#
exclude_index = False#
exclude_params_branch = {'user'}#
exclude_params_index = {'user'}#
exclude_params_repr = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_repr_empty = {'skip_configs'}#
exclude_params_req = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
exclude_params_req_get = {}#
exclude_params_req_set = {'configs', 'skip_configs', 'wrap_once'}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
recreate = <luigi.parameter.BoolParameter object>#
replicas = <luigi.parameter.IntParameter object>#
requires() dict#

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

Return type:

dict

skip_configs = <law.parameter.CSVParameter object>#
source_path = <luigi.parameter.Parameter object>#
task_namespace = 'cf'#

This value can be overridden to set the namespace that will be used. (See Task.namespaces_famlies_and_ids) If it’s not specified and you try to read this value anyway, it will return garbage. Please use get_task_namespace() to read the namespace.

Note that setting this value with @property will not work, because this is a class level value.

update_wrapper_params(params)#
user = <luigi.parameter.Parameter object>#
version = None#