remote


Base classes and tools for working with remote tasks and targets.

Classes:

BundleRepo(*args, **kwargs)

BundleSoftware(*args, **kwargs)

SandboxFileTask(*args, **kwargs)

BuildBashSandbox(*args, **kwargs)

BundleBashSandbox(*args, **kwargs)

BundleCMSSWSandbox(*args, **kwargs)

RemoteWorkflowMixin()

Mixin class for custom remote workflows adding common functionality.

HTCondorWorkflow(*args, **kwargs)

SlurmWorkflow(*args, **kwargs)

RemoteWorkflow(*args, **kwargs)

class BundleRepo(*args, **kwargs)[source]#

Bases: AnalysisTask, BundleGitRepository, TransferLocalFile

Attributes:

replicas

version

exclude_files

exclude_index

exclude_params_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Methods:

get_repo_path()

single_output()

get_file_pattern()

output()

The output that this Task produces.

run()

The task run method, to be overridden in a subclass.

replicas = <luigi.parameter.IntParameter object>#
version = None#
exclude_files = ['docs', 'tests', 'data', 'assets', '.law', '.setups', '.data', '.github']#
get_repo_path()[source]#
single_output()[source]#
get_file_pattern()[source]#
output()[source]#

The output that this Task produces.

The output of the Task determines whether the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note

If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output
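
The completeness rule described above can be illustrated with a minimal sketch. It uses plain file paths in place of luigi Target objects, and the helper name `is_complete` as well as the file names are hypothetical; this is not the luigi or columnflow implementation.

```python
import tempfile
from pathlib import Path


def is_complete(output_paths):
    """Return True only if every expected output path exists (hypothetical helper)."""
    return all(Path(p).exists() for p in output_paths)


with tempfile.TemporaryDirectory() as tmp:
    # two illustrative outputs, e.g. two replicas of a bundle
    first = Path(tmp) / "bundle.0.tgz"
    second = Path(tmp) / "bundle.1.tgz"

    first.touch()
    partial = is_complete([first, second])  # one output is still missing

    second.touch()
    full = is_complete([first, second])  # all outputs exist now
```

With only one of the two files present the check fails, which is why a task with several outputs is rerun until every one of them exists.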

run()[source]#

The task run method, to be overridden in a subclass.

See Task.run

exclude_index = False#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
class BundleSoftware(*args, **kwargs)[source]#

Bases: AnalysisTask, TransferLocalFile

Attributes:

replicas

version

exclude_index

exclude_params_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Methods:

single_output()

get_file_pattern()

run()

The task run method, to be overridden in a subclass.

replicas = <luigi.parameter.IntParameter object>#
version = None#
single_output()[source]#
get_file_pattern()[source]#
run()[source]#

The task run method, to be overridden in a subclass.

See Task.run

exclude_index = False#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
class SandboxFileTask(*args, **kwargs)[source]#

Bases: AnalysisTask

Attributes:

sandbox_file

exclude_index

exclude_params_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Methods:

resolve_param_values(params)

sandbox_file = <luigi.parameter.Parameter object>#
classmethod resolve_param_values(params)[source]#
exclude_index = False#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
class BuildBashSandbox(*args, **kwargs)[source]#

Bases: SandboxFileTask

Attributes:

sandbox

version

exclude_params_index

exclude_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Methods:

resolve_param_values(params)

output()

The output that this Task produces.

run()

The task run method, to be overridden in a subclass.

sandbox = <luigi.parameter.Parameter object>#
version = None#
exclude_params_index = {'sandbox'}#
classmethod resolve_param_values(params)[source]#
output()[source]#

The output that this Task produces.

The output of the Task determines whether the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note

If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

run()[source]#

The task run method, to be overridden in a subclass.

See Task.run

exclude_index = False#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
class BundleBashSandbox(*args, **kwargs)[source]#

Bases: AnalysisTask, TransferLocalFile

Attributes:

sandbox_file

replicas

version

reqs

checksum

exclude_index

exclude_params_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Methods:

requires()

The Tasks that this Task depends on.

single_output()

get_file_pattern()

run()

The task run method, to be overridden in a subclass.

sandbox_file = <luigi.parameter.Parameter object>#
replicas = <luigi.parameter.IntParameter object>#
version = None#
reqs = {'BuildBashSandbox': <class 'columnflow.tasks.framework.remote.BuildBashSandbox'>}#
requires()[source]#

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires
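
The three return shapes mentioned above (a single Task, a list, or a dict of Tasks) can be sketched as follows. `DummyTask` and `collect_requirements` are hypothetical stand-ins written for this illustration and are not part of luigi, law, or columnflow; the sketch only handles one level of nesting.

```python
class DummyTask:
    """Hypothetical stand-in for a task; not a luigi class."""

    def __init__(self, name):
        self.name = name


def collect_requirements(required):
    """Normalize a requires()-style return value into a flat list of tasks."""
    if required is None:
        return []
    if isinstance(required, dict):
        # for dicts, only the values are tasks
        return list(required.values())
    if isinstance(required, (list, tuple)):
        return list(required)
    # anything else is treated as a single task
    return [required]


single = collect_requirements(DummyTask("BuildBashSandbox"))
as_list = collect_requirements([DummyTask("a"), DummyTask("b")])
as_dict = collect_requirements({"sandbox": DummyTask("BuildBashSandbox")})
```

All three calls yield a plain list of tasks, so code that checks whether the requirements are completed can treat the shapes uniformly.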

property checksum#
single_output()[source]#
get_file_pattern()[source]#
run()[source]#

The task run method, to be overridden in a subclass.

See Task.run

exclude_index = False#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
class BundleCMSSWSandbox(*args, **kwargs)[source]#

Bases: SandboxFileTask, BundleCMSSW, TransferLocalFile

Attributes:

sandbox_file

replicas

version

exclude

include

reqs

exclude_index

exclude_params_index

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

Methods:

requires()

The Tasks that this Task depends on.

get_cmssw_path()

single_output()

output()

The output that this Task produces.

get_file_pattern()

run()

The task run method, to be overridden in a subclass.

sandbox_file = <luigi.parameter.Parameter object>#
replicas = <luigi.parameter.IntParameter object>#
version = None#
exclude = '^src/tmp'#
include = ('venv', 'venvs')#
reqs = {'BuildBashSandbox': <class 'columnflow.tasks.framework.remote.BuildBashSandbox'>}#
requires()[source]#

The Tasks that this Task depends on.

A Task will only run if all of the Tasks that it requires are completed. If your Task does not require any other Tasks, then you don’t need to override this method. Otherwise, a subclass can override this method to return a single Task, a list of Task instances, or a dict whose values are Task instances.

See Task.requires

get_cmssw_path()[source]#
single_output()[source]#
output()[source]#

The output that this Task produces.

The output of the Task determines whether the Task needs to be run: the task is considered finished if and only if all of its outputs exist. Subclasses should override this method to return a single Target or a list of Target instances.

Implementation note

If running multiple workers, the output must be a resource that is accessible by all workers, such as a DFS or database. Otherwise, workers might compute the same output since they don’t see the work done by other workers.

See Task.output

get_file_pattern()[source]#
run()[source]#

The task run method, to be overridden in a subclass.

See Task.run

exclude_index = False#
exclude_params_index = {}#
exclude_params_repr = {}#
exclude_params_repr_empty = {}#
exclude_params_req = {}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
class RemoteWorkflowMixin[source]#

Bases: object

Mixin class for custom remote workflows adding common functionality.

Attributes:

skip_destination_info

Methods:

add_bundle_requirements(reqs)

Adds requirements for bundles of the repository, the conda environment, and the bash and CMSSW sandboxes to reqs.

add_bundle_render_variables(config, reqs)

Adds render variables for the repository, the conda environment, and the bash and CMSSW sandboxes to the job config, depending on which requirements are present in reqs.

add_common_configs(config, reqs, *[, ...])

Adds job settings like common input files or render variables to the job config.

common_destination_info(info)

Hook to modify the additional info printed alongside the workflow logs.

skip_destination_info: bool = False#
add_bundle_requirements(reqs)[source]#

Adds requirements for bundles of the repository, the conda environment, and the bash and CMSSW sandboxes to reqs.

Parameters:

reqs (dict[str, AnalysisTask]) – Dictionary of workflow requirements to be extended.

Return type:

None

add_bundle_render_variables(config, reqs)[source]#

Adds render variables for the repository, the conda environment, and the bash and CMSSW sandboxes to the job config, depending on which requirements are present in reqs.

Parameters:
  • reqs (dict[str, AnalysisTask]) – Dictionary of workflow requirements.

  • config (Config) – The job law.BaseJobFileFactory.Config whose render variables should be set.

Return type:

None

add_common_configs(config, reqs, *, law_config=True, voms=True, kerberos=False, wlcg=True)[source]#

Adds job settings like common input files or render variables to the job config. Workflow requirements are given as reqs to let common options potentially depend on them. Additional keyword arguments control specific behavior of this method.

Parameters:
  • reqs (dict[str, AnalysisTask]) – Dictionary of workflow requirements.

  • config (Config) – The job law.BaseJobFileFactory.Config.

  • law_config (bool, default: True) – Whether the law config should be forwarded (via render variables or input file).

  • voms (bool, default: True) – Whether the VOMS proxy file should be forwarded.

  • kerberos (bool, default: False) – Whether the Kerberos proxy file should be forwarded.

  • wlcg (bool, default: True) – Whether WLCG specific settings should be added.

Return type:

None
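
The bare `*` in the signature above makes the four flags keyword-only. The following sketch uses a stub with the documented signature to show the calling convention; the stub body is an assumption made for illustration and only reports the chosen flags, it does not reproduce what the real method does with the job config.

```python
def add_common_configs(config, reqs, *, law_config=True, voms=True, kerberos=False, wlcg=True):
    """Stub with the documented signature; it only returns the selected flags."""
    return {"law_config": law_config, "voms": voms, "kerberos": kerberos, "wlcg": wlcg}


# flags must be passed by name; unspecified flags keep their defaults
flags = add_common_configs({}, {}, voms=False, kerberos=True)

# passing a flag positionally is rejected by Python itself
try:
    add_common_configs({}, {}, False)
    positional_ok = True
except TypeError:
    positional_ok = False
```

Keyword-only flags keep call sites readable, since a call such as `add_common_configs(config, reqs, voms=False)` states which setting is being switched off.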

common_destination_info(info)[source]#

Hook to modify the additional info printed alongside the workflow logs.

Return type:

dict[str, str]
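
A subclass could use this hook roughly as sketched below. `MixinStandIn` is a hypothetical replacement for the real mixin whose default hook simply returns `info` unchanged, which is an assumption (the actual default may add entries of its own); the key `"site"` and its value are invented for illustration.

```python
class MixinStandIn:
    """Hypothetical stand-in for the mixin; assumed default returns info as-is."""

    def common_destination_info(self, info):
        return info


class MyWorkflow(MixinStandIn):
    def common_destination_info(self, info):
        # let the parent hook run first, then add a custom entry
        info = super().common_destination_info(info)
        info["site"] = "example-site"  # illustrative key and value
        return info


result = MyWorkflow().common_destination_info({"job": "1"})
```

Calling the parent hook first keeps any entries it contributes and matches the documented return type of `dict[str, str]`.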

class HTCondorWorkflow(*args, **kwargs)[source]#

Bases: AnalysisTask, HTCondorWorkflow, RemoteWorkflowMixin

Attributes:

transfer_logs

max_runtime

htcondor_logs

htcondor_cpus

htcondor_gpus

htcondor_memory

htcondor_flavor

htcondor_share_software

exclude_params_branch

htcondor_forward_env_variables

reqs

exclude_index

exclude_params_htcondor_workflow

exclude_params_index

exclude_params_remote_workflow

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

exclude_params_workflow

Methods:

htcondor_workflow_requires()

htcondor_output_directory()

htcondor_bootstrap_file()

htcondor_job_config(config, job_num, branches)

htcondor_use_local_scheduler()

htcondor_destination_info(info)

Return type: dict[str, str]

transfer_logs = <luigi.parameter.BoolParameter object>#
max_runtime = <law.parameter.DurationParameter object>#
htcondor_logs = <luigi.parameter.BoolParameter object>#
htcondor_cpus = <luigi.parameter.IntParameter object>#
htcondor_gpus = <luigi.parameter.IntParameter object>#
htcondor_memory = <law.parameter.BytesParameter object>#
htcondor_flavor = <luigi.parameter.ChoiceParameter object>#
htcondor_share_software = <luigi.parameter.BoolParameter object>#
exclude_params_branch = {'acceptance', 'branches', 'cancel_jobs', 'cleanup_jobs', 'htcondor_cpus', 'htcondor_flavor', 'htcondor_gpus', 'htcondor_logs', 'htcondor_memory', 'htcondor_pool', 'htcondor_scheduler', 'htcondor_share_software', 'ignore_submission', 'job_workers', 'max_runtime', 'no_poll', 'parallel_jobs', 'pilot', 'poll_fails', 'poll_interval', 'retries', 'shuffle_jobs', 'submission_threads', 'tasks_per_job', 'tolerance', 'transfer_logs', 'walltime'}#
htcondor_forward_env_variables = {'CF_BASE': 'cf_base', 'CF_CERN_USER': 'cf_cern_user', 'CF_LOCAL_SCHEDULER': 'cf_local_scheduler', 'CF_REPO_BASE': 'cf_repo_base', 'CF_STORE_LOCAL': 'cf_store_local', 'CF_STORE_NAME': 'cf_store_name'}#
reqs = {'BuildBashSandbox': <class 'columnflow.tasks.framework.remote.BuildBashSandbox'>, 'BundleBashSandbox': <class 'columnflow.tasks.framework.remote.BundleBashSandbox'>, 'BundleCMSSWSandbox': <class 'columnflow.tasks.framework.remote.BundleCMSSWSandbox'>, 'BundleRepo': <class 'columnflow.tasks.framework.remote.BundleRepo'>, 'BundleSoftware': <class 'columnflow.tasks.framework.remote.BundleSoftware'>}#
htcondor_workflow_requires()[source]#
htcondor_output_directory()[source]#
htcondor_bootstrap_file()[source]#
htcondor_job_config(config, job_num, branches)[source]#
htcondor_use_local_scheduler()[source]#
htcondor_destination_info(info)[source]#

Return type:

dict[str, str]

exclude_index = False#
exclude_params_htcondor_workflow = {}#
exclude_params_index = {'effective_workflow'}#
exclude_params_remote_workflow = {}#
exclude_params_repr = {'cancel_jobs', 'cleanup_jobs', 'workflow'}#
exclude_params_repr_empty = {}#
exclude_params_req = {'effective_workflow'}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
exclude_params_workflow = {'branch'}#
class SlurmWorkflow(*args, **kwargs)[source]#

Bases: AnalysisTask, SlurmWorkflow, RemoteWorkflowMixin

Attributes:

transfer_logs

max_runtime

slurm_partition

slurm_flavor

exclude_params_branch

slurm_forward_env_variables

reqs

exclude_index

exclude_params_index

exclude_params_remote_workflow

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

exclude_params_slurm_workflow

exclude_params_workflow

Methods:

slurm_workflow_requires()

slurm_output_directory()

slurm_bootstrap_file()

slurm_job_config(config, job_num, branches)

slurm_destination_info(info)

Return type: dict[str, str]

transfer_logs = <luigi.parameter.BoolParameter object>#
max_runtime = <law.parameter.DurationParameter object>#
slurm_partition = <luigi.parameter.Parameter object>#
slurm_flavor = <luigi.parameter.ChoiceParameter object>#
exclude_params_branch = {'acceptance', 'branches', 'cancel_jobs', 'cleanup_jobs', 'ignore_submission', 'job_workers', 'max_runtime', 'no_poll', 'parallel_jobs', 'pilot', 'poll_fails', 'poll_interval', 'retries', 'shuffle_jobs', 'slurm_flavor', 'slurm_partition', 'submission_threads', 'tasks_per_job', 'tolerance', 'transfer_logs', 'walltime'}#
slurm_forward_env_variables = {'CF_BASE': 'cf_base', 'CF_CERN_USER': 'cf_cern_user', 'CF_LOCAL_SCHEDULER': 'cf_local_scheduler', 'CF_REPO_BASE': 'cf_repo_base', 'CF_STORE_LOCAL': 'cf_store_local', 'CF_STORE_NAME': 'cf_store_name'}#
reqs = {'BuildBashSandbox': <class 'columnflow.tasks.framework.remote.BuildBashSandbox'>}#
slurm_workflow_requires()[source]#
slurm_output_directory()[source]#
slurm_bootstrap_file()[source]#
slurm_job_config(config, job_num, branches)[source]#
slurm_destination_info(info)[source]#

Return type:

dict[str, str]

exclude_index = False#
exclude_params_index = {'effective_workflow'}#
exclude_params_remote_workflow = {}#
exclude_params_repr = {'cancel_jobs', 'cleanup_jobs', 'workflow'}#
exclude_params_repr_empty = {}#
exclude_params_req = {'effective_workflow'}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
exclude_params_slurm_workflow = {}#
exclude_params_workflow = {'branch'}#
class RemoteWorkflow(*args, **kwargs)[source]#

Bases: HTCondorWorkflow, SlurmWorkflow

Attributes:

reqs

exclude_index

exclude_params_branch

exclude_params_htcondor_workflow

exclude_params_index

exclude_params_remote_workflow

exclude_params_repr

exclude_params_repr_empty

exclude_params_req

exclude_params_req_get

exclude_params_req_set

exclude_params_sandbox

exclude_params_slurm_workflow

exclude_params_workflow

reqs = {'BuildBashSandbox': <class 'columnflow.tasks.framework.remote.BuildBashSandbox'>, 'BundleBashSandbox': <class 'columnflow.tasks.framework.remote.BundleBashSandbox'>, 'BundleCMSSWSandbox': <class 'columnflow.tasks.framework.remote.BundleCMSSWSandbox'>, 'BundleRepo': <class 'columnflow.tasks.framework.remote.BundleRepo'>, 'BundleSoftware': <class 'columnflow.tasks.framework.remote.BundleSoftware'>}#
exclude_index = False#
exclude_params_branch = {'acceptance', 'branches', 'cancel_jobs', 'cleanup_jobs', 'htcondor_cpus', 'htcondor_flavor', 'htcondor_gpus', 'htcondor_logs', 'htcondor_memory', 'htcondor_pool', 'htcondor_scheduler', 'htcondor_share_software', 'ignore_submission', 'job_workers', 'max_runtime', 'no_poll', 'parallel_jobs', 'pilot', 'poll_fails', 'poll_interval', 'retries', 'shuffle_jobs', 'slurm_flavor', 'slurm_partition', 'submission_threads', 'tasks_per_job', 'tolerance', 'transfer_logs', 'walltime'}#
exclude_params_htcondor_workflow = {}#
exclude_params_index = {'effective_workflow'}#
exclude_params_remote_workflow = {}#
exclude_params_repr = {'cancel_jobs', 'cleanup_jobs', 'workflow'}#
exclude_params_repr_empty = {}#
exclude_params_req = {'effective_workflow'}#
exclude_params_req_get = {}#
exclude_params_req_set = {}#
exclude_params_sandbox = {'log_file', 'sandbox'}#
exclude_params_slurm_workflow = {}#
exclude_params_workflow = {'branch'}#