Workflows

A Workflow is the top-level container for your computational pipeline. It encompasses all your tasks and their dependencies.

Note

This page is being developed. For now, see Core Concepts for workflow basics and Advanced Usage for advanced patterns.

Creating Workflows

Basic workflow creation:

from jobmon.client.tool import Tool

tool = Tool(name="my_application")

workflow = tool.create_workflow(
    name="my_workflow",
    workflow_args="version_1"
)

Workflow Arguments

The workflow_args parameter uniquely identifies your workflow:

  • Same args = same workflow (can resume)

  • Different args = different workflow (fresh start)

# These are two different workflows
wf1 = tool.create_workflow(name="pipeline", workflow_args="v1")
wf2 = tool.create_workflow(name="pipeline", workflow_args="v2")

# This resumes wf1 if it exists
wf1_resumed = tool.create_workflow(name="pipeline", workflow_args="v1")

Workflow Attributes

Track additional metadata with your workflow:

workflow = tool.create_workflow(
    name="my_workflow",
    workflow_args="v1",
    workflow_attributes={
        "release_id": 123,
        "description": "Production run for Q4"
    }
)

Running Workflows

Basic run:

result = workflow.run()

With options:

status = workflow.run(
    resume=True,               # Resume if workflow exists
    fail_fast=True,            # Stop on first failure
    seconds_until_timeout=7200 # 2 hour timeout
)

Checking Results

The run() method returns a status string:

from jobmon.core.constants import WorkflowRunStatus

status = workflow.run()

if status == WorkflowRunStatus.DONE:
    print("Workflow completed successfully!")
elif status == WorkflowRunStatus.ERROR:
    print("Workflow failed - check task statuses")

# Use CLI or GUI to inspect failures:
# jobmon workflow_tasks -w <workflow_id> -s FATAL

Resuming Workflows

Workflows can be resumed after failures:

By Recreating

# Use the same workflow_args
workflow = tool.create_workflow(
    name="my_workflow",
    workflow_args="v1"
)
workflow.add_tasks([...])
workflow.run(resume=True)

By ID (CLI)

jobmon workflow_resume -w <workflow_id> -c slurm

For more details, see Resuming a Workflow in the advanced usage guide.

Concurrency Control

Limit concurrent tasks:

workflow = tool.create_workflow(
    name="my_workflow",
    max_concurrently_running=500
)

See Also