client.workflow

The overarching framework within which tasks and their dependencies are created.

Attributes

logger

Classes

DistributorContext

Initialization of the DistributorContext.

Workflow

(aka Batch, aka Swarm).

Module Contents

client.workflow.logger
class client.workflow.DistributorContext(cluster_name: str, workflow_run_id: int, timeout: int)

Initialization of the DistributorContext.

wait_for_startup_signal(timeout: int = 180) bool

Wait for startup signal with non-blocking reads to handle timing issues.
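As an illustration of waiting on a signal with non-blocking reads, the sketch below polls a pipe until an expected marker arrives or a deadline passes. It is a minimal sketch under stated assumptions, not the DistributorContext implementation: the `wait_for_signal` helper, the `b"ALIVE"` marker, and the use of a POSIX pipe with `select` are all hypothetical choices made for the example.

```python
import os
import select
import time


def wait_for_signal(fd: int, expected: bytes, timeout: float) -> bool:
    """Poll a file descriptor until `expected` is seen or `timeout` elapses.

    Hypothetical helper: the marker and transport are assumptions.
    """
    deadline = time.monotonic() + timeout
    buffer = b""
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False  # timed out without seeing the marker
        # Wait briefly for data so the loop never blocks past the deadline.
        ready, _, _ = select.select([fd], [], [], min(remaining, 0.1))
        if ready:
            chunk = os.read(fd, 1024)
            if not chunk:
                return False  # writer closed the pipe before signalling
            # Accumulate partial reads: the marker may arrive in pieces.
            buffer += chunk
            if expected in buffer:
                return True
```

Accumulating into a buffer is what absorbs timing differences: the marker is detected even if the writer emits it in several small writes.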

alive() bool
static derive_jobmon_command_from_env() str | None

If a singularity path is provided, use it when running the worker node.

class client.workflow.Workflow(tool_version: jobmon.client.tool_version.ToolVersion, workflow_args: str = '', name: str = '', description: str = '', workflow_attributes: List | dict | None = None, max_concurrently_running: int = MaxConcurrentlyRunning.MAXCONCURRENTLYRUNNING, requester: jobmon.core.requester.Requester | None = None, chunk_size: int = 500)

Bases: object

(aka Batch, aka Swarm).

A Workflow is a framework by which a user may define the relationship between tasks and define the relationship between multiple runs of the same set of tasks. The great benefit of the Workflow is that it’s resumable. A Workflow can only be re-loaded if two things are shown to be exact matches to a previous Workflow:

  1. WorkflowArgs: It is recommended to pass a meaningful unique identifier to workflow_args, to ease resuming. However, if the Workflow is a one-off project, you may instantiate the Workflow anonymously, without WorkflowArgs. Under the hood, the WorkflowArgs will default to a UUID which, as it is randomly generated, will be harder to remember and thus harder to resume.

    Workflow args must be hashable. For example, CodCorrect or Como version might be passed as Args to the Workflow. For now, the assumption is WorkflowArgs is a string.

  2. The tasks added to the workflow. A Workflow is built up by using Workflow.add_task(). In order to resume a Workflow, all the same tasks must be added with the same dependencies between tasks.
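The two matching conditions above can be pictured as comparing a pair of fingerprints. The following is a minimal sketch, not Jobmon's actual hashing scheme: the `fingerprint` and `can_resume` helpers, the use of SHA-256, and the representation of tasks as a name-to-upstreams mapping are assumptions made for illustration.

```python
import hashlib
from typing import Dict, List, Tuple


def fingerprint(workflow_args: str, tasks: Dict[str, List[str]]) -> Tuple[str, str]:
    """Return (args_hash, tasks_hash) for a hypothetical workflow definition.

    `tasks` maps each task name to the names of its upstream tasks.
    """
    args_hash = hashlib.sha256(workflow_args.encode("utf-8")).hexdigest()
    # Sort tasks and upstreams so insertion order does not change the hash.
    lines = [
        f"{name}<-{','.join(sorted(upstreams))}"
        for name, upstreams in sorted(tasks.items())
    ]
    tasks_hash = hashlib.sha256("\n".join(lines).encode("utf-8")).hexdigest()
    return args_hash, tasks_hash


def can_resume(previous: Tuple[str, str], current: Tuple[str, str]) -> bool:
    """Both the workflow args and the tasks with their dependencies must match."""
    return previous == current
```

With this sketch, changing either the workflow args string or any dependency produces a different fingerprint, so the definition would be treated as a new workflow rather than a resume.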

Initialization of the client workflow.

Parameters:
  • tool_version – ToolVersion this workflow is associated with

  • workflow_args – Unique identifier of a workflow

  • name – Name of the workflow

  • description – Description of the workflow

  • workflow_attributes – Attributes that make this workflow different from other workflows that the user wants to record.

  • max_concurrently_running – How many running jobs to allow in parallel

  • requester – object to communicate with the FastAPI services.

  • chunk_size – how many tasks to bind in a single request

  • default_max_attempts – the default max attempts of the workflow for each array

name = ''
description = ''
max_concurrently_running: int = 2147483647
requester = None
tasks: Dict[int, jobmon.client.task.Task]
arrays: Dict[str, jobmon.client.array.Array]
workflow_args_hash = 0
workflow_attributes: Dict[str, Any]
default_cluster_name: str = ''
default_compute_resources_set: Dict[str, Dict[str, Any]]
default_resource_scales_set: Dict[str, Dict[str, float]]
last_workflow_run_id: int | None = None
error: BaseException | None = None
property tool: jobmon.client.tool.Tool

Return the tool associated with this workflow.

property is_bound: bool

Whether the workflow has been bound to the database.

property workflow_id: int

If the workflow is bound then it will have been given an id.

property dag_id: int

If it has been bound, it will have an associated dag_id.

property task_hash: int

Hash of all of the tasks.

property task_errors: Dict

Return a dict of the errors associated with tasks.

property default_max_attempts: int | None

Return the workflow default max attempts.

add_attributes(workflow_attributes: dict) None

Users can call this either to update the values of existing attributes or to add new attributes.

Parameters:

workflow_attributes – attributes to be bound to the db that describe this workflow.

add_task(task: jobmon.client.task.Task) jobmon.client.task.Task

Add a task to the workflow to be executed.

Set semantics - add tasks once only, based on hash name.

Parameters:

task – single task to add.
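The set semantics described for add_task can be sketched with a dictionary keyed by hash. This is a minimal illustration, not the Workflow class itself: `TaskRegistry` is a hypothetical stand-in, tasks are reduced to plain strings, and raising `ValueError` on a duplicate is an assumption (the text above only states that a task is added once).

```python
from typing import Dict


class TaskRegistry:
    """Hypothetical stand-in for a workflow's task collection."""

    def __init__(self) -> None:
        # Keyed by hash, mirroring a Dict[int, Task] style attribute.
        self.tasks: Dict[int, str] = {}

    def add_task(self, name: str) -> str:
        key = hash(name)
        if key in self.tasks:
            # Assumption: a duplicate is rejected rather than silently ignored.
            raise ValueError(f"a task with hash {key} was already added")
        self.tasks[key] = name
        return name
```

Keying on the hash means two task objects that describe the same work collide, which is what keeps the collection free of duplicates.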

add_tasks(tasks: Iterable[jobmon.client.task.Task]) None

Add a list of tasks to the workflow to be executed.

set_default_compute_resources_from_yaml(cluster_name: str, yaml_file: str) None

Set workflow-level default compute resources from a user-provided YAML file.

TODO: Implement this method.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • yaml_file – the yaml file that is providing the compute resource values.

set_default_compute_resources_from_dict(cluster_name: str, dictionary: Dict[str, Any]) None

Set default compute resources for a given cluster_name.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • dictionary – dictionary of default compute resources to run tasks with. Can be overridden at task template, tool or task level.
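The idea of workflow-level defaults that a more specific level can override can be sketched with layered dictionaries. This is an illustration only: the `resolve` helper and the resource keys (`memory`, `cores`, `runtime`) are hypothetical, valid keys depend on the cluster, and the sketch shows just two layers because the precedence among task template, tool, and task levels is not spelled out here.

```python
from collections import ChainMap
from typing import Any, Dict


def resolve(overrides: Dict[str, Any], defaults: Dict[str, Any]) -> Dict[str, Any]:
    """Merge two layers; keys in `overrides` win over keys in `defaults`."""
    # ChainMap looks up keys in the first mapping before the second.
    return dict(ChainMap(overrides, defaults))


# Hypothetical resource names and values, for illustration only.
workflow_defaults = {"memory": "1G", "cores": 1, "runtime": 600}
task_overrides = {"memory": "8G"}

resolved = resolve(task_overrides, workflow_defaults)
```

Here the task keeps the default `cores` and `runtime` but replaces `memory`, which is the behavior the "can be overridden" wording suggests.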

set_default_resource_scales_from_dict(cluster_name: str, dictionary: Dict[str, float]) None

Set default resource scales for a given cluster_name.

Parameters:
  • cluster_name – name of cluster to set default values for.

  • dictionary – dictionary of default resource scales to adjust task resources with. Can be overridden at task template or task level.

set_default_cluster_name(cluster_name: str) None

Set the default cluster.

Parameters:

cluster_name – name of cluster to set as default.

set_default_max_attempts(value: int) None

Set the max attempts.

Parameters:

value – value of max_attempts.

get_tasks_by_node_args(task_template_name: str, **kwargs: Any) List[jobmon.client.task.Task]

Query tasks by node args. Used for setting dependencies.

set_max_concurrently_running(task_template_name: str, max_concurrently_running: int) None
run(fail_fast: bool = False, seconds_until_timeout: int = 36000, resume: bool = False, reset_running_jobs: bool = True, distributor_startup_timeout: int = 180, resume_timeout: int = 300, configure_logging: bool = False) str | None

Run the workflow.

Traverse the DAG, submitting new tasks when their upstream tasks have completed successfully.

Parameters:
  • fail_fast – whether to break out of distributor on first failure.

  • seconds_until_timeout – amount of time (in seconds) to wait until the whole workflow times out. Submitted jobs will continue.

  • resume – whether the workflow should be resumed. If it is not set to resume and an identical workflow already exists, the workflow will error out.

  • reset_running_jobs – whether or not to reset running jobs upon resume

  • distributor_startup_timeout – amount of time to wait for the distributor process to start up

  • resume_timeout – seconds to wait for a workflow to become resumable before giving up

  • configure_logging – set up Jobmon client logging. If False, no logging will be configured. If True, automatic component logging will be configured.

Returns:

str of WorkflowRunStatus
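The traversal described for run() can be pictured as a topological walk in which a task becomes ready once all of its upstream tasks are done. The sketch below is an assumption-laden illustration, not the scheduler Jobmon uses: `run_order` is a hypothetical helper, tasks are plain strings, every task is assumed to succeed, and every upstream name is assumed to be a key of the input mapping.

```python
from collections import deque
from typing import Dict, List


def run_order(upstreams: Dict[str, List[str]]) -> List[str]:
    """Return one valid execution order given a task -> upstream tasks mapping."""
    # Number of unfinished upstream tasks per task.
    remaining = {task: len(ups) for task, ups in upstreams.items()}
    # Invert the mapping so finishing a task can release its downstream tasks.
    downstreams: Dict[str, List[str]] = {task: [] for task in upstreams}
    for task, ups in upstreams.items():
        for up in ups:
            downstreams[up].append(task)

    ready = deque(sorted(t for t, count in remaining.items() if count == 0))
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)  # "run" the task; this sketch assumes success
        for down in downstreams[task]:
            remaining[down] -= 1
            if remaining[down] == 0:
                ready.append(down)
    if len(order) != len(upstreams):
        raise ValueError("dependency cycle detected")
    return order
```

A real run also has to handle failures, retries, and concurrency limits, which this sketch leaves out.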

set_task_template_max_concurrency_limit(task_template_name: str, limit: int) None
validate(strict: bool = True, raise_on_error: bool = False) None

Confirm that the tasks in this workflow are valid.

This method will:
  • access the database to confirm the requested resources are valid for the specified cluster

  • confirm that the workflow args are valid

  • make sure no task contains upstream or downstream tasks that are not in the workflow
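The last of these checks, that no task refers to a task outside the workflow, can be sketched as a set difference. This is a minimal illustration rather than the validate() implementation: `find_dangling_dependencies` is a hypothetical helper, and tasks are represented as a mapping from task name to upstream task names.

```python
from typing import Dict, List, Set


def find_dangling_dependencies(upstreams: Dict[str, List[str]]) -> Dict[str, Set[str]]:
    """Map each task to the upstream names it references that are not in the workflow."""
    known = set(upstreams)
    problems: Dict[str, Set[str]] = {}
    for task, ups in upstreams.items():
        missing = set(ups) - known
        if missing:
            problems[task] = missing
    return problems
```

An empty result means every referenced upstream task was itself added; the same check applies to downstream references by symmetry.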

bind() None

Get a workflow_id.

get_errors(limit: int = 1000) Dict[int, Dict[str, int | List[Dict[str, str | int]]]] | None

Method to get all errors.

Return a dictionary with the erring task_id as the key and the Task.get_errors content as the value. When limit is explicitly set to None by the client, the returned dictionary includes every errored task in the workflow.

get_cluster_by_name(cluster_name: str) jobmon.core.cluster.Cluster

Check if the cluster that the task specified is in the cache.

If the cluster is not in the cache, create it and add to cache.
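The check-then-create behavior described here is a standard get-or-create cache. The sketch below shows the pattern only: `ClusterCache` is a hypothetical stand-in, a plain dict stands in for the Cluster object, and the `created` counter exists purely to make the caching visible.

```python
from typing import Dict


class ClusterCache:
    """Hypothetical get-or-create cache keyed by cluster name."""

    def __init__(self) -> None:
        self._clusters: Dict[str, dict] = {}
        self.created = 0  # counts how many cluster objects were built

    def get_cluster_by_name(self, cluster_name: str) -> dict:
        if cluster_name not in self._clusters:
            # Cache miss: build the object once and remember it.
            self.created += 1
            self._clusters[cluster_name] = {"name": cluster_name}
        return self._clusters[cluster_name]
```

Repeated lookups for the same name return the same object, so any expensive setup happens once per cluster.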