Quickstart
Jobmon is a job-control system used for automating scientific workflows and running them on distributed computing systems. It manages complex job and resource dependencies and handles computing environment instability, ensuring dependability and assisting in troubleshooting when needed. It is developed and maintained by IHME’s Scientific Computing team.
Jobmon’s vision is to make it as easy as possible for everyone to run any kind of code on any compute platform, reliably and efficiently. Jobmon should allow people to sleep easily on the weekend because they do not have to manually monitor their applications.
Installation at IHME
Plugins
Jobmon runs jobs on IHME’s Slurm cluster. Jobmon can also execute Tasks locally on a single machine using either sequential execution or multiprocessing, although these operating modes are really only suitable for testing or proof of concept.
To use a cluster with Jobmon, users need to install the Jobmon plugin for that cluster. For example, to use Slurm with Jobmon, a user needs both the core Jobmon software and the Jobmon Slurm plugin installed.
Users can either:
1. install Jobmon core and the plugins individually using pip, or
2. install Jobmon core and the Slurm plugin together with a single conda command.
Conda install
To install core Jobmon and both plugins using conda:
conda install ihme_jobmon -k --channel https://artifactory.ihme.washington.edu/artifactory/api/conda/conda-scicomp --channel conda-forge
Pip install
To install just core Jobmon (no cluster plugins) via pip:
pip install jobmon
To install the preconfigured Slurm plugin:
pip install jobmon_installer_ihme
To install both at once via pip:
pip install jobmon[ihme]
See versions at: Jobmon Conda Versions
Note
If you get the error “Could not find a version that satisfies the requirement jobmon (from version: )” then create (or append) the following to your ~/.pip/pip.conf:
[global]
extra-index-url = https://artifactory.ihme.washington.edu/artifactory/api/pypi/pypi-shared/simple
trusted-host = artifactory.ihme.washington.edu
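If you would rather script that change, the following is a minimal sketch using Python's standard configparser module. The helper name ensure_pip_index is hypothetical, and the sketch assumes that overwriting any existing extra-index-url value is acceptable; note that configparser drops comments when it rewrites a file, so edit by hand if your pip.conf has content you want to keep untouched.

```python
import configparser
from pathlib import Path

INDEX_URL = "https://artifactory.ihme.washington.edu/artifactory/api/pypi/pypi-shared/simple"
TRUSTED_HOST = "artifactory.ihme.washington.edu"


def ensure_pip_index(conf_path: Path) -> None:
    """Create conf_path, or update its [global] section, with the two settings."""
    parser = configparser.ConfigParser(interpolation=None)
    if conf_path.exists():
        parser.read(conf_path)
    if not parser.has_section("global"):
        parser.add_section("global")
    # Assumption: replacing an existing value is fine for this sketch.
    parser.set("global", "extra-index-url", INDEX_URL)
    parser.set("global", "trusted-host", TRUSTED_HOST)
    conf_path.parent.mkdir(parents=True, exist_ok=True)
    with conf_path.open("w") as handle:
        parser.write(handle)


# Typical call (commented out so the sketch does not touch your home directory):
# ensure_pip_index(Path.home() / ".pip" / "pip.conf")
```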
New Releases
New Jobmon releases, including updates to jobmon_installer_ihme, will be announced in the #jobmon_users Slack channel. Release details, such as installer version, dates, and included packages, are also available on the HUB at Jobmon Conda Versions.
Jobmon Learning
- For a deeper dive into Jobmon, check out some of our courses:
Check IHME Learn to see if there are any upcoming trainings.
Jobmon GUI
You can view all your workflows, monitor their progress, and dive into the details of their tasks using the Jobmon GUI.
Getting Started
The Jobmon controller script (i.e. the code defining the workflow) must be written in Python or R. The modeling code can be in Python, R, Stata, C++, or in fact any language.
The controller script interacts with Jobmon by creating a Workflow and then iteratively adding Tasks to it. Each Workflow is uniquely defined by its WorkflowArgs and its set of Tasks.
Jobmon allows you to resume workflows (see Resuming a Workflow). A Workflow can only be resumed if the WorkflowArgs and all Tasks added to it are exact matches to the previous Workflow.
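To make the "exact match" requirement concrete, here is a small sketch of one way a workflow identity could be derived from its WorkflowArgs and its set of Tasks. This is an illustration only: the function workflow_fingerprint, the use of SHA-256, and the example commands are assumptions, not Jobmon's actual hashing scheme.

```python
import hashlib
from typing import Iterable


def workflow_fingerprint(workflow_args: str, task_commands: Iterable[str]) -> str:
    """Hash workflow_args together with the order-independent set of task commands."""
    digest = hashlib.sha256()
    digest.update(workflow_args.encode("utf-8"))
    for command in sorted(set(task_commands)):
        digest.update(b"\0")  # separator so adjacent commands cannot blur together
        digest.update(command.encode("utf-8"))
    return digest.hexdigest()


first_run = workflow_fingerprint(
    "flu_model_v1", ["python prep.py", "python fit.py --location_id 1"]
)
same_tasks = workflow_fingerprint(
    "flu_model_v1", ["python fit.py --location_id 1", "python prep.py"]
)
missing_task = workflow_fingerprint("flu_model_v1", ["python prep.py"])

print(first_run == same_tasks)    # True: same args and same set of tasks
print(first_run == missing_task)  # False: a task is missing, so no match
```

Under this model, changing a single task command or the workflow_args produces a different identity, which is why a resume needs everything to match.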
Create a Workflow
A Workflow is essentially a set of Tasks, their configuration details, and the dependencies between them. For example, a series of jobs that models one disease could be a Workflow.
A task is a single executable object in the workflow; a command that will be run.
A dependency from Task A to Task B means that B will not execute until A has successfully completed. We say that Task A is upstream of Task B. Conversely, Task B is downstream of Task A. If A always fails (up to its retry limit) then B will never be started, and the Workflow as a whole will fail.
In general, a Task can have many upstreams. A Task will not start until all of its upstreams have successfully completed, potentially after multiple attempts.
The Tasks and their dependencies form a directed acyclic graph (DAG), where the Tasks are the nodes and the dependencies are the edges.
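The upstream and downstream rules can be sketched with the standard library's graphlib module (Python 3.9+). The task names and the run_dag helper below are hypothetical, and the sketch models a task that "always fails" simply as one that never completes; it does not reproduce Jobmon's scheduler or retry logic.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks directly upstream of it.
upstreams = {
    "data_prep": set(),
    "location_0": {"data_prep"},
    "location_1": {"data_prep"},
    "summarization": {"location_0", "location_1"},
}


def run_dag(upstreams, always_fails=frozenset()):
    """Return (completed, never_started) when some tasks exhaust their retries."""
    completed, never_started = [], []
    # static_order() yields every task after all of its upstreams.
    for task in TopologicalSorter(upstreams).static_order():
        if task in always_fails:
            continue  # the task ran but never succeeded
        if all(upstream in completed for upstream in upstreams[task]):
            completed.append(task)
        else:
            never_started.append(task)  # blocked by a failed upstream
    return completed, never_started


print(run_dag(upstreams, always_fails={"location_1"}))
# (['data_prep', 'location_0'], ['summarization'])
```

With no failures every task completes, with data_prep first and summarization last; with location_1 failing, summarization is never started.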
For more about the objects go to Core Concepts.
import getpass
import uuid
from jobmon.client.tool import Tool
"""
TODO
1. Add injectable variables for IHME specific file paths and URLs
2. Replicate this file in the training repo?
3. R versions
Instructions:
This workflow is similar to many IHME modelling workflows, with a classic three-phase fork-and-join task flow:
1. A data preparation phase, with one job. In a real modelling pipeline this would split a large
input file into manageable pieces and clean the data. In our example this task generates dummy
intermediate files for the next phase.
2. A broad phase with one Task per (dummy) location_id.
3. A summarization phase consisting of a single Task that reads the intermediate results from the individual
location models.
The steps in this example are:
1. Create a tool
2. Create a workflow using the tool from step 1
3. Create three task templates using the tool from step 1
a. Template for the Data Prep Task
b. Template for the separate location tasks
c. Template for the Summarization Phase
4. Create tasks using the templates from step 3
a. Add the necessary edge dependencies
5. Add created tasks to the workflow
6. Run the workflow
To actually run the provided example:
Make sure Jobmon is installed in your activated conda environment, and that you're on
the Slurm cluster in a srun session. From the root of the repo, run:
$ python training_scripts/workflow_template_example.py
"""
user = getpass.getuser()
wf_uuid = uuid.uuid4()
# Create a tool
tool = Tool(name="quickstart_tool_python")
# Create a workflow
workflow = tool.create_workflow(
name=f"quickstart_workflow_{wf_uuid}",
)
# Create task templates
data_prep_template = tool.get_task_template(
default_compute_resources={
"queue": "all.q",
"cores": 1,
"memory": "1G",
"runtime": "1m",
"stdout": "/tmp",
"stderr": "/tmp",
"project": "my_slurm_account",
},
template_name="quickstart_data_prep_template",
default_cluster_name="slurm",
command_template="python "
"/code_dir/docsource/quickstart_tasks/data_prep.py "
"--location_set_id {location_set_id} "
"--root_data_dir {root_data_dir} "
"--log_level {log_level}",
node_args=[],
task_args=["location_set_id", "root_data_dir"],
op_args=["log_level"],
)
parallel_by_location_template = tool.get_task_template(
default_compute_resources={
"queue": "all.q",
"cores": 2,
"memory": "1G",
"runtime": "10m",
"stdout": "/tmp",
"stderr": "/tmp",
"project": "my_slurm_account"
},
template_name="quickstart_location_template",
default_cluster_name="slurm",
command_template="python "
"/code_dir/docsource/quickstart_tasks/one_location.py "
"--location_id {location_id} "
"--root_data_dir {root_data_dir} "
"--log_level {log_level} ",
node_args=["location_id"],
task_args=["root_data_dir"],
op_args=["log_level"],
)
summarization_template = tool.get_task_template(
default_compute_resources={
"queue": "all.q",
"cores": 2,
"memory": "1G",
"runtime": "10m",
"stdout": "/tmp",
"stderr": "/tmp",
"project": "my_slurm_account"
},
template_name="quickstart_summarization_template",
default_cluster_name="slurm",
command_template="python /code_dir/docsource/quickstart_tasks/summarization.py "
"--root_data_dir {root_data_dir} "
"--log_level {log_level}",
node_args=[],
task_args=["root_data_dir"],
op_args=["log_level"],
)
# Create tasks
location_set_id = 5
location_set = list(range(location_set_id))
root_data_dir = f"/home/{user}/quickstart/data"
data_prep_task = data_prep_template.create_task(
name="data_prep_task",
upstream_tasks=[],
root_data_dir=root_data_dir,
location_set_id=location_set_id,
log_level="DEBUG"
)
workflow.add_tasks([data_prep_task])
location_tasks = parallel_by_location_template.create_tasks(
root_data_dir=root_data_dir,
location_id=location_set,
log_level="DEBUG"
)
workflow.add_tasks(location_tasks)
summarization_task = summarization_template.create_task(
root_data_dir=root_data_dir,
log_level="DEBUG"
)
workflow.add_tasks([summarization_task])
# Connect the dependencies. Notice the use of get_tasks_by_node_args
for loc_id in location_set:
    node_args = {"location_id": loc_id}
    matching_tasks = workflow.get_tasks_by_node_args("quickstart_location_template", **node_args)
    # The lookup returns a collection of matching tasks; here there should be exactly one
    location_task = list(matching_tasks)[0]
    location_task.add_upstream(data_prep_task)
    summarization_task.add_upstream(location_task)
# Calling workflow.bind() first just so that we can get the workflow id
workflow.bind()
print("Workflow creation complete.")
print(f"Running workflow with ID {workflow.workflow_id}.")
print("If you have a Jobmon GUI deployed, see the Jobmon GUI for full information:")
print(f"https://jobmon-gui.mydomain.com/#/workflow/{workflow.workflow_id}")
# run workflow
status = workflow.run()
print(f"Workflow {workflow.workflow_id} completed with status {status}.")
library(jobmonr)
# Create a workflow
# Look up the current user
username <- Sys.getenv("USER")
# Parameters for the workflow
root_data_dir <- paste0("/home/", username, "/quickstart/data")
location_set_id <- as.integer(5)
location_ids <- seq(from=0, to=location_set_id)
# Create a tool
my_tool <- tool(name="r_quickstart_tool_R")
# Set the tool compute resources
jobmonr::set_default_tool_resources(
tool=my_tool,
default_cluster_name="slurm",
resources=list(
"cores"=1,
"queue"="all.q",
"runtime"="2m",
"memory"="1G",
"project"="proj_scicomp"
)
)
# Bind a workflow to the tool
wf <- workflow(my_tool, name=paste0("quickstart_workflow_", Sys.Date()))
# Create template to run our script
data_prep_template <- task_template(tool=my_tool,
template_name="quickstart_data_prep_template",
command_template=paste(
Sys.getenv("RETICULATE_PYTHON"),
"/code_dir/docsource/quickstart_tasks/data_prep.py",
"--location_set_id {location_set_id}",
"--root_data_dir {root_data_dir}",
"--log_level {log_level}",
sep=" "),
task_args=list("location_set_id", "root_data_dir"),
op_args=list("log_level"))
# Optional: default resources can be updated at the task or task template level
jobmonr::set_default_template_resources(
task_template=data_prep_template,
default_cluster_name="slurm",
resources=list(
"queue"="all.q",
"constraints"="archive"
)
)
parallel_by_location_template <- task_template(tool=my_tool,
template_name="quickstart_location_template",
command_template=paste(
Sys.getenv("RETICULATE_PYTHON"),
"/code_dir/docsource/quickstart_tasks/one_location.py",
"--location_id {location_id}",
"--root_data_dir {root_data_dir}",
"--log_level {log_level}",
sep=" "),
node_args=list("location_id"),
task_args=list("root_data_dir"),
op_args=list("log_level"))
summarization_template <- task_template(tool=my_tool,
template_name="quickstart_summarization_template",
command_template=paste(
Sys.getenv("RETICULATE_PYTHON"),
"/code_dir/docsource/quickstart_tasks/summarization.py",
"--root_data_dir {root_data_dir}",
"--log_level {log_level}",
sep=" "),
task_args=list("root_data_dir"),
op_args=list("log_level"))
# Now create the tasks
data_prep_task <- task(task_template=data_prep_template,
name="data_prep_task",
root_data_dir=root_data_dir,
location_set_id=location_set_id,
log_level="DEBUG")
# Add tasks to the workflow
wf <- add_tasks(wf, list(data_prep_task))
# Create all the location tasks in one call
location_tasks <- array_tasks(
task_template=parallel_by_location_template,
name="location_task_",
upstream_tasks=list(data_prep_task),
location_id=location_ids,
root_data_dir=root_data_dir,
log_level="DEBUG")
wf <- add_tasks(wf, location_tasks)
summarization_task <- task(task_template=summarization_template,
name="summarization_task",
upstream_tasks=location_tasks,
root_data_dir=root_data_dir,
log_level="DEBUG")
wf <- add_tasks(wf, list(summarization_task))
# Run it
wfr <- run(
workflow=wf,
resume=FALSE,
seconds_until_timeout=7200)
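In the examples, each command_template contains named placeholders such as {location_id}. The argument listed in node_args (location_id) differs from task to task, while the task_args and op_args values are shared by every task created from the template. The sketch below imitates that expansion with plain Python string formatting; render_commands and the shortened script path are hypothetical, and Jobmon's own rendering may differ in detail.

```python
COMMAND_TEMPLATE = (
    "python one_location.py "
    "--location_id {location_id} "
    "--root_data_dir {root_data_dir} "
    "--log_level {log_level}"
)


def render_commands(template, node_values, task_args, op_args):
    """Render one command per node-arg value; task_args and op_args are shared."""
    return [
        template.format(location_id=value, **task_args, **op_args)
        for value in node_values
    ]


commands = render_commands(
    COMMAND_TEMPLATE,
    node_values=range(3),
    task_args={"root_data_dir": "/tmp/quickstart/data"},
    op_args={"log_level": "DEBUG"},
)
for command in commands:
    print(command)
# python one_location.py --location_id 0 --root_data_dir /tmp/quickstart/data --log_level DEBUG
# python one_location.py --location_id 1 --root_data_dir /tmp/quickstart/data --log_level DEBUG
# python one_location.py --location_id 2 --root_data_dir /tmp/quickstart/data --log_level DEBUG
```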
As the examples above show, constructing a Workflow and adding a few Tasks is simple.
Note
Unique Workflows: If you know that your Workflow is to be used for a one-off project only, you may choose to use an anonymous Workflow, meaning you leave workflow_args blank. In this case, WorkflowArgs will default to a UUID which, as it is randomly generated, will be harder to remember and thus is not recommended for use cases outside of the one-off project. A workflow’s uniqueness is based on its command, upstreams and downstreams, and workflow_args.
Compute Resources
Compute Resources are used to allocate resources to your tasks. You can specify memory, cores, runtime, queue, stdout, stderr, and project.
For IHME’s Slurm cluster the defaults for all queues are:
- One core
- 1G memory
- Ten minutes runtime
These values might change in the future.
You can specify that you want to run your jobs on an “archive” node (i.e., a node with access to /snfs1, a.k.a. “the J-drive”). To do so, add the following key-value pair to your compute resources: "constraints": "archive".
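As a rough sketch, a compute resources dictionary can be assembled by starting from the defaults quoted above and layering your own values on top. The helper build_compute_resources is hypothetical and the queue and project names are placeholders taken from the examples; Jobmon applies its own defaults, so this only illustrates the shape of the dictionary.

```python
# Defaults quoted above for IHME's Slurm cluster (these may change).
SLURM_DEFAULTS = {"cores": 1, "memory": "1G", "runtime": "10m"}


def build_compute_resources(queue, project, archive=False, **overrides):
    """Merge the defaults with explicit overrides and an optional archive constraint."""
    resources = {**SLURM_DEFAULTS, "queue": queue, "project": project, **overrides}
    if archive:
        # Request a node with access to /snfs1 (the J-drive).
        resources["constraints"] = "archive"
    return resources


resources = build_compute_resources(
    "all.q", "my_slurm_account", archive=True, memory="4G"
)
print(resources)
```

The resulting dictionary can be passed wherever the examples pass default_compute_resources or compute_resources.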
Note
By default, Workflows are set to time out if your tasks have not all completed after 10 hours (36,000 seconds). If your Workflow times out before your tasks have finished running, those tasks will continue running, but you will need to restart your Workflow. You can change the Workflow timeout period if your tasks combined run longer than 10 hours.
Task Generator
The Task Generator is a new feature in Jobmon, currently available only in Python, that enables users to generate tasks from an existing method. This feature utilizes serialization to convert method parameters into strings, forming a CLI command for submission to the cluster. Once on the cluster node, Jobmon deserializes the strings back into their original parameters, allowing the tasks to execute as intended.
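The round trip described above can be sketched with the standard library alone. The helper names to_cli and from_cli and the "--name value" command layout are assumptions made for illustration; they are not Jobmon's actual wire format. The serializer table follows the {type: (to_string, from_string)} shape used in the examples later in this section.

```python
import shlex
from datetime import date

# Hypothetical serializer table: {type: (to_string, from_string)}.
SERIALIZERS = {date: (date.isoformat, date.fromisoformat)}


def to_cli(func_name, kwargs, serializers):
    """Turn keyword arguments into a single shell-safe command string."""
    parts = [func_name]
    for name, value in kwargs.items():
        to_string, _ = serializers.get(type(value), (str, None))
        parts += [f"--{name}", shlex.quote(to_string(value))]
    return " ".join(parts)


def from_cli(command, arg_types, serializers):
    """Rebuild the keyword arguments from the command string."""
    tokens = shlex.split(command)[1:]  # drop the function name
    kwargs = {}
    for flag, text in zip(tokens[0::2], tokens[1::2]):
        name = flag[2:]  # strip the leading "--"
        wanted = arg_types[name]
        _, from_string = serializers.get(wanted, (None, wanted))
        kwargs[name] = from_string(text)
    return kwargs


original = {"foo": 1, "day": date(2021, 1, 2)}
command = to_cli("simple_function", original, SERIALIZERS)
print(command)  # simple_function --foo 1 --day 2021-01-02
restored = from_cli(command, {"foo": int, "day": date}, SERIALIZERS)
print(restored == original)  # True
```

Types without an entry in the table fall back to str() on the way out and to the annotated type's constructor on the way back, which is enough for simple values such as int.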
Import
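- The task generator module can be imported from jobmon.core; the decorator is then referenced as task_generator.task_generator:
from jobmon.core import task_generator
- or the decorator can be imported directly and used as task_generator:
from jobmon.core.task_generator import task_generator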
Parameters
| Parameter | Type | Description |
|---|---|---|
| serializers | Dict | A dict of {type: callable} to serialize the input parameters. |
| naming_args | Optional[List[str]] | The args used to name the task. Does not apply to array tasks. |
| max_attempts | Optional[int] | The maximum number of attempts to run the task. |
| module_source_path | Optional[str] | The path to the module source code if the module is not in the current conda environment. |
Examples
Method with simple input parameter type:
from typing import List

# full_script_path: path to this module's source file (see module_source_path above)
@task_generator(
    serializers={},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")
The above code allows the user to use the “simple_function” method as a task generator to generate a single task or a task array.
To create a single task workflow:
tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
task = simple_function.create_task(wf, foo=1, bar=["a", "b"], compute_resources=compute_resources)
wf.add_task(task)
wf.run()
The above code creates a workflow with a single task named “simple_function:foo=1” and runs it.
To create a task array workflow:
tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
tasks = simple_function.create_tasks(compute_resources=compute_resources, foo=[1, 2], bar=[["a", "b"]])
wf.add_tasks(tasks)
wf.run()
The above code creates a workflow with a task array of two tasks, with inputs (foo=1, bar=[“a”, “b”]) and (foo=2, bar=[“a”, “b”]), and runs it.
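One way to read that result is that every argument is given as a list of candidate values and one task is created per combination. The sketch below makes this concrete with itertools.product; treating the expansion as a cross product is an assumption that happens to match the two tasks described here, and expand_task_kwargs is a hypothetical helper rather than a Jobmon function.

```python
from itertools import product


def expand_task_kwargs(**arg_lists):
    """Return one kwargs dict per combination of the listed argument values."""
    names = list(arg_lists)
    return [dict(zip(names, combo)) for combo in product(*arg_lists.values())]


for kwargs in expand_task_kwargs(foo=[1, 2], bar=[["a", "b"]]):
    print(kwargs)
# {'foo': 1, 'bar': ['a', 'b']}
# {'foo': 2, 'bar': ['a', 'b']}
```

Note that bar is wrapped in an outer list: the single candidate value for bar is itself the list ["a", "b"].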
Method with complex input parameter type:
class TestYear:
    """A fake YearRange class for testing"""

    def __init__(self, year: int) -> None:
        self.year = year

    @staticmethod
    def parse_year(year: str):
        """Parse a year range."""
        return TestYear(int(year))

    def __str__(self) -> str:
        return str(self.year)

    def __eq__(self, other):
        return self.year == other.year

test_year_serializer = {TestYear: (str, TestYear.parse_year)}
@task_generator.task_generator(
serializers=test_year_serializer,
tool_name="test_tool",
module_source_path=full_script_path,
max_attempts=1,
naming_args=["year"],
)
def simple_function_with_serializer(year: TestYear) -> None:
    """Simple task_function."""
    print(f"year: {year}")
The above code creates a testing class, TestYear, a serializer pair that converts a TestYear to a string and back, and a task generator, simple_function_with_serializer.
To create a single task workflow:
tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
task = simple_function_with_serializer.create_task(wf,
year=TestYear(2021),
compute_resources=compute_resources)
wf.add_task(task)
wf.run()
The above code creates a workflow with a single task named “simple_function_with_serializer:year=2021” and runs it.
To create a workflow with function input containing special characters like a single quote:
import html

def special_char_encoding(input: str) -> str:
    """Encode special characters."""
    return html.escape(input)

def special_char_decoding(input: str) -> str:
    """Decode special characters."""
    return html.unescape(input)

@task_generator.task_generator(
    serializers={str: (special_char_encoding, special_char_decoding)},
    tool_name="test_tool",
    module_source_path=full_script_path,
    max_attempts=1,
    naming_args=["foo"],
)
def special_chars_function(foo: str) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
tool = Tool("test_tool")
tool.set_default_compute_resources_from_dict(
cluster_name="slurm", compute_resources={"queue": "all.q"}
)
wf = tool.create_workflow()
compute_resources = {"queue": "all.q", "project": "proj_scicomp"}
task = special_chars_function.create_task(compute_resources=compute_resources, foo="'aaa'")
wf.add_task(task)
wf.run()
The above code creates a workflow with a single task whose input contains special characters. Please note that this makes the Jobmon command harder to read and understand; thus, SciComp is not responsible for debugging it.
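To see why the encoded command is harder to read, here is what the standard library's html.escape and html.unescape do to the example input. This only demonstrates the two standard functions; how Jobmon embeds the encoded value in the final command is not shown.

```python
import html

raw = "'aaa'"
encoded = html.escape(raw)
print(encoded)                 # &#x27;aaa&#x27;
print(html.unescape(encoded))  # 'aaa'
```

The single quotes become the entity &#x27;, which is safe to pass on a command line but much less readable in logs.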
- You can pass your own function to name your task. The function should take two arguments and return a string:
prefix: str - this will be your function name
kwargs_for_name: Dict[str, Any] - the arguments of the task
def custom_naming(prefix: str, kwargs_for_name: Dict[str, Any]) -> str:
    return f"Lala_{kwargs_for_name['foo']}"
@task_generator(
serializers={},
tool_name="test_tool",
module_source_path=full_script_path,
max_attempts=1,
naming_args=["foo"],
custom_naming=custom_naming,
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")
The above code creates a task generator with a custom naming function. The task will be named “Lala_1” instead of “simple_function:foo=1”.
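The difference between the two names can be sketched as follows. The default_naming function imitates the "simple_function:foo=1" form quoted in this section; joining several naming args with commas is an assumption, and name_task is a hypothetical helper showing how naming_args might select the values handed to the naming function.

```python
from typing import Any, Callable, Dict, List


def default_naming(prefix: str, kwargs_for_name: Dict[str, Any]) -> str:
    """Imitate the default name, e.g. 'simple_function:foo=1'."""
    # Assumption: multiple naming args are joined with commas.
    args = ",".join(f"{key}={value}" for key, value in kwargs_for_name.items())
    return f"{prefix}:{args}"


def custom_naming(prefix: str, kwargs_for_name: Dict[str, Any]) -> str:
    return f"Lala_{kwargs_for_name['foo']}"


def name_task(
    func_name: str,
    task_kwargs: Dict[str, Any],
    naming_args: List[str],
    naming: Callable[[str, Dict[str, Any]], str] = default_naming,
) -> str:
    """Pick out the naming args and pass them to the naming function."""
    kwargs_for_name = {key: task_kwargs[key] for key in naming_args}
    return naming(func_name, kwargs_for_name)


kwargs = {"foo": 1, "bar": ["a", "b"]}
print(name_task("simple_function", kwargs, ["foo"]))                 # simple_function:foo=1
print(name_task("simple_function", kwargs, ["foo"], custom_naming))  # Lala_1
```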
@task_generator(
default_cluster_name="slurm",
default_compute_resources={"queue": "all.q", "project": "proj_scicomp"},
serializers={},
tool_name="test_tool",
module_source_path=full_script_path,
max_attempts=1,
naming_args=["foo"],
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")
The above code creates a task generator with default cluster name and compute resources, so you do not need to specify them when creating a task.
@task_generator(
yaml_file="/tmp/task_generator.yaml",
serializers={},
tool_name="test_tool",
module_source_path=full_script_path,
max_attempts=1,
naming_args=["foo"],
)
def simple_function(foo: int, bar: List[str] = []) -> None:
    """Simple task_function."""
    print(f"foo: {foo}")
    print(f"bar: {bar}")
The above code creates a task generator with a yaml file that contains the default cluster name and compute resources.
Getting Additional Help
Inside IHME
The Scientific Computing team is always available to answer your questions or to consult on Jobmon.
- To contact the team via Slack:
#jobmon-users to ask questions or raise concerns about Jobmon.
- To set up a consultation:
Create a Help Desk ticket asking for a consultation: SciComp Help Desk.
A Scientific Computing team member will reach out to you to schedule a consultation meeting.
- To raise a Scientific Computing help desk request:
When requesting help try to provide the team with as much information as you have about your problem. Please include your Workflow id, the Jobmon version that you’re using, and any TaskInstance error logs that you have.
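One way to collect the version information for such a request is to query the installed package metadata. The sketch below uses the standard importlib.metadata module; the distribution names are taken from the pip commands in the Installation section and are an assumption about what is installed in your environment, and installed_versions is a hypothetical helper.

```python
from importlib import metadata


def installed_versions(distributions=("jobmon", "jobmon_installer_ihme")):
    """Return {distribution name: version string, or None if not installed}."""
    versions = {}
    for name in distributions:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


print(installed_versions())
```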
Important announcements are posted in the Slack channel #jobmon-announce.
Outside IHME
Please use GitHub to communicate your issue.