Tasks

Tasks are the fundamental unit of work in Jobmon. Each task represents a single command to be executed.

Note

This page is being developed. For now, see Core Concepts for task basics and Advanced Usage for advanced patterns.

Task Templates

Before creating tasks, you define a TaskTemplate:

template = tool.get_task_template(
    template_name="process_data",
    command_template="python process.py --input {input_file} --output {output_file}",
    node_args=["input_file"],      # Unique per task
    task_args=["output_file"],     # Unique per workflow run
    op_args=[],                    # Operational (script paths, etc.)
)

Understanding Arguments

  • node_args: Make each task unique within a workflow. Typically parallelization dimensions like location_id, year.

  • task_args: Change between workflow runs but don’t affect task uniqueness. Typically data versions or release IDs.

  • op_args: Operational arguments that don’t affect task identity. Typically script paths or log verbosity.

Creating Tasks

Create tasks from templates:

task = template.create_task(
    name="process_location_1",
    input_file="/data/loc_1.csv",
    output_file="/output/loc_1.csv",
    compute_resources={
        "cores": 2,
        "memory": "8G",
        "runtime": "1h",
    }
)

Task Naming

Task names should be descriptive and unique within a workflow:

# Good: descriptive and unique
task = template.create_task(name=f"process_loc_{location_id}_year_{year}", ...)

# Bad: not unique
task = template.create_task(name="process", ...)

Dependencies

Tasks can depend on other tasks:

# task2 runs after task1 completes
task2.add_upstream(task1)

Multiple dependencies:

# aggregate_task waits for all process_tasks
for task in process_tasks:
    aggregate_task.add_upstream(task)

Or set during task creation:

task2 = template.create_task(
    name="downstream",
    upstream_tasks=[task1, task3],
    ...
)

Task Attributes

Track custom metadata:

task = template.create_task(
    name="my_task",
    task_attributes={
        "location_id": 1,
        "model_version": "2.1"
    },
    ...
)

Retries

Configure retry behavior:

task = template.create_task(
    name="my_task",
    max_attempts=3,  # Retry up to 3 times
    ...
)

For resource-related failures, Jobmon automatically scales resources before retrying.

See Compute Resources for details on resource scaling.

Checking Task Status

After running a workflow:

# Get resource usage for a completed task
usage = task.resource_usage()
print(f"Memory used: {usage['memory']} bytes")
print(f"Runtime: {usage['runtime']} seconds")
print(f"Attempts: {usage['num_attempts']}")

See Also