external#
Tasks dealing with external data.
Classes:
|
Task to get list of logical file names (LFNs). |
|
|
|
Container object to define an external file resource that is understood by (e.g.) |
|
Task to collect external files. |
|
- class GetDatasetLFNs(*args, **kwargs)[source]#
Bases:
DatasetTask,TransferLocalFileTask to get list of logical file names (LFNs).
Attributes:
Version parameter - deactivated for
GetDatasetLFNsDefines sandbox for this task.
Methods:
resolve_param_values(params)Resolve parameter values params from command line and propagate them to this set of parameters.
Creates a remote target file for the final .json file containing the list of LFNs.
run()Run function for this task.
get_dataset_lfns_dasgoclient(dataset_inst, ...)Get the LNF information with the
dasgoclient.iter_nano_files(task[, fs, lfn_indices, ...])Generator function that reduces the boilerplate code for looping over files referred to by lfn_indices given the lfns obtained by this task which needs to be complete for this function to succeed.
- replicas = <luigi.parameter.IntParameter object>#
- validate = <law.parameter.OptionalBoolParameter object>#
- version = None#
Version parameter - deactivated for
GetDatasetLFNs
- classmethod resolve_param_values(params)[source]#
Resolve parameter values params from command line and propagate them to this set of parameters.
- property sandbox: str#
Defines sandbox for this task.
- Returns:
Path to shell script that sets up the requested sandbox.
- single_output()[source]#
Creates a remote target file for the final .json file containing the list of LFNs.
- Return type:
- Returns:
Law remote target with the initialized output name
- run()[source]#
Run function for this task.
- Raises:
ValueError – If number of loaded LFNs does not correspond to number of LFNs specified in this
dataset_info_inst.
- get_dataset_lfns_dasgoclient(dataset_inst, shift_inst, dataset_key)[source]#
Get the LNF information with the
dasgoclient.- Parameters:
- Raises:
Exception – If query with
dasgoclientfails.- Return type:
- Returns:
The list of LFNs corresponding to the dataset with the identifier dataset_key.
- iter_nano_files(task, fs=None, lfn_indices=None, eager_lookup=1, skip_fallback=False)[source]#
Generator function that reduces the boilerplate code for looping over files referred to by lfn_indices given the lfns obtained by this task which needs to be complete for this function to succeed.
When lfn_indices are not given, task must be a branch of a
DatasetTaskworkflow whose branch value is used instead.- Parameters:
task (AnalysisTask | DatasetTask) – Current task that needs to access the nanoAOD files
fs (str | Sequence[str] | None, default:
None) – Name of the local or remote file system where the LFNs are located, defaults to Nonelfn_indices (list[int] | None, default:
None) – List of indices of LFNs that are processed by this task instance, defaults to Noneeager_lookup (bool | int, default:
1) – Look at the next fs if stat takes too long, defaults to 1skip_fallback (bool, default:
False) – Skip the fallback mechanism to fetch the LFN, defaults to False
- Raises:
TypeError – If task is not of type
BaseWorkflowor not a task analyzing a single branch in the task treeException – If current task is not complete as indicated with
self.complete()ValueError – If no fs is provided at call and none can be found in either the config instance or the law config.
Exception – If a given LFN cannot be found at any fs
- Yield:
a file target that points to a LFN
- Return type:
None
- configs = None#
- exclude_index = False#
- exclude_params_branch = {'user'}#
- exclude_params_index = {'known_shifts', 'local_shift', 'user'}#
- exclude_params_remote_workflow = {'known_shifts', 'local_shift'}#
- exclude_params_repr = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- exclude_params_repr_empty = {'source_path'}#
- exclude_params_req = {'known_shifts', 'local_shift', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- exclude_params_req_get = {}#
- exclude_params_req_set = {}#
- exclude_params_sandbox = {'known_shifts', 'local_shift', 'log_file', 'sandbox'}#
- exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- class GetDatasetLFNsWrapper(*args, **kwargs)#
Bases:
AnalysisTask,WrapperTaskAttributes:
This value can be overridden to set the namespace that will be used.
Methods:
requires()The Tasks that this Task depends on.
update_wrapper_params(params)- configs = <law.parameter.CSVParameter object>#
- datasets = <law.parameter.CSVParameter object>#
- exclude_index = False#
- exclude_params_branch = {'user'}#
- exclude_params_index = {'user'}#
- exclude_params_repr = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- exclude_params_repr_empty = {'skip_configs', 'skip_datasets', 'skip_shifts'}#
- exclude_params_req = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- exclude_params_req_get = {}#
- exclude_params_req_set = {'configs', 'datasets', 'shifts', 'skip_configs', 'skip_datasets', 'skip_shifts', 'wrap_once'}#
- exclude_params_sandbox = {'log_file', 'sandbox'}#
- exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- replicas = <luigi.parameter.IntParameter object>#
- requires() dict#
The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
- Return type:
- shifts = <law.parameter.CSVParameter object>#
- skip_configs = <law.parameter.CSVParameter object>#
- skip_datasets = <law.parameter.CSVParameter object>#
- skip_shifts = <law.parameter.CSVParameter object>#
- source_path = <luigi.parameter.Parameter object>#
- task_namespace = 'cf'#
This value can be overridden to set the namespace that will be used. (See Task.namespaces_famlies_and_ids) If it’s not specified and you try to read this value anyway, it will return garbage. Please use
get_task_namespace()to read the namespace.Note that setting this value with
@propertywill not work, because this is a class level value.
- update_wrapper_params(params)#
- validate = <law.parameter.OptionalBoolParameter object>#
- version = None#
- class ExternalFile(location, subpaths=<factory>, version='v1')[source]#
Bases:
objectContainer object to define an external file resource that is understood by (e.g.)
tasks.external.BundleExternalFiles. Example:# refer to a simple file location ExternalFile(location="path/to/file", version="v1") # refer to a directory or archive that contains multiple files ExternalFile(location="some/archive.tgz", subpaths={"file_name": "file/in/archive"}, version="v1")
Attributes:
Methods:
new(resource)Factory method to create a new instance of
ExternalFilewith backwards-compatible parsing of simple strings and tuples.- classmethod new(resource)[source]#
Factory method to create a new instance of
ExternalFilewith backwards-compatible parsing of simple strings and tuples.- Return type:
ExternalFile
- class BundleExternalFiles(*args, **kwargs)[source]#
Bases:
ConfigTask,TransferLocalFileTask to collect external files.
This task is intended to download source files for other tasks, such as files containing corrections for objects, the “golden” json files, source files for the calculation of pileup weights, and others.
All information about the relevant external files is extracted from the given
config_inst, which must contain an auxiliary fieldexternal_fileslike the following (all entries are optional and user-defined):# cfg is the current config instance cfg.x.external_files = DotDict.wrap({ "jet_jerc": ExternalFile(f"{SOURCE_URL}/POG/JME/{year}{corr_postfix}_UL/jet_jerc.json.gz", version="v1"), # tau energy correction and scale factors "tau_sf": ExternalFile(f"{SOURCE_URL}/POG/TAU/{year}{corr_postfix}_UL/tau.json.gz", version="v1"), # electron scale factors "electron_sf": ExternalFile(f"{SOURCE_URL}/POG/EGM/{year}{corr_postfix}_UL/electron.json.gz", version="v1"), })
All entries should be
ExternalFileinstances.Attributes:
Methods:
create_unique_basename(path)get_files_collection([output])output()The output that this Task produces.
trace_transfer_output(output)run()The task run method, to be overridden in a subclass.
- single_config = True#
- replicas = <luigi.parameter.IntParameter object>#
- recreate = <luigi.parameter.BoolParameter object>#
- user = <luigi.parameter.Parameter object>#
- version = None#
- property files_dir: LocalDirectoryTarget#
- output()[source]#
The output that this Task produces.
The output of the Task determines if the Task needs to be run–the task is considered finished iff the outputs all exist. Subclasses should override this method to return a single
Targetor a list ofTargetinstances.- Implementation note
If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.
See Task.output
- configs = None#
- exclude_index = False#
- exclude_params_branch = {'user'}#
- exclude_params_index = {'known_shifts', 'user'}#
- exclude_params_remote_workflow = {'known_shifts'}#
- exclude_params_repr = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- exclude_params_repr_empty = {'source_path'}#
- exclude_params_req = {'known_shifts', 'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- exclude_params_req_get = {}#
- exclude_params_req_set = {}#
- exclude_params_sandbox = {'known_shifts', 'log_file', 'sandbox'}#
- exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- class BundleExternalFilesWrapper(*args, **kwargs)#
Bases:
AnalysisTask,WrapperTaskAttributes:
This value can be overridden to set the namespace that will be used.
Methods:
requires()The Tasks that this Task depends on.
update_wrapper_params(params)- configs = <law.parameter.CSVParameter object>#
- exclude_index = False#
- exclude_params_branch = {'user'}#
- exclude_params_index = {'user'}#
- exclude_params_repr = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- exclude_params_repr_empty = {'skip_configs'}#
- exclude_params_req = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- exclude_params_req_get = {}#
- exclude_params_req_set = {'configs', 'skip_configs', 'wrap_once'}#
- exclude_params_sandbox = {'log_file', 'sandbox'}#
- exclude_params_workflow = {'notify_custom', 'notify_mattermost', 'notify_slack', 'user'}#
- recreate = <luigi.parameter.BoolParameter object>#
- replicas = <luigi.parameter.IntParameter object>#
- requires() dict#
The Tasks that this Task depends on.
A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.
See Task.requires
- Return type:
- skip_configs = <law.parameter.CSVParameter object>#
- source_path = <luigi.parameter.Parameter object>#
- task_namespace = 'cf'#
This value can be overridden to set the namespace that will be used. (See Task.namespaces_famlies_and_ids) If it’s not specified and you try to read this value anyway, it will return garbage. Please use
get_task_namespace()to read the namespace.Note that setting this value with
@propertywill not work, because this is a class level value.
- update_wrapper_params(params)#
- user = <luigi.parameter.Parameter object>#
- version = None#