client.workflow
The overarching framework to create tasks and dependencies within.
Attributes
Classes
Initialization of the DistributorContext. |
|
(aka Batch, aka Swarm). |
Module Contents
- client.workflow.logger
- class client.workflow.DistributorContext(cluster_name: str, workflow_run_id: int, timeout: int)
Initialization of the DistributorContext.
- class client.workflow.Workflow(tool_version: jobmon.client.tool_version.ToolVersion, workflow_args: str = '', name: str = '', description: str = '', workflow_attributes: List | dict | None = None, max_concurrently_running: int = MaxConcurrentlyRunning.MAXCONCURRENTLYRUNNING, requester: jobmon.core.requester.Requester | None = None, chunk_size: int = 500)
Bases:
object(aka Batch, aka Swarm).
A Workflow is a framework by which a user may define the relationship between tasks and define the relationship between multiple runs of the same set of tasks. The great benefit of the Workflow is that it’s resumable. A Workflow can only be re-loaded if two things are shown to be exact matches to a previous Workflow:
- WorkflowArgs: It is recommended to pass a meaningful unique identifier
to workflow_args, to ease resuming. However, if the Workflow is a one-off project, you may instantiate the Workflow anonymously, without WorkflowArgs. Under the hood, the WorkflowArgs will default to a UUID which, as it is randomly generated, will be harder to remember and thus harder to resume.
Workflow args must be hashable. For example, CodCorrect or Como version might be passed as Args to the Workflow. For now, the assumption is WorkflowArgs is a string.
- The tasks added to the workflow. A Workflow is built up by
using Workflow.add_task(). In order to resume a Workflow, all the same tasks must be added with the same dependencies between tasks.
Initialization of the client workflow.
- Parameters:
tool_version – ToolVersion this workflow is associated
workflow_args – Unique identifier of a workflow
name – Name of the workflow
description – Description of the workflow
workflow_attributes – Attributes that make this workflow different from other workflows that the user wants to record.
max_concurrently_running – How many running jobs to allow in parallel
requester – object to communicate with the FastApi services.
chunk_size – how many tasks to bind in a single request
default_max_attempts – the default max attempts of the workflow for each array
- name = ''
- description = ''
- requester = None
- workflow_args_hash = 0
- error: BaseException | None = None
- property tool: jobmon.client.tool.Tool
Returns the associated tool to this workflow.
- property task_errors: Dict
Return a dict of error associated with a task.
- add_attributes(workflow_attributes: dict) None
Users can call either to update values of existing attributes or add new attributes.
- Parameters:
workflow_attributes – attributes to be bound to the db that describe this workflow.
- add_task(task: jobmon.client.task.Task) jobmon.client.task.Task
Add a task to the workflow to be executed.
Set semantics - add tasks once only, based on hash name.
- Parameters:
task – single task to add.
- add_tasks(tasks: Iterable[jobmon.client.task.Task]) None
Add a list of task to the workflow to be executed.
- set_default_compute_resources_from_yaml(cluster_name: str, yaml_file: str) None
Set default compute resources from a user provided yaml file for workflow level.
TODO: Implement this method.
- Parameters:
cluster_name – name of cluster to set default values for.
yaml_file – the yaml file that is providing the compute resource values.
- set_default_compute_resources_from_dict(cluster_name: str, dictionary: Dict[str, Any]) None
Set default compute resources for a given cluster_name.
- Parameters:
cluster_name – name of cluster to set default values for.
dictionary – dictionary of default compute resources to run tasks with. Can be overridden at task template, tool or task level.
- set_default_resource_scales_from_dict(cluster_name: str, dictionary: Dict[str, float]) None
Set default resource scales for a given cluster_name.
- Parameters:
cluster_name – name of cluster to set default values for.
dictionary – dictionary of default resource scales to adjust task resources with. Can be overridden at task template or task level.
- set_default_cluster_name(cluster_name: str) None
Set the default cluster.
- Parameters:
cluster_name – name of cluster to set as default.
- set_default_max_attempts(value: int) None
Set the max attempts.
- Parameters:
value – value of max_attempts.
- get_tasks_by_node_args(task_template_name: str, **kwargs: Any) List[jobmon.client.task.Task]
Query tasks by node args. Used for setting dependencies.
- run(fail_fast: bool = False, seconds_until_timeout: int = 36000, resume: bool = False, reset_running_jobs: bool = True, distributor_startup_timeout: int = 180, resume_timeout: int = 300, configure_logging: bool = False) str | None
Run the workflow.
Traverse the dag and submitting new tasks when their tasks have completed successfully.
- Parameters:
fail_fast – whether to break out of distributor on first failure.
seconds_until_timeout – amount of time (in seconds) to wait until the whole workflow times out. Submitted jobs will continue
resume – whether the workflow should be resumed or not, if it is not set to resume and an identical workflow already exists, the workflow will error out
reset_running_jobs – whether or not to reset running jobs upon resume
distributor_startup_timeout – amount of time to wait for the distributor process to start up
resume_timeout – seconds to wait for a workflow to become resumable before giving up
configure_logging – setup jobmon client logging. If False, no logging will be configured. If True, automatic component logging will be configured.
- Returns:
str of WorkflowRunStatus
- validate(strict: bool = True, raise_on_error: bool = False) None
Confirm that the tasks in this workflow are valid.
This method will: - access the database to confirm the requested resources are valid for the specified cluster - confirm that the workflow args are valid - make sure no task contains up/down stream tasks that are not in the workflow
- get_errors(limit: int = 1000) Dict[int, Dict[str, int | List[Dict[str, str | int]]]] | None
Method to get all errors.
Return a dictionary with the erring task_id as the key, and the Task.get_errors content as the value. When limit is specifically set as None from the client, this return set will pass back all the erred tasks in the workflow.