Quickstart

Jobmon is a job-control system used for automating scientific workflows and running them on distributed computing systems. It manages complex job and resource dependencies, handles instability in the computing environment to ensure dependability, and assists in troubleshooting when needed. It is developed and maintained by IHME’s Scientific Computing team.

Jobmon’s vision is to make it as easy as possible for everyone to run any kind of code on any compute platform, reliably and efficiently. Jobmon should let people sleep easy on the weekend because they do not have to monitor their applications manually.

Installation at IHME

Plugins

Jobmon runs jobs on IHME’s Slurm cluster. Jobmon can also execute Tasks locally on a single machine, using either sequential execution or multiprocessing, although these modes are suitable only for testing or proofs of concept.

To use a cluster with Jobmon, users need to install the corresponding Jobmon plugin. For example, to use Slurm with Jobmon, a user needs both the core Jobmon software and the Jobmon Slurm plugin installed.

Users can either:
  1. install Jobmon core and the plugins individually using “pip,” or
  2. install Jobmon core and the Slurm plugin together with a single conda command.

Conda install

To install core Jobmon and the Slurm plugin using conda:

conda install ihme_jobmon -k --channel https://artifactory.ihme.washington.edu/artifactory/api/conda/conda-scicomp --channel conda-forge

Pip install

To install just core Jobmon (no cluster plugins) via pip:

pip install jobmon

To install the preconfigured Slurm plugin:

pip install jobmon_installer_ihme

To install both at once via pip:

pip install "jobmon[ihme]"

Then run the “jobmon_config update” command to configure the web service and port, as described on the hub at Jobmon Conda Versions.

Note

If you get the error “Could not find a version that satisfies the requirement jobmon (from versions: )”, create ~/.pip/pip.conf (or append to it) with the following:

[global]
extra-index-url = https://artifactory.ihme.washington.edu/artifactory/api/pypi/pypi-shared/simple
trusted-host = artifactory.ihme.washington.edu
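If you prefer to script this change, the sketch below does it with Python's standard configparser module. It is not a Jobmon or pip tool: the function name add_ihme_index is hypothetical, and it assumes your pip.conf is a plain INI file. It keeps any index URLs or hosts you already have, adds the IHME ones only if they are missing, and can safely be run more than once. Be aware that configparser drops comments when it rewrites the file.

```python
import configparser
from pathlib import Path

# Values copied from the pip.conf snippet above.
IHME_INDEX = "https://artifactory.ihme.washington.edu/artifactory/api/pypi/pypi-shared/simple"
IHME_HOST = "artifactory.ihme.washington.edu"


def _append_unique(section: configparser.SectionProxy, key: str, value: str) -> None:
    """Add `value` to a whitespace-separated option without duplicating it."""
    entries = section.get(key, "").split()
    if value not in entries:
        entries.append(value)
    # pip accepts several values for these options, one per (indented) line.
    section[key] = "\n".join(entries)


def add_ihme_index(conf_path: Path) -> None:
    """Create or update a pip.conf so its [global] section includes the IHME index."""
    # interpolation=None so '%' characters in existing values are left alone.
    config = configparser.ConfigParser(interpolation=None)
    config.read(conf_path)  # a missing file is silently skipped
    if not config.has_section("global"):
        config.add_section("global")
    _append_unique(config["global"], "extra-index-url", IHME_INDEX)
    _append_unique(config["global"], "trusted-host", IHME_HOST)
    conf_path.parent.mkdir(parents=True, exist_ok=True)
    with conf_path.open("w") as f:
        config.write(f)
```

Calling add_ihme_index(Path.home() / ".pip" / "pip.conf") would update your own file; consider backing it up first.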

Jobmon Learning

For a deeper dive into Jobmon, check out some of our courses:
  1. About Jobmon

  2. Learn Jobmon

  3. Jobmon Retry

Check IHME Learn to see if there are any upcoming trainings.

Jobmon GUI

You can view all your workflows, monitor their progress, and dive into the details of their tasks using the Jobmon GUI.

Getting Started

The Jobmon controller script (i.e. the code defining the workflow) must be written in Python or R. The modeling code can be in Python, R, Stata, C++, or in fact any language.

The controller script interacts with Jobmon by creating a Workflow and then iteratively adding Tasks to it. Each Workflow is uniquely defined by its WorkflowArgs and its set of Tasks.

Jobmon allows you to resume workflows (see Resuming a Workflow). A Workflow can only be resumed if its WorkflowArgs and all of its Tasks exactly match those of the previous Workflow.
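To make the matching rule concrete, here is a simplified illustration that reduces a Workflow to a fingerprint of its workflow args and its Tasks (each Task's command plus the commands of its upstreams). This is not Jobmon's actual hashing scheme; workflow_fingerprint and the commands are hypothetical. It only shows why adding Tasks in a different order still matches, while changing any command, edge, or argument does not.

```python
import hashlib
import json


def workflow_fingerprint(workflow_args: str, tasks: dict[str, list[str]]) -> str:
    """Hash workflow args plus tasks, where `tasks` maps command -> upstream commands."""
    # Sort upstream lists and dict keys so the order in which Tasks were added is irrelevant.
    canonical = {command: sorted(upstreams) for command, upstreams in tasks.items()}
    payload = json.dumps({"workflow_args": workflow_args, "tasks": canonical}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


first_run = workflow_fingerprint(
    "my_model_v1", {"python prep.py": [], "python fit.py": ["python prep.py"]}
)
# Same args and Tasks added in another order: matches, so it could resume.
rerun = workflow_fingerprint(
    "my_model_v1", {"python fit.py": ["python prep.py"], "python prep.py": []}
)
# One command changed: no match, so it would start a new Workflow instead.
edited = workflow_fingerprint(
    "my_model_v1", {"python prep.py": [], "python fit.py --v2": ["python prep.py"]}
)
```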

Create a Workflow

A Workflow is essentially a set of Tasks, their configuration details, and the dependencies between them. For example, a series of jobs that models one disease could be a Workflow.

A Task is a single executable object in the Workflow: a command that will be run.

A dependency from Task A to Task B means that B will not execute until A has successfully completed. We say that Task A is upstream of Task B. Conversely, Task B is downstream of Task A. If A always fails (up to its retry limit) then B will never be started, and the Workflow as a whole will fail.

In general, a Task can have many upstream Tasks. A Task will not start until all of its upstreams have completed successfully, potentially after multiple attempts.
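The blocking rule can be modeled in a few lines of plain Python. This is a sketch of the semantics only, not Jobmon code: the task names are hypothetical and never_started is an illustrative helper. Given each Task's upstreams and the Tasks that failed after exhausting their retries, it returns every Task that can never start because some upstream, directly or transitively, failed. Tasks on other branches are unaffected.

```python
def never_started(upstreams: dict[str, set[str]], failed: set[str]) -> set[str]:
    """Return the tasks blocked because a direct or transitive upstream failed."""
    blocked: set[str] = set()
    changed = True
    while changed:  # repeat until no new task becomes blocked
        changed = False
        for task, ups in upstreams.items():
            if task in blocked or task in failed:
                continue
            if ups & (failed | blocked):
                blocked.add(task)
                changed = True
    return blocked


# Fork-and-join shape: one prep Task, two location Tasks, one summary Task.
upstreams = {
    "data_prep": set(),
    "location_0": {"data_prep"},
    "location_1": {"data_prep"},
    "summarize": {"location_0", "location_1"},
}
```

With these inputs, a failed location_0 blocks only summarize, while a failed data_prep blocks every other Task.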

The Tasks and their dependencies form a directed acyclic graph (DAG), where the Tasks are the nodes and the dependencies are the edges.
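The DAG property can be explored with Python's standard graphlib module (Python 3.9+). The sketch below is an illustration, not how Jobmon schedules work internally: it groups a hypothetical fork-and-join graph into "waves" of Tasks whose upstreams are all done, pretending every Task succeeds, and shows that a graph with a cycle is rejected because no valid run order exists.

```python
from graphlib import CycleError, TopologicalSorter


def execution_waves(upstreams: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves; each wave holds tasks whose upstreams are all done."""
    sorter = TopologicalSorter(upstreams)  # maps node -> its predecessors
    sorter.prepare()  # raises CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        sorter.done(*ready)  # pretend every ready task succeeded
    return waves


def is_dag(upstreams: dict[str, set[str]]) -> bool:
    """True if the dependencies contain no cycle."""
    try:
        TopologicalSorter(upstreams).prepare()
    except CycleError:
        return False
    return True


fork_and_join = {
    "data_prep": set(),
    "location_0": {"data_prep"},
    "location_1": {"data_prep"},
    "location_2": {"data_prep"},
    "summarize": {"location_0", "location_1", "location_2"},
}
```

Here execution_waves(fork_and_join) puts data_prep first, the three location Tasks together second, and summarize last.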

For more about the objects go to Core Concepts.

"""
Quickstart example: a three-phase fork-and-join Workflow.

This workflow is similar to many IHME modelling workflows, with a classic three-phase
fork-and-join task flow:
  1. A data preparation phase, with one Task. In a real modelling pipeline this would split
     a large input file into manageable pieces and clean the data. In this example the Task
     generates dummy intermediate files for the next phase.
  2. A broad phase with one Task per (dummy) location_id.
  3. A summarization phase consisting of a single Task that reads the intermediate results
     from the individual location models.

The steps in this example are:
  1. Create a tool
  2. Create a workflow using the tool from step 1
  3. Create three task templates using the tool from step 1
     a. Template for the Data Prep Task
     b. Template for the separate location tasks
     c. Template for the Summarization Phase
  4. Create tasks using the templates from step 3
     a. Add the necessary edge dependencies
  5. Add created tasks to the workflow
  6. Run the workflow

To run the example:
  Make sure Jobmon is installed in your activated conda environment, and that you're on
  the Slurm cluster in an srun session. From the root of the repo, run:
     $ python training_scripts/workflow_template_example.py
"""
import getpass
import uuid

from jobmon.client.tool import Tool

user = getpass.getuser()
wf_uuid = uuid.uuid4()

# Create a tool
tool = Tool(name="quickstart_tool_python")

# Create a workflow. No workflow_args are given, so this is an anonymous Workflow.
workflow = tool.create_workflow(
    name=f"quickstart_workflow_{wf_uuid}",
)

# Create task templates

data_prep_template = tool.get_task_template(
    default_compute_resources={
        "queue": "all.q",
        "cores": 1,
        "memory": "1G",
        "runtime": "1m",
        "stdout": "/tmp",
        "stderr": "/tmp",
        "project": "my_slurm_account",
    },
    template_name="quickstart_data_prep_template",
    default_cluster_name="slurm",
    command_template="python "
                     "/code_dir/docsource/quickstart_tasks/data_prep.py "
                     "--location_set_id {location_set_id} "
                     "--root_data_dir {root_data_dir} "
                     "--log_level {log_level}",
    node_args=[],
    task_args=["location_set_id", "root_data_dir"],
    op_args=["log_level"],
)

parallel_by_location_template = tool.get_task_template(
    default_compute_resources={
        "queue": "all.q",
        "cores": 2,
        "memory": "1G",
        "runtime": "10m",
        "stdout": "/tmp",
        "stderr": "/tmp",
        "project": "my_slurm_account",
    },
    template_name="quickstart_location_template",
    default_cluster_name="slurm",
    command_template="python "
                     "/code_dir/docsource/quickstart_tasks/one_location.py "
                     "--location_id {location_id} "
                     "--root_data_dir {root_data_dir} "
                     "--log_level {log_level}",
    node_args=["location_id"],
    task_args=["root_data_dir"],
    op_args=["log_level"],
)

summarization_template = tool.get_task_template(
    default_compute_resources={
        "queue": "all.q",
        "cores": 2,
        "memory": "1G",
        "runtime": "10m",
        "stdout": "/tmp",
        "stderr": "/tmp",
        "project": "my_slurm_account",
    },
    template_name="quickstart_summarization_template",
    default_cluster_name="slurm",
    command_template="python /code_dir/docsource/quickstart_tasks/summarization.py "
                     "--root_data_dir {root_data_dir} "
                     "--log_level {log_level}",
    node_args=[],
    task_args=["root_data_dir"],
    op_args=["log_level"],
)


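# Create tasks. location_set_id doubles as the number of dummy locations here,
# so the location Tasks get location_ids 0 through 4.
location_set_id = 5
location_set = list(range(location_set_id))
root_data_dir = f"/home/{user}/quickstart/data"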

data_prep_task = data_prep_template.create_task(
    name="data_prep_task",
    upstream_tasks=[],
    root_data_dir=root_data_dir,
    location_set_id=location_set_id,
    log_level="DEBUG"
)
workflow.add_tasks([data_prep_task])

location_tasks = parallel_by_location_template.create_tasks(
    root_data_dir=root_data_dir,
    location_id=location_set,
    log_level="DEBUG"
)
workflow.add_tasks(location_tasks)

summarization_task = summarization_template.create_task(
    root_data_dir=root_data_dir,
    log_level="DEBUG"
)
workflow.add_tasks([summarization_task])

# Connect the dependencies. Notice the use of get_tasks_by_node_args, which looks up
# Tasks of a given template by their node_args values.

for loc_id in location_set:
    node_args = {"location_id": loc_id}
    matching_tasks = workflow.get_tasks_by_node_args("quickstart_location_template", **node_args)
    # It returns a list (not a set); there should be exactly one Task per location_id
    location_task = matching_tasks[0]
    location_task.add_upstream(data_prep_task)
    summarization_task.add_upstream(location_task)

# Calling workflow.bind() first just so that we can get the workflow id
workflow.bind()
print("Workflow creation complete.")
print(f"Running workflow with ID {workflow.workflow_id}.")
print("If you have a Jobmon GUI deployed, see the Jobmon GUI for full information:")
print(f"https://jobmon-gui.mydomain.com/#/workflow/{workflow.workflow_id}/tasks")


# run workflow
status = workflow.run()
print(f"Workflow {workflow.workflow_id} completed with status {status}.")

As the quickstart script shows, constructing a Workflow and adding a few Tasks is simple.
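To see how a command_template becomes concrete commands, the sketch below fills the location template from the quickstart script with Python's str.format. It is an illustration, not Jobmon's internal rendering code: render_commands and the user name "alice" are hypothetical. It assumes, as the script's call to create_tasks with a list of location_ids suggests, one Task per list value; when several arguments are lists, this sketch takes their cross product, which is an assumption about Jobmon's behavior. Note that the node_arg (location_id) varies per command while the task_arg and op_arg stay fixed.

```python
from itertools import product

# Copied from quickstart_location_template in the script above.
COMMAND_TEMPLATE = (
    "python /code_dir/docsource/quickstart_tasks/one_location.py "
    "--location_id {location_id} "
    "--root_data_dir {root_data_dir} "
    "--log_level {log_level}"
)


def render_commands(template: str, **args) -> list[str]:
    """Expand list-valued args into their cross product and fill the template."""
    names = list(args)
    value_lists = [v if isinstance(v, list) else [v] for v in args.values()]
    return [template.format(**dict(zip(names, combo))) for combo in product(*value_lists)]


commands = render_commands(
    COMMAND_TEMPLATE,
    location_id=list(range(3)),  # node_arg: one command per value
    root_data_dir="/home/alice/quickstart/data",  # task_arg: same for every command
    log_level="DEBUG",  # op_arg: same for every command
)
```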

Note

Unique Workflows: If you know that your Workflow is to be used for a one-off project only, you may choose to use an anonymous Workflow, meaning you leave workflow_args blank. In this case, WorkflowArgs will default to a randomly generated UUID. Such a UUID is hard to remember, and because a new one is generated each time the script runs, a rerun will not match the previous Workflow and so cannot resume it. Anonymous Workflows are therefore not recommended outside of one-off projects. A Workflow’s uniqueness is based on its workflow_args and its Tasks, where each Task is identified by its command and its upstream and downstream dependencies.

Compute Resources

Compute Resources are used to allocate resources to your tasks. You can specify memory, cores, runtime, queue, stdout, stderr, and project.

For IHME’s Slurm cluster, the defaults for all queues are:
  • One core
  • 1G memory
  • Ten minutes runtime

These values might change in the future.

You can specify that you want to run your jobs on an “archive” node (i.e., a node with access to /snfs1, a.k.a. “the J-drive”) by adding the following key-value pair to your compute resources: "constraints": "archive".
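As a small sketch, here is a compute-resources dictionary in the same format as the quickstart templates, using the documented Slurm defaults plus the archive constraint. The helper with_archive is hypothetical, and the queue and project values are the placeholders from the quickstart script. Copying the dictionary avoids changing a template's shared defaults when only some Tasks need an archive node; how Jobmon combines per-Task resources with template defaults is not shown here.

```python
import copy

# Documented Slurm defaults (one core, 1G memory, ten minutes), written in the
# key/value format used by the quickstart templates.
SLURM_DEFAULTS = {"cores": 1, "memory": "1G", "runtime": "10m"}


def with_archive(resources: dict) -> dict:
    """Return a copy of `resources` that also requests an archive node."""
    updated = copy.deepcopy(resources)
    updated["constraints"] = "archive"
    return updated


base_resources = {**SLURM_DEFAULTS, "queue": "all.q", "project": "my_slurm_account"}
archive_resources = with_archive(base_resources)
```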

Note

By default, Workflows time out if all of your Tasks have not completed after 10 hours (36,000 seconds). If your Workflow times out before your Tasks have finished running, those Tasks will continue running, but you will need to restart your Workflow. If your Workflow needs more than 10 hours to finish, you can increase its timeout period.
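The timeout is expressed in seconds, so it helps to compute it rather than type it by hand. This sketch converts an expected run length in hours into seconds with a hypothetical safety margin, so a slightly slow run does not time out. The final commented line assumes Workflow.run() accepts a seconds_until_timeout argument; confirm the parameter name against the API reference for your Jobmon version.

```python
from datetime import timedelta

# The documented default: 10 hours.
DEFAULT_TIMEOUT = int(timedelta(hours=10).total_seconds())


def timeout_seconds(expected_hours: float, safety_margin: float = 1.25) -> int:
    """Seconds to allow for a Workflow expected to take `expected_hours`, plus a margin."""
    return int(timedelta(hours=expected_hours * safety_margin).total_seconds())


# A Workflow expected to take about 16 hours gets 20 hours of allowance.
long_timeout = timeout_seconds(16)

# Assumed Jobmon call, shown for context only:
# status = workflow.run(seconds_until_timeout=long_timeout)
```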

Getting Additional Help

Inside IHME

The Scientific Computing team is always available to answer your questions or to consult on Jobmon.

To contact the team via Slack:
  • #jobmon-users to ask questions or raise concerns about Jobmon.

To set up a consultation:
  • Create a Help Desk ticket asking for a consultation: SciComp Help Desk.

  • A Scientific Computing team member will reach out to you to schedule a consultation meeting.

To raise a Scientific Computing help desk request:

When requesting help try to provide the team with as much information as you have about your problem. Please include your Workflow id, the Jobmon version that you’re using, and any TaskInstance error logs that you have.
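To gather some of these details quickly, the sketch below builds a short summary using the standard importlib.metadata module. The helper names are hypothetical, and the distribution names checked are an assumption based on the pip commands above; your environment may use different package names. The Workflow id is the one the quickstart script prints from workflow.workflow_id.

```python
from importlib import metadata

# Distribution names to report; adjust if your install uses different names.
CANDIDATES = ("jobmon", "jobmon_installer_ihme")


def jobmon_versions(candidates=CANDIDATES) -> dict[str, str]:
    """Map each candidate distribution to its installed version, or 'not installed'."""
    versions = {}
    for name in candidates:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = "not installed"
    return versions


def help_request_summary(workflow_id: int) -> str:
    """Text to paste into a help desk ticket, before attaching any error logs."""
    lines = [f"Workflow id: {workflow_id}"]
    lines += [f"{name} version: {version}" for name, version in jobmon_versions().items()]
    return "\n".join(lines)
```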

Important announcements are posted in the Slack channel #jobmon-announce.

Outside IHME

Please use GitHub to communicate your issue.