client.workflow

The overarching framework to create tasks and dependencies within.

Attributes

logger

Classes

DistributorContext

Workflow

(aka Batch, aka Swarm).

Module Contents

client.workflow.logger[source]
class client.workflow.DistributorContext(cluster_name: str, workflow_run_id: int, timeout: int)[source]
_cluster_name[source]
_workflow_run_id[source]
_timeout[source]
__enter__() DistributorContext[source]

Starts the Distributor Process.

__exit__(exc_type: BaseException | None, exc_value: BaseException | None, exc_traceback: types.TracebackType | None) None[source]

Stops the Distributor Process.

alive() bool[source]
_shutdown() str[source]
static derive_jobmon_command_from_env() str | None[source]

If a singularity path is provided, use it when running the worker node.

class client.workflow.Workflow(tool_version: jobmon.client.tool_version.ToolVersion, workflow_args: str = '', name: str = '', description: str = '', workflow_attributes: List | dict | None = None, max_concurrently_running: int = MaxConcurrentlyRunning.MAXCONCURRENTLYRUNNING, requester: jobmon.core.requester.Requester | None = None, chunk_size: int = 500)[source]

Bases: object

(aka Batch, aka Swarm).

A Workflow is a framework by which a user may define the relationship between tasks and define the relationship between multiple runs of the same set of tasks. The great benefit of the Workflow is that it’s resumable. A Workflow can only be re-loaded if two things are shown to be exact matches to a previous Workflow:

  1. WorkflowArgs: It is recommended to pass a meaningful unique identifier

    to workflow_args, to ease resuming. However, if the Workflow is a one-off project, you may instantiate the Workflow anonymously, without WorkflowArgs. Under the hood, the WorkflowArgs will default to a UUID which, as it is randomly generated, will be harder to remember and thus harder to resume.

    Workflow args must be hashable. For example, CodCorrect or Como version might be passed as Args to the Workflow. For now, the assumption is WorkflowArgs is a string.

  2. The tasks added to the workflow. A Workflow is built up by

    using Workflow.add_task(). In order to resume a Workflow, all the same tasks must be added with the same dependencies between tasks.

_tool_version[source]
name[source]
description[source]
max_concurrently_running: int[source]
requester[source]
_dag[source]
tasks: Dict[int, jobmon.client.task.Task][source]
arrays: Dict[str, jobmon.client.array.Array][source]
_chunk_size: int[source]
workflow_args_hash[source]
workflow_attributes: Dict[str, Any][source]
_clusters: Dict[str, jobmon.core.cluster.Cluster][source]
_task_resources: Dict[int, jobmon.client.task_resources.TaskResources][source]
default_cluster_name: str = ''[source]
_default_max_attempts: int | None = None[source]
default_compute_resources_set: Dict[str, Dict[str, Any]][source]
default_resource_scales_set: Dict[str, Dict[str, float]][source]
_fail_after_n_executions = 1000000000[source]
last_workflow_run_id: int | None = None[source]
property tool: jobmon.client.tool.Tool[source]

Returns the associated tool to this workflow.

property is_bound: bool[source]

If the workflow has been bound to the db.

property workflow_id: int[source]

If the workflow is bound then it will have been given an id.

property dag_id: int[source]

If it has been bound, it will have an associated dag_id.

property task_hash: int[source]

Hash of all of the tasks.

property task_errors: Dict[source]

Return a dict of error associated with a task.

property default_max_attempts: int | None[source]

Return the workflow default max attempts.

add_attributes(workflow_attributes: dict) None[source]

Users can call either to update values of existing attributes or add new attributes.

Parameters:

workflow_attributes – attributes to be bound to the db that describe this workflow.

add_task(task: jobmon.client.task.Task) jobmon.client.task.Task[source]

Add a task to the workflow to be executed.

Set semantics - add tasks once only, based on hash name.

Parameters:

task – single task to add.

add_tasks(tasks: Sequence[jobmon.client.task.Task]) None[source]

Add a list of task to the workflow to be executed.

set_default_compute_resources_from_yaml(cluster_name: str, yaml_file: str) None[source]

Set default compute resources from a user provided yaml file for workflow level.

TODO: Implement this method.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • yaml_file – the yaml file that is providing the compute resource values.

set_default_compute_resources_from_dict(cluster_name: str, dictionary: Dict[str, Any]) None[source]

Set default compute resources for a given cluster_name.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • dictionary – dictionary of default compute resources to run tasks with. Can be overridden at task template, tool or task level.

set_default_resource_scales_from_dict(cluster_name: str, dictionary: Dict[str, float]) None[source]

Set default resource scales for a given cluster_name.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • dictionary – dictionary of default resource scales to adjust task resources with. Can be overridden at task template or task level.

set_default_cluster_name(cluster_name: str) None[source]

Set the default cluster.

Parameters:

cluster_name – name of cluster to set as default.

set_default_max_attempts(value: int) None[source]

Set the max attempts.

Parameters:

value – value of max_attempts.

get_tasks_by_node_args(task_template_name: str, **kwargs: Any) List[jobmon.client.task.Task][source]

Query tasks by node args. Used for setting dependencies.

set_max_concurrently_running(task_template_name: str, max_concurrently_running: int) None[source]
run(fail_fast: bool = False, seconds_until_timeout: int = 36000, resume: bool = False, reset_running_jobs: bool = True, distributor_startup_timeout: int = 180, resume_timeout: int = 300, configure_logging: bool = False) str | None[source]

Run the workflow.

Traverse the dag and submitting new tasks when their tasks have completed successfully.

Parameters:
  • fail_fast – whether to break out of distributor on first failure.

  • seconds_until_timeout – amount of time (in seconds) to wait until the whole workflow times out. Submitted jobs will continue

  • resume – whether the workflow should be resumed or not, if it is not set to resume and an identical workflow already exists, the workflow will error out

  • reset_running_jobs – whether or not to reset running jobs upon resume

  • distributor_startup_timeout – amount of time to wait for the distributor process to start up

  • resume_timeout – seconds to wait for a workflow to become resumable before giving up

  • configure_logging – setup jobmon logging. If False, no logging will be configured. If True, default logging will be configured.

Returns:

str of WorkflowRunStatus

set_task_template_max_concurrency_limit(task_template_name: str, limit: int) None[source]
validate(strict: bool = True, raise_on_error: bool = False) None[source]

Confirm that the tasks in this workflow are valid.

This method will: - access the database to confirm the requested resources are valid for the specified cluster - confirm that the workflow args are valid - make sure no task contains up/down stream tasks that are not in the workflow

bind() None[source]

Get a workflow_id.

_bind_tasks(reset_if_running: bool = True, chunk_size: int = 500) None[source]
_bind_task_args(chunk_size: int = 500) None[source]

Bind all task args to the database.

Loop through our bound task dict in chunks in order to bind new args and arg types to the database.

_bind_task_attributes(chunk_size: int = 500) None[source]
get_errors(limit: int = 1000) Dict[int, Dict[str, int | List[Dict[str, str | int]]]] | None[source]

Method to get all errors.

Return a dictionary with the erring task_id as the key, and the Task.get_errors content as the value. When limit is specifically set as None from the client, this return set will pass back all the erred tasks in the workflow.

get_cluster_by_name(cluster_name: str) jobmon.core.cluster.Cluster[source]

Check if the cluster that the task specified is in the cache.

If the cluster is not in the cache, create it and add to cache.

_set_original_task_resources(task: jobmon.client.task.Task) None[source]
_matching_wf_args_diff_hash() None[source]

Check that that an existing workflow does not contain different tasks.

Check that an existing workflow with the same workflow_args does not have a different hash, this would indicate that thgat the workflow contains different tasks.

__hash__() int[source]

Hash to encompass tool version id, workflow args, tasks and dag.

__repr__() str[source]

A representation string for a Workflow instance.