Quickstart
Jobmon is a job-control system used for automating scientific workflows and running them on distributed computing systems. It manages complex job and resource dependencies and manages computing environment instability, ensuring dependably and assisting in troubleshooting when needed. It is developed and maintained by IHME’s Scientific Computing team.
Jobmon’s vision is to make it as easy as possible for everyone to run any kind of code on any compute platform, reliably, and efficiently. Jobmon should allow people to sleep easily on the weekend because they do not have to manually monitor their applications.
Installation at IHME
Plugins
Jobmon runs jobs on IHME’s Slurm cluster. Jobmon can also execute Tasks locally on a single machine using either sequential execution or multiprocessing, although these operating modes are really only suitable for testing or proof of concept.
To use either of the clusters with Jobmon users need to install their Jobmon plugin. If a user wants to use Slurm with Jobmon, they would need to have the core Jobmon software and the Jobmon Slurm plugin installed.
Users can either: 1. install Jobmon core and the plugins individually using “pip,” or 2. install Jobmon core and the Slurm plugin together with a single conda command.
Conda install
To install core Jobmon and both plugins using conda:
conda install ihme_jobmon -k --channel https://artifactory.ihme.washington.edu/artifactory/api/conda/conda-scicomp --channel conda-forge
Pip install
To install just core jobmon (no cluster plugins) via pip:
pip install jobmon
To install the preconfigured Slurm plugin:
pip install jobmon_installer_ihme
To install both at once via pip:
pip install jobmon[ihme]
Then issue a “jobmon_config update” command to configure the web service and port, as described on the hub at Jobmon Conda Versions
Note
If you get the error “Could not find a version that satisfies the requirement jobmon (from version: )” then create (or append) the following to your ~/.pip/pip.conf:
[global]
extra-index-url = https://artifactory.ihme.washington.edu/artifactory/api/pypi/pypi-shared/simple
trusted-host = artifactory.ihme.washington.edu
Jobmon Learning
- For a deeper dive in to Jobmon, check out some of our courses:
Check IHME Learn to see if there are any upcoming trainings.
Jobmon GUI
You can view all your workflows, monitor their progress, and dive into the details of their tasks using the Jobmon GUI.
Getting Started
The Jobmon controller script (i.e. the code defining the workflow) must be written in Python or R. The modeling code can be in Python, R, Stata, C++, or in fact any language.
The controller script interacts with Jobmon by creating a Workflow and then iteratively adding Task to it. Each Workflow is uniquely defined by its WorkflowArgs and its set of Tasks.
Jobmon allows you to resume workflows (see Resuming a Workflow). A Workflow can only be resumed if the WorkflowArgs and all Tasks added to it are exact matches to the previous Workflow.
Create a Workflow
A Workflow is essentially a set of Tasks, their configuration details, and the dependencies between them. For example, a series of jobs that models one disease could be a Workflow.
A task is a single executable object in the workflow; a command that will be run.
A dependency from Task A to Task B means that B will not execute until A has successfully completed. We say that Task A is upstream of Task B. Conversely, Task B is downstream of Task A. If A always fails (up to its retry limit) then B will never be started, and the Workflow as a whole will fail.
In general a task can have many upstreams. A Task will not start until all of its upstreams have successfully completed, potentially after multiple attempts.
The Tasks and their dependencies form a directed-acyclic graph (DAG). where the tasks are the nodes, and the edges are the dependencies.
For more about the objects go to Core Concepts.
import getpass
import uuid
from jobmon.client.tool import Tool
"""
TODO
1. Add injectable variables for IHME specific file paths and URLs
2. Replicate this file in the training repo?
3. R versions
Instructions:
This workflow is similar to many IHME modelling workflows, with a class three-phase fork-and-join task flow:
1. A data preparation phase, with one job. In a real modelling pipeline this would split a large input file
input file into manageable pieces and clean the data. In our example this task generates dummy
intermediate files for the next phase.
2. A broad phase with one Task per (dummy) location_id.
3. A summarization phase consisting of a single Task that reads the intermediate results from the individual
location models.
The steps in this example are:
1. Create a tool
2. Create a workflow using the tool from step 1
3. Create three task templates using the tool from step 1
a. Template for the Data Prep Task
b. Template for the separate location tasks
c. Template for the Summarization Phase
4. Create tasks using the templates from step 3
a. Add the necessary edge dependencies
5. Add created tasks to the workflow
6. Run the workflow
To actually run the provided example:
Make sure Jobmon is installed in your activated conda environment, and that you're on
the Slurm cluster in a srun session. From the root of the repo, run:
$ python training_scripts/workflow_template_example.py
"""
user = getpass.getuser()
wf_uuid = uuid.uuid4()
# Create a tool
tool = Tool(name="quickstart_tool_python")
# Create a workflow, and set the executor
workflow = tool.create_workflow(
name=f"quickstart_workflow_{wf_uuid}",
)
# Create task templates
data_prep_template = tool.get_task_template(
default_compute_resources={
"queue": "all.q",
"cores": 1,
"memory": "1G",
"runtime": "1m",
"stdout": "/tmp",
"stderr": "/tmp",
"project": "my_slurm_account",
},
template_name="quickstart_data_prep_template",
default_cluster_name="slurm",
command_template="python "
"/code_dir/docsource/quickstart_tasks/data_prep.py "
"--location_set_id {location_set_id} "
"--root_data_dir {root_data_dir} "
"--log_level {log_level}",
node_args=[],
task_args=["location_set_id", "root_data_dir"],
op_args=["log_level"],
)
parallel_by_location_template = tool.get_task_template(
default_compute_resources={
"queue": "all.q",
"cores": 2,
"memory": "1G",
"runtime": "10m",
"stdout": "/tmp",
"stderr": "/tmp",
"project": "my_slurm_account"
},
template_name="quickstart_location_template",
default_cluster_name="slurm",
command_template="python "
"/code_dir/docsource/quickstart_tasks/one_location.py "
"--location_id {location_id} "
"--root_data_dir {root_data_dir} "
"--log_level {log_level} ",
node_args=["location_id"],
task_args=["root_data_dir"],
op_args=["log_level"],
)
summarization_template = tool.get_task_template(
default_compute_resources={
"queue": "all.q",
"cores": 2,
"memory": "1G",
"runtime": "10m",
"stdout": "/tmp",
"stderr": "/tmp",
"project": "my_slurm_account"
},
template_name="quickstart_summarization_template",
default_cluster_name="slurm",
command_template="python /code_dir/docsource/quickstart_tasks/summarization.py "
"--root_data_dir {root_data_dir} "
"--log_level {log_level}",
node_args=[],
task_args=["root_data_dir"],
op_args=["log_level"],
)
# Create tasks
location_set_id = 5
location_set = list(range(location_set_id))
root_data_dir = f"/home/{user}/quickstart/data"
data_prep_task = data_prep_template.create_task(
name="data_prep_task",
upstream_tasks=[],
root_data_dir=root_data_dir,
location_set_id=location_set_id,
log_level="DEBUG"
)
workflow.add_tasks([data_prep_task])
location_tasks = parallel_by_location_template.create_tasks(
root_data_dir=root_data_dir,
location_id=location_set,
log_level="DEBUG"
)
workflow.add_tasks(location_tasks)
summarization_task = summarization_template.create_task(
root_data_dir=root_data_dir,
log_level="DEBUG"
)
workflow.add_tasks([summarization_task])
# Connect the dependencies. Notice the use of get_tasks_by_node_args
for loc_id in location_set:
foo = {"location_id": loc_id}
single_task = workflow.get_tasks_by_node_args("quickstart_location_template", **foo)
# Notice it returns a set. Should only be one
single_task[0].add_upstream(data_prep_task)
summarization_task.add_upstream(single_task[0])
# Calling workflow.bind() first just so that we can get the workflow id
workflow.bind()
print("Workflow creation complete.")
print(f"Running workflow with ID {workflow.workflow_id}.")
print("If you have a Jobmon GUI deployed, see the Jobmon GUI for full information:")
print(f"https://jobmon-gui.mydomain.com/#/workflow/{workflow.workflow_id}/tasks")
# run workflow
status = workflow.run()
print(f"Workflow {workflow.workflow_id} completed with status {status}.")
library(jobmonr)
# Create a workflow
username <- Sys.getenv("USER")
# Parameters for the workflow
root_data_dir <- paste0("/home/", username, "/quickstart/data")
location_set_id <- as.integer(5)
location_ids <- seq(from=0, to=location_set_id)
# Create a tool
my_tool <- tool(name="r_quickstart_tool_R")
# Set the tool compute resources
jobmonr::set_default_tool_resources(
tool=my_tool,
default_cluster_name="slurm",
resources=list(
"cores"=1,
"queue"="all.q",
"runtime"="2m",
"memory"="1G",
"project"="proj_scicomp"
)
)
# Bind a workflow to the tool
wf <- workflow(my_tool, name=paste0("quickstart_workflow_", Sys.Date()))
# Create template to run our script
data_prep_template <- task_template(tool=my_tool,
template_name="quickstart_data_prep_template",
command_template=paste(
Sys.getenv("RETICULATE_PYTHON"),
"/code_dir/docsource/quickstart_tasks/data_prep.py",
"--location_set_id {location_set_id}",
"--root_data_dir {root_data_dir}",
"--log_level {log_level}",
sep=" "),
task_args=list("location_set_id", "root_data_dir"),
op_args=list("log_level"))
# Optional: default resources can be updated at the task or task template level
jobmonr::set_default_template_resources(
task_template=data_prep_template,
default_cluster_name="slurm",
resources=list(
"queue"="all.q",
"constraints"="archive"
)
)
parallel_by_location_template <- task_template(tool=my_tool,
template_name="quickstart_location_template",
command_template=paste(
Sys.getenv("RETICULATE_PYTHON"),
"/code_dir/docsource/quickstart_tasks/one_location.py",
"--location_id {location_id}",
"--root_data_dir {root_data_dir}",
"--log_level {log_level}",
sep=" "),
node_args=list("location_id"),
task_args=list("root_data_dir"),
op_args=list("log_level"))
summarization_template <- task_template(tool=my_tool,
template_name="quickstart_summarization_template",
command_template=paste(
Sys.getenv("RETICULATE_PYTHON"),
"/code_dir/docsource/quickstart_tasks/summarization.py",
"--root_data_dir {root_data_dir}",
"--log_level {log_level}",
sep=" "),
task_args=list("root_data_dir"),
op_args=list("log_level"))
# Now create the tasks
data_prep_task <- task(task_template=data_prep_template,
name="data_prep_task",
root_data_dir=root_data_dir,
location_set_id=location_set_id,
log_level="DEBUG")
# Add tasks to the workflow
wf <- add_tasks(wf, list(data_prep_task))
# Create all the location tasks in one call
location_tasks <- array_tasks(
task_template=parallel_by_location_template,
name="location_task_",
upstream_tasks=list(data_prep_task),
location_id=location_ids,
root_data_dir=root_data_dir,
log_level="DEBUG")
wf <- add_tasks(wf, location_tasks)
summarization_task <- task(task_template=summarization_template,
name="summarization_task",
upstream_tasks=location_tasks,
root_data_dir=root_data_dir,
log_level="DEBUG")
wf <- add_tasks(wf, list(summarization_task))
# Run it
wfr <- run(
workflow=wf,
resume=FALSE,
seconds_until_timeout=7200)
Constructing a Workflow and adding a few Tasks is simple:
Note
Unique Workflows: If you know that your Workflow is to be used for a one-off project only, you may choose to use an anonymous Workflow, meaning you leave workflow_args blank. In this case, WorkflowArgs will default to a UUID which, as it is randomly generated, will be harder to remember and thus is not recommended for use cases outside of the one-off project. A workflow’s uniqueness is based on its command, upstreams and downstreams, and workflow_args.
Compute Resources
Compute Resources are used to allocate resources to your tasks. You can specify memory, cores, runtime, queue, stdout, stderr, and project.
For IHME’s Slurm cluster the defaults for all queues are: * One core * 1G memory, and * Ten minutes runtime.
These values might change in the future.
You can specify that you want to run your jobs on an “archive” node
(i.e., a node with access to /snfs1
a.k.a “the J-drive”). Add the following key value pair to
their compute resources: "constraints": "archive".
Note
By default Workflows are set to time out if all of your tasks haven’t completed after 10 hours (36,000 seconds). If your Workflow times out before your tasks have finished running, those tasks will continue running, but you will need to restart your Workflow again. You can change the Workflow timeout period if your tasks combined run longer than 10 hours.
Getting Additional Help
Inside IHME
The Scientific Computing team is always available to answer your questions or to consult on Jobmon.
- To contact the team via Slack:
#jobmon-users to ask questions or raise concerns about Jobmon.
- To set up a consultation:
Create a Help Desk ticket asking for a consultation: SciComp Help Desk.
A Scientific Computing team member will reach out to you to schedule a consultation meeting.
- To raise a Scientific Computing help desk request:
When requesting help try to provide the team with as much information as you have about your problem. Please include your Workflow id, the Jobmon version that you’re using, and any TaskInstance error logs that you have.
Important announcements are on the slack channel #jobmon-announce
Outside IHME
Please use GitHub to communicate your issue.