R Client Guide
The Jobmon R client (jobmonr) allows you to create and run Jobmon workflows
entirely from R or RStudio. It wraps the Python client using reticulate,
providing native R functions for all core Jobmon operations.
Note
For general Jobmon concepts (Tools, Workflows, Tasks, etc.), see Core Concepts. This guide focuses on R-specific usage patterns.
Installation
The R client is distributed separately from the Python client.
# Install from source (contact your administrator for the package)
install.packages("jobmonr", repos = NULL, type = "source")
Requirements
R 4.0.0 or higher
The reticulate package
A Python environment with jobmon_client installed
Python Environment
By default, jobmonr uses a centrally managed Python environment. To use
your own:
# Set BEFORE loading jobmonr
Sys.setenv(RETICULATE_PYTHON = "/path/to/your/python")
library(jobmonr)
Warning
reticulate can only use one Python environment per R session. If you use
other reticulate-based packages, you may need to create a unified
environment with all required packages.
Quick Start
Here’s a minimal example to create and run a workflow:
library(jobmonr)
# 1. Create a Tool
my_tool <- jobmonr::tool(name = "my_r_tool")
# 2. Set default resources
my_tool <- jobmonr::set_default_tool_resources(
my_tool,
default_cluster_name = "slurm",
resources = list(
memory = "2G",
runtime = "30m",
cores = 1L,
queue = "all.q"
)
)
# 3. Create a Workflow
wf <- jobmonr::workflow(
tool = my_tool,
workflow_args = paste0("my_workflow_", Sys.Date())
)
# 4. Create a Task Template
tt <- jobmonr::task_template(
tool = my_tool,
template_name = "process_data",
command_template = "{rshell} {script} --id {data_id}",
node_args = list("data_id"),
op_args = list("rshell", "script")
)
# 5. Create Tasks
tasks <- list()
for (id in 1:10) {
tasks[[id]] <- jobmonr::task(
task_template = tt,
name = paste0("process_", id),
data_id = id,
rshell = "Rscript",
script = "process.R"
)
}
# 6. Add tasks and run
wf <- jobmonr::add_tasks(wf, tasks)
status <- jobmonr::run(wf, resume = FALSE, seconds_until_timeout = 3600)
if (status != "D") {
stop("Workflow failed!")
}
API Reference
Tool Functions
tool()
Create a Jobmon Tool to associate with your workflows.
tool(name = "unknown", active_tool_version_id = "latest")
Arguments:
name (character): Name for your tool (e.g., “codem”, “codcorrect”)
active_tool_version_id: Tool version to use. Default “latest”
Returns: A Tool object reference
Example:
my_tool <- jobmonr::tool(name = "my_analysis")
create_new_tool_version()
Create a new version of an existing tool.
create_new_tool_version(tool)
Arguments:
tool: A Tool object
Returns: The new tool_version_id
Example:
new_version <- jobmonr::create_new_tool_version(my_tool)
set_default_tool_resources()
Set default compute resources for all workflows/tasks created from this tool.
set_default_tool_resources(
tool,
default_cluster_name,
resources = NULL,
path_to_yaml = NULL
)
Arguments:
tool: A Tool object
default_cluster_name (character): Cluster name (e.g., “slurm”)
resources (named list): Resource key-value pairs
path_to_yaml (character): Path to YAML file with resources
Note: Provide either resources OR path_to_yaml, not both.
Example:
# From a list
my_tool <- jobmonr::set_default_tool_resources(
my_tool,
default_cluster_name = "slurm",
resources = list(
memory = "10G",
runtime = "2h",
cores = 2L,
queue = "all.q"
)
)
# From YAML
my_tool <- jobmonr::set_default_tool_resources(
my_tool,
default_cluster_name = "slurm",
path_to_yaml = "/path/to/resources.yaml"
)
Workflow Functions
workflow()
Create a workflow from a tool.
workflow(tool, workflow_args = "", workflow_attributes = list(), ...)
Arguments:
tool: A Tool object
workflow_args (character): Unique identifier for this workflow
workflow_attributes (list): Custom attributes to track
...: Additional arguments (e.g., max_concurrently_running)
Returns: A Workflow object reference
Example:
wf <- jobmonr::workflow(
tool = my_tool,
workflow_args = "version_2024_q4",
workflow_attributes = list(
release_id = 123,
description = "Q4 2024 analysis"
),
max_concurrently_running = 500
)
add_tasks()
Add tasks to a workflow.
add_tasks(workflow, tasks)
Arguments:
workflow: A Workflow object
tasks (list): List of Task objects
Returns: The updated Workflow object
Example:
wf <- jobmonr::add_tasks(wf, my_tasks)
add_arrays()
Add arrays (and their tasks) to a workflow.
add_arrays(workflow, arrays)
run()
Execute a workflow.
run(workflow, resume, seconds_until_timeout, ...)
Arguments:
workflow: A Workflow object
resume (logical): Whether to resume an existing workflow
seconds_until_timeout (numeric): Maximum wait time in seconds
...: Additional arguments
Returns: Workflow run status (“D” for done, “E” for error, etc.)
Example:
status <- jobmonr::run(
wf,
resume = FALSE,
seconds_until_timeout = 36000 # 10 hours
)
if (status != "D") {
stop(paste("Workflow failed with status:", status))
}
set_default_workflow_resources()
Set default resources for all tasks in a workflow.
set_default_workflow_resources(
workflow,
default_cluster_name,
resources = NULL,
path_to_yaml = NULL
)
Task Template Functions
task_template()
Create a task template for generating similar tasks.
task_template(
tool,
template_name,
command_template,
node_args = list(),
task_args = list(),
op_args = list()
)
Arguments:
tool: A Tool object
template_name (character): Name for this template
command_template (character): Python-style format string with placeholders
node_args (list): Arguments for parallelization (e.g., location_id)
task_args (list): Arguments for data flow (e.g., version_id)
op_args (list): Operational arguments (e.g., script paths)
Returns: A TaskTemplate object reference
Example:
model_template <- jobmonr::task_template(
tool = my_tool,
template_name = "model_by_location",
command_template = "{rshell} {script} --loc {location_id} --year {year_id}",
node_args = list("location_id"),
task_args = list("year_id"),
op_args = list("rshell", "script")
)
set_default_template_resources()
Set default resources for tasks created from this template.
set_default_template_resources(
task_template,
default_cluster_name,
resources = NULL,
path_to_yaml = NULL
)
Example:
model_template <- jobmonr::set_default_template_resources(
model_template,
default_cluster_name = "slurm",
resources = list(
memory = "20G",
runtime = "4h",
cores = 4L
)
)
Task Functions
task()
Create a single task from a template.
task(
task_template,
name,
compute_resources = NULL,
upstream_tasks = list(),
task_attributes = list(),
max_attempts = 3,
...
)
Arguments:
task_template: A TaskTemplate object
name (character): Unique name for this task
compute_resources (named list): Override template resources
upstream_tasks (list): Tasks this task depends on
task_attributes (list): Custom attributes
max_attempts (integer): Maximum number of attempts (default 3)
...: Values for node_args, task_args, and op_args
Returns: A Task object reference
Example:
my_task <- jobmonr::task(
task_template = model_template,
name = "model_loc_1_year_2024",
upstream_tasks = list(prep_task),
max_attempts = 5,
location_id = 1,
year_id = 2024,
rshell = "Rscript",
script = "model.R"
)
array_tasks()
Create multiple tasks at once by providing lists of node_arg values.
array_tasks(
task_template,
upstream_tasks = list(),
max_attempts = 3,
compute_resources = NULL,
cluster_name = "",
...
)
Arguments:
task_template: A TaskTemplate object
upstream_tasks (list): Tasks all array tasks depend on
max_attempts (integer): Maximum attempts per task
compute_resources (named list): Override resources
cluster_name (character): Override cluster
...: node/task/op args (node_args as lists for parallelization)
Returns: A list of Task objects
Example:
# Create tasks for 3 locations × 2 sexes = 6 tasks
tasks <- jobmonr::array_tasks(
task_template = model_template,
upstream_tasks = list(prep_task),
location_id = list(1L, 2L, 3L), # node_arg as list
sex_id = list(1L, 2L), # node_arg as list
year_id = 2024, # task_arg as single value
rshell = "Rscript",
script = "model.R"
)
get_tasks_by_node_args()
Retrieve specific tasks from an array by their node argument values.
get_tasks_by_node_args(workflow, task_template_name, ...)
Example:
# Get all tasks for location 1
loc1_tasks <- jobmonr::get_tasks_by_node_args(wf, "model_template", location_id = 1)
# Get the specific task for location 1, sex 1
specific_task <- jobmonr::get_tasks_by_node_args(
wf, "model_template",
location_id = 1,
sex_id = 1
)
Utility Functions
jobmon_help()
Display the Python docstring for a Jobmon object.
jobmon_help(object)
Arguments:
object (character): One of “tool”, “task_template”, “task”, “workflow”
Example:
jobmonr::jobmon_help("workflow")
YAML Configuration
You can define compute resources in YAML files for easier management.
Tool Resources YAML
tool_resources:
slurm:
cores: 1
memory: "2G"
runtime: "1h"
queue: "all.q"
Task Template Resources YAML
task_template_resources:
my_template_name:
slurm:
cores: 4
memory: "20G"
runtime: "6h"
queue: "long.q"
Complete Example
Here’s a full example of a fork-join workflow pattern:
library(jobmonr)
# ============================================================
# Configuration
# ============================================================
VERSION_ID <- 42
LOCATIONS <- c(1, 2, 3, 4, 5)
DRAWS <- 1:100
# ============================================================
# Setup Tool
# ============================================================
my_tool <- jobmonr::tool(name = "fork_join_example")
my_tool <- jobmonr::set_default_tool_resources(
my_tool,
default_cluster_name = "slurm",
resources = list(
memory = "5G",
runtime = "1h",
cores = 1L,
queue = "all.q",
project = "proj_my_project"
)
)
# ============================================================
# Create Workflow
# ============================================================
wf <- jobmonr::workflow(
tool = my_tool,
workflow_args = paste0("v", VERSION_ID, "_", Sys.Date()),
workflow_attributes = list(version_id = VERSION_ID)
)
# ============================================================
# Define Task Templates
# ============================================================
# Parallel modeling tasks
model_tt <- jobmonr::task_template(
tool = my_tool,
template_name = "model",
command_template = "Rscript model.R --loc {location_id} --draw {draw_id} --version {version_id}",
node_args = list("location_id", "draw_id"),
task_args = list("version_id"),
op_args = list()
)
# Override resources for compute-intensive modeling
model_tt <- jobmonr::set_default_template_resources(
model_tt,
default_cluster_name = "slurm",
resources = list(memory = "10G", runtime = "2h")
)
# Summary task
summary_tt <- jobmonr::task_template(
tool = my_tool,
template_name = "summarize",
command_template = "Rscript summarize.R --version {version_id}",
node_args = list(),
task_args = list("version_id"),
op_args = list()
)
summary_tt <- jobmonr::set_default_template_resources(
summary_tt,
default_cluster_name = "slurm",
resources = list(memory = "50G", runtime = "4h", cores = 10L)
)
# ============================================================
# Create Tasks
# ============================================================
# Create parallel modeling tasks using array_tasks
model_tasks <- jobmonr::array_tasks(
task_template = model_tt,
location_id = as.list(LOCATIONS),
draw_id = as.list(DRAWS),
version_id = VERSION_ID
)
# Create summary task that waits for all modeling tasks
summary_task <- jobmonr::task(
task_template = summary_tt,
name = "summarize_all",
upstream_tasks = model_tasks,
version_id = VERSION_ID
)
# ============================================================
# Run Workflow
# ============================================================
wf <- jobmonr::add_tasks(wf, model_tasks)
wf <- jobmonr::add_tasks(wf, list(summary_task))
cat("Starting workflow with", length(model_tasks) + 1, "tasks\n")
status <- jobmonr::run(
wf,
resume = FALSE,
seconds_until_timeout = 36000 # 10 hours
)
if (status == "D") {
cat("Workflow completed successfully!\n")
} else {
stop(paste("Workflow failed with status:", status))
}
Common Patterns
Resuming a Failed Workflow
# Use the same workflow_args to resume
wf <- jobmonr::workflow(
tool = my_tool,
workflow_args = "my_workflow_v1" # Same as original
)
# ... add same tasks ...
status <- jobmonr::run(wf, resume = TRUE, seconds_until_timeout = 36000)
Setting Per-Task Resources
# Override template resources for specific tasks
big_task <- jobmonr::task(
task_template = model_tt,
name = "big_location",
compute_resources = list(
memory = "100G",
runtime = "24h"
),
location_id = 999,
version_id = 1
)
Troubleshooting
Python Environment Issues
If you see errors about missing Python modules:
# Check which Python is being used
reticulate::py_config()
# Verify jobmon is installed
reticulate::py_module_available("jobmon.client")
Version Mismatch
If you see version mismatch errors, ensure jobmonr and jobmon_client
versions are compatible (major.minor versions should match).
Connection Errors
Ensure you’re on the correct network and the Jobmon server is accessible.
See Also
Core Concepts - Understanding Jobmon concepts
Workflows - Workflow patterns
Compute Resources - Resource management
Troubleshooting - Debugging issues