R Client Guide

The Jobmon R client (jobmonr) allows you to create and run Jobmon workflows entirely from R or RStudio. It wraps the Python client using reticulate, providing native R functions for all core Jobmon operations.

Note

For general Jobmon concepts (Tools, Workflows, Tasks, etc.), see Core Concepts. This guide focuses on R-specific usage patterns.

Installation

The R client is distributed separately from the Python client.

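# Install from a local source copy (contact your administrator for the package).
# With repos = NULL, the first argument is the path to the source directory or tarball.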
install.packages("jobmonr", repos = NULL, type = "source")

Requirements

  • R 4.0.0 or higher

  • The reticulate package

  • A Python environment with jobmon_client installed

Python Environment

By default, jobmonr uses a centrally managed Python environment. To use your own:

# Set BEFORE loading jobmonr
Sys.setenv(RETICULATE_PYTHON = "/path/to/your/python")
library(jobmonr)

Warning

reticulate can only use one Python environment per R session. If you use other reticulate-based packages, you may need to create a unified environment with all required packages.
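Because the variable has to be set before jobmonr is loaded, it can help to wrap that ordering rule in a small guard. The sketch below is an illustration only: `set_jobmonr_python()` is a hypothetical helper, not part of jobmonr, and it assumes that checking `loadedNamespaces()` is a good enough test for whether the package has already been loaded in this session.

```r
# Hypothetical helper (not part of jobmonr): point reticulate at a specific
# Python interpreter, refusing to do so once jobmonr is already loaded.
set_jobmonr_python <- function(python_path) {
  if ("jobmonr" %in% loadedNamespaces()) {
    stop("jobmonr is already loaded; restart R and set RETICULATE_PYTHON first.")
  }
  if (!file.exists(python_path)) {
    stop("Python interpreter not found: ", python_path)
  }
  Sys.setenv(RETICULATE_PYTHON = python_path)
  invisible(python_path)
}

# Usage (the path is a placeholder):
# set_jobmonr_python("/path/to/your/python")
# library(jobmonr)
```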

Quick Start

Here’s a minimal example to create and run a workflow:

library(jobmonr)

# 1. Create a Tool
my_tool <- jobmonr::tool(name = "my_r_tool")

# 2. Set default resources
my_tool <- jobmonr::set_default_tool_resources(
  my_tool,
  default_cluster_name = "slurm",
  resources = list(
    memory = "2G",
    runtime = "30m",
    cores = 1L,
    queue = "all.q"
  )
)

# 3. Create a Workflow
wf <- jobmonr::workflow(
  tool = my_tool,
  workflow_args = paste0("my_workflow_", Sys.Date())
)

# 4. Create a Task Template
tt <- jobmonr::task_template(
  tool = my_tool,
  template_name = "process_data",
  command_template = "{rshell} {script} --id {data_id}",
  node_args = list("data_id"),
  op_args = list("rshell", "script")
)

# 5. Create Tasks
tasks <- list()
for (id in 1:10) {
  tasks[[id]] <- jobmonr::task(
    task_template = tt,
    name = paste0("process_", id),
    data_id = id,
    rshell = "Rscript",
    script = "process.R"
  )
}

# 6. Add tasks and run
wf <- jobmonr::add_tasks(wf, tasks)
status <- jobmonr::run(wf, resume = FALSE, seconds_until_timeout = 3600)

if (status != "D") {
  stop(paste("Workflow failed with status:", status))
}

API Reference

Tool Functions

tool()

Create a Jobmon Tool to associate with your workflows.

tool(name = "unknown", active_tool_version_id = "latest")

Arguments:

  • name (character): Name for your tool (e.g., “codem”, “codcorrect”)

  • active_tool_version_id: Tool version to use. Default “latest”

Returns: A Tool object reference

Example:

my_tool <- jobmonr::tool(name = "my_analysis")

create_new_tool_version()

Create a new version of an existing tool.

create_new_tool_version(tool)

Arguments:

  • tool: A Tool object

Returns: The new tool_version_id

Example:

new_version <- jobmonr::create_new_tool_version(my_tool)

set_default_tool_resources()

Set default compute resources for all workflows/tasks created from this tool.

set_default_tool_resources(
  tool,
  default_cluster_name,
  resources = NULL,
  path_to_yaml = NULL
)

Arguments:

  • tool: A Tool object

  • default_cluster_name (character): Cluster name (e.g., “slurm”)

  • resources (named list): Resource key-value pairs

  • path_to_yaml (character): Path to YAML file with resources

Note: Provide either resources OR path_to_yaml, not both.

Example:

# From a list
my_tool <- jobmonr::set_default_tool_resources(
  my_tool,
  default_cluster_name = "slurm",
  resources = list(
    memory = "10G",
    runtime = "2h",
    cores = 2L,
    queue = "all.q"
  )
)

# From YAML
my_tool <- jobmonr::set_default_tool_resources(
  my_tool,
  default_cluster_name = "slurm",
  path_to_yaml = "/path/to/resources.yaml"
)
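Since resources and path_to_yaml are mutually exclusive, a pre-flight check in plain R can catch mistakes before any call reaches Jobmon. This is a sketch under assumptions: `check_resource_source()` is a hypothetical helper, and the rules it enforces (exactly one source, a fully named list, an existing file) are a reading of the note above rather than jobmonr's own validation.

```r
# Hypothetical pre-flight check (not part of jobmonr): require exactly one
# source of resources before calling set_default_tool_resources().
check_resource_source <- function(resources = NULL, path_to_yaml = NULL) {
  has_list <- !is.null(resources)
  has_yaml <- !is.null(path_to_yaml)
  if (has_list == has_yaml) {
    stop("Provide exactly one of `resources` or `path_to_yaml`.")
  }
  if (has_list && (is.null(names(resources)) || any(names(resources) == ""))) {
    stop("`resources` must be a fully named list.")
  }
  if (has_yaml && !file.exists(path_to_yaml)) {
    stop("YAML file not found: ", path_to_yaml)
  }
  invisible(TRUE)
}

# Passes silently: one source, every element named.
check_resource_source(resources = list(memory = "10G", runtime = "2h", cores = 2L))
```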

Workflow Functions

workflow()

Create a workflow from a tool.

workflow(tool, workflow_args = "", workflow_attributes = list(), ...)

Arguments:

  • tool: A Tool object

  • workflow_args (character): Unique identifier for this workflow

  • workflow_attributes (list): Custom attributes to track

  • ...: Additional arguments (e.g., max_concurrently_running)

Returns: A Workflow object reference

Example:

wf <- jobmonr::workflow(
  tool = my_tool,
  workflow_args = "version_2024_q4",
  workflow_attributes = list(
    release_id = 123,
    description = "Q4 2024 analysis"
  ),
  max_concurrently_running = 500
)
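Because workflow_args identifies the workflow, and the resume pattern later in this guide reuses the same value, it can be convenient to build the string from inputs that do not change between attempts. The helper below is a hypothetical sketch in base R (the name `make_workflow_args()` and the naming scheme are assumptions, not a jobmonr convention). Note that a value containing Sys.Date() changes from one day to the next, so the date is optional here.

```r
# Hypothetical helper (not part of jobmonr): build a reproducible
# workflow_args string from a prefix, a version, and an optional date.
make_workflow_args <- function(prefix, version_id, run_date = NULL) {
  parts <- c(prefix, paste0("v", version_id))
  if (!is.null(run_date)) {
    parts <- c(parts, format(as.Date(run_date), "%Y-%m-%d"))
  }
  paste(parts, collapse = "_")
}

make_workflow_args("my_analysis", 42L)
# "my_analysis_v42"
make_workflow_args("my_analysis", 42L, run_date = "2024-12-01")
# "my_analysis_v42_2024-12-01"
```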

add_tasks()

Add tasks to a workflow.

add_tasks(workflow, tasks)

Arguments:

  • workflow: A Workflow object

  • tasks (list): List of Task objects

Returns: The updated Workflow object

Example:

wf <- jobmonr::add_tasks(wf, my_tasks)

add_arrays()

Add arrays (and their tasks) to a workflow.

add_arrays(workflow, arrays)

run()

Execute a workflow.

run(workflow, resume, seconds_until_timeout, ...)

Arguments:

  • workflow: A Workflow object

  • resume (logical): Whether to resume an existing workflow

  • seconds_until_timeout (numeric): Maximum wait time in seconds

  • ...: Additional arguments

Returns: Workflow run status (“D” for done, “E” for error, etc.)

Example:

status <- jobmonr::run(
  wf,
  resume = FALSE,
  seconds_until_timeout = 36000  # 10 hours
)

if (status != "D") {
  stop(paste("Workflow failed with status:", status))
}
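The status check can be factored into a reusable function so that every script fails the same way. This is a minimal sketch that assumes run() hands back a single character string, as the examples in this guide suggest; `assert_workflow_done()` is a hypothetical name. Only "D" and "E" are described here, so any other code is reported verbatim rather than interpreted.

```r
# Hypothetical helper (not part of jobmonr): stop unless the status is "D".
assert_workflow_done <- function(status) {
  if (!is.character(status) || length(status) != 1L) {
    stop("Expected a single status string, got an object of class: ", class(status)[1])
  }
  if (status != "D") {
    stop("Workflow did not finish successfully (status: ", status, ")")
  }
  invisible(status)
}

# Usage with a real run (not executed here):
# assert_workflow_done(jobmonr::run(wf, resume = FALSE, seconds_until_timeout = 36000))
assert_workflow_done("D")
```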

set_default_workflow_resources()

Set default resources for all tasks in a workflow.

set_default_workflow_resources(
  workflow,
  default_cluster_name,
  resources = NULL,
  path_to_yaml = NULL
)

Task Template Functions

task_template()

Create a task template for generating similar tasks.

task_template(
  tool,
  template_name,
  command_template,
  node_args = list(),
  task_args = list(),
  op_args = list()
)

Arguments:

  • tool: A Tool object

  • template_name (character): Name for this template

  • command_template (character): Python-style format string with placeholders

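  • node_args (list): Names of placeholders that distinguish one task from another within a workflow, typically the dimensions you parallelize over (e.g., location_id)

  • task_args (list): Names of placeholders shared by the tasks in a workflow but changing from one workflow to the next (e.g., version_id)

  • op_args (list): Names of operational placeholders that do not change what the task computes (e.g., script paths)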

Returns: A TaskTemplate object reference

Example:

model_template <- jobmonr::task_template(
  tool = my_tool,
  template_name = "model_by_location",
  command_template = "{rshell} {script} --loc {location_id} --year {year_id}",
  node_args = list("location_id"),
  task_args = list("year_id"),
  op_args = list("rshell", "script")
)
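Every placeholder in command_template is expected to appear in one of node_args, task_args, or op_args. The sketch below compares the two in plain R before the template is registered. It is an illustration only: both function names are hypothetical, and whether jobmonr performs a similar check itself is not covered by this guide.

```r
# Hypothetical helpers (not part of jobmonr).

# Extract the unique {placeholder} names from a command template.
template_placeholders <- function(command_template) {
  matches <- regmatches(
    command_template,
    gregexpr("\\{[^{}]+\\}", command_template, perl = TRUE)
  )[[1]]
  unique(gsub("[{}]", "", matches))
}

# Report placeholders that are not declared, and declared names that are unused.
check_template_args <- function(command_template, node_args = list(),
                                task_args = list(), op_args = list()) {
  declared <- unlist(c(node_args, task_args, op_args))
  found <- template_placeholders(command_template)
  list(
    undeclared = setdiff(found, declared),  # in the template, not declared
    unused = setdiff(declared, found)       # declared, not in the template
  )
}

# Both elements come back empty for the template defined above.
check_template_args(
  "{rshell} {script} --loc {location_id} --year {year_id}",
  node_args = list("location_id"),
  task_args = list("year_id"),
  op_args = list("rshell", "script")
)
```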

set_default_template_resources()

Set default resources for tasks created from this template.

set_default_template_resources(
  task_template,
  default_cluster_name,
  resources = NULL,
  path_to_yaml = NULL
)

Example:

model_template <- jobmonr::set_default_template_resources(
  model_template,
  default_cluster_name = "slurm",
  resources = list(
    memory = "20G",
    runtime = "4h",
    cores = 4L
  )
)

Task Functions

task()

Create a single task from a template.

task(
  task_template,
  name,
  compute_resources = NULL,
  upstream_tasks = list(),
  task_attributes = list(),
  max_attempts = 3,
  ...
)

Arguments:

  • task_template: A TaskTemplate object

  • name (character): Unique name for this task

  • compute_resources (named list): Override template resources

  • upstream_tasks (list): Tasks this task depends on

  • task_attributes (list): Custom attributes

  • max_attempts (integer): Maximum number of times the task is attempted (default 3)

  • ...: Values for node_args, task_args, and op_args

Returns: A Task object reference

Example:

my_task <- jobmonr::task(
  task_template = model_template,
  name = "model_loc_1_year_2024",
  upstream_tasks = list(prep_task),
  max_attempts = 5,
  location_id = 1,
  year_id = 2024,
  rshell = "Rscript",
  script = "model.R"
)
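To see how the values passed through ... relate to the command template, the substitution can be imitated in plain R. This is only a preview under assumptions: `preview_command()` is a hypothetical helper, and the real formatting happens on the Python side, so the actual command may differ (for example, Python distinguishes floats from ints, while as.character() prints 1 and 1L the same way).

```r
# Hypothetical helper (not part of jobmonr): fill {name} placeholders with
# the supplied values and fail if any placeholder is left over.
preview_command <- function(command_template, ...) {
  values <- list(...)
  command <- command_template
  for (arg_name in names(values)) {
    command <- gsub(
      paste0("{", arg_name, "}"),
      as.character(values[[arg_name]]),
      command,
      fixed = TRUE
    )
  }
  if (grepl("\\{[^{}]+\\}", command, perl = TRUE)) {
    stop("Unfilled placeholder left in command: ", command)
  }
  command
}

preview_command(
  "{rshell} {script} --loc {location_id} --year {year_id}",
  rshell = "Rscript",
  script = "model.R",
  location_id = 1L,
  year_id = 2024L
)
# "Rscript model.R --loc 1 --year 2024"
```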

array_tasks()

Create multiple tasks at once by providing lists of node_arg values. One task is created for each combination of the supplied values.

array_tasks(
  task_template,
  upstream_tasks = list(),
  max_attempts = 3,
  compute_resources = NULL,
  cluster_name = "",
  ...
)

Arguments:

  • task_template: A TaskTemplate object

  • upstream_tasks (list): Tasks all array tasks depend on

  • max_attempts (integer): Maximum number of attempts per task

  • compute_resources (named list): Override resources

  • cluster_name (character): Override cluster

  • ...: node/task/op args (node_args as lists for parallelization)

Returns: A list of Task objects

Example:

# Create tasks for 3 locations × 2 sexes = 6 tasks
# (assumes the template declares both location_id and sex_id as node_args)
tasks <- jobmonr::array_tasks(
  task_template = model_template,
  upstream_tasks = list(prep_task),
  location_id = list(1L, 2L, 3L),  # node_arg as list
  sex_id = list(1L, 2L),           # node_arg as list
  year_id = 2024,                  # task_arg as single value
  rshell = "Rscript",
  script = "model.R"
)
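The 3 × 2 = 6 count in the comment above is a Cartesian product of the node_arg lists. Assuming array_tasks() creates one task per combination, which is what that count implies, the set of tasks can be previewed in base R with expand.grid() before anything is submitted. The labels built here are purely illustrative and are not the names Jobmon assigns.

```r
# Preview the node_arg combinations an array would cover (base R only).
node_values <- list(
  location_id = c(1L, 2L, 3L),
  sex_id = c(1L, 2L)
)

# One row per combination of location_id and sex_id.
combinations <- expand.grid(node_values, KEEP.OUT.ATTRS = FALSE)

# Illustrative labels only; Jobmon's own task names may look different.
combinations$label <- paste0(
  "loc", combinations$location_id, "_sex", combinations$sex_id
)

nrow(combinations)  # 6
```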

get_tasks_by_node_args()

Retrieve specific tasks from an array by their node argument values.

get_tasks_by_node_args(workflow, task_template_name, ...)

Example:

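# The second argument is the template_name given to task_template(),
# not the name of the R variable holding the template.

# Get all tasks for location 1
loc1_tasks <- jobmonr::get_tasks_by_node_args(wf, "model_by_location", location_id = 1)

# Get the specific task for location 1, sex 1
# (assumes the template declares sex_id as a node_arg)
specific_task <- jobmonr::get_tasks_by_node_args(
  wf, "model_by_location",
  location_id = 1,
  sex_id = 1
)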

Utility Functions

jobmon_help()

Display the Python docstring for a Jobmon object.

jobmon_help(object)

Arguments:

  • object (character): One of “tool”, “task_template”, “task”, “workflow”

Example:

jobmonr::jobmon_help("workflow")

YAML Configuration

You can define compute resources in YAML files for easier management.

Tool Resources YAML

tool_resources:
  slurm:
    cores: 1
    memory: "2G"
    runtime: "1h"
    queue: "all.q"

Task Template Resources YAML

task_template_resources:
  my_template_name:
    slurm:
      cores: 4
      memory: "20G"
      runtime: "6h"
      queue: "long.q"
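A resources file like the ones above can also be generated from R, which keeps the values next to the script that uses them. The sketch below uses only base R and writes to a temporary location; the keys mirror the Tool Resources YAML shown above, and the file name is an arbitrary choice.

```r
# Write a tool resources YAML file with base R (no YAML package required).
yaml_lines <- c(
  "tool_resources:",
  "  slurm:",
  "    cores: 1",
  '    memory: "2G"',
  '    runtime: "1h"',
  '    queue: "all.q"'
)

yaml_path <- file.path(tempdir(), "tool_resources.yaml")
writeLines(yaml_lines, yaml_path)

# The path could then be passed on (not run here; needs jobmonr and a server):
# my_tool <- jobmonr::set_default_tool_resources(
#   my_tool,
#   default_cluster_name = "slurm",
#   path_to_yaml = yaml_path
# )
```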

Complete Example

Here’s a full example of a fork-join workflow pattern:

library(jobmonr)

# ============================================================
# Configuration
# ============================================================
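# Integer literals (L suffix): reticulate passes R integers to Python as int
# and plain R numbers as float.
VERSION_ID <- 42L
LOCATIONS <- c(1L, 2L, 3L, 4L, 5L)
DRAWS <- 1:100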

# ============================================================
# Setup Tool
# ============================================================
my_tool <- jobmonr::tool(name = "fork_join_example")
my_tool <- jobmonr::set_default_tool_resources(
  my_tool,
  default_cluster_name = "slurm",
  resources = list(
    memory = "5G",
    runtime = "1h",
    cores = 1L,
    queue = "all.q",
    project = "proj_my_project"
  )
)

# ============================================================
# Create Workflow
# ============================================================
wf <- jobmonr::workflow(
  tool = my_tool,
  workflow_args = paste0("v", VERSION_ID, "_", Sys.Date()),
  workflow_attributes = list(version_id = VERSION_ID)
)

# ============================================================
# Define Task Templates
# ============================================================

# Parallel modeling tasks
model_tt <- jobmonr::task_template(
  tool = my_tool,
  template_name = "model",
  command_template = "Rscript model.R --loc {location_id} --draw {draw_id} --version {version_id}",
  node_args = list("location_id", "draw_id"),
  task_args = list("version_id"),
  op_args = list()
)

# Override resources for compute-intensive modeling
model_tt <- jobmonr::set_default_template_resources(
  model_tt,
  default_cluster_name = "slurm",
  resources = list(memory = "10G", runtime = "2h")
)

# Summary task
summary_tt <- jobmonr::task_template(
  tool = my_tool,
  template_name = "summarize",
  command_template = "Rscript summarize.R --version {version_id}",
  node_args = list(),
  task_args = list("version_id"),
  op_args = list()
)

summary_tt <- jobmonr::set_default_template_resources(
  summary_tt,
  default_cluster_name = "slurm",
  resources = list(memory = "50G", runtime = "4h", cores = 10L)
)

# ============================================================
# Create Tasks
# ============================================================

# Create parallel modeling tasks using array_tasks
model_tasks <- jobmonr::array_tasks(
  task_template = model_tt,
  location_id = as.list(LOCATIONS),
  draw_id = as.list(DRAWS),
  version_id = VERSION_ID
)

# Create summary task that waits for all modeling tasks
summary_task <- jobmonr::task(
  task_template = summary_tt,
  name = "summarize_all",
  upstream_tasks = model_tasks,
  version_id = VERSION_ID
)

# ============================================================
# Run Workflow
# ============================================================

wf <- jobmonr::add_tasks(wf, model_tasks)
wf <- jobmonr::add_tasks(wf, list(summary_task))

cat("Starting workflow with", length(model_tasks) + 1, "tasks\n")

status <- jobmonr::run(
  wf,
  resume = FALSE,
  seconds_until_timeout = 36000  # 10 hours
)

if (status == "D") {
  cat("Workflow completed successfully!\n")
} else {
  stop(paste("Workflow failed with status:", status))
}

Common Patterns

Resuming a Failed Workflow

# Use the same workflow_args to resume
wf <- jobmonr::workflow(
  tool = my_tool,
  workflow_args = "my_workflow_v1"  # Same as original
)
# ... add same tasks ...

status <- jobmonr::run(wf, resume = TRUE, seconds_until_timeout = 36000)

Setting Per-Task Resources

# Override template resources for specific tasks
big_task <- jobmonr::task(
  task_template = model_tt,
  name = "big_location",
  compute_resources = list(
    memory = "100G",
    runtime = "24h"
  ),
  location_id = 999,
  draw_id = 1,  # model_tt's command template also has a {draw_id} placeholder
  version_id = 1
)
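When only a few tasks need more resources, the override can be chosen programmatically. The sketch below returns NULL for ordinary locations, which matches the documented default of compute_resources, and a larger allocation for locations on a list. Both `resources_for_location()` and the contents of `big_locations` are hypothetical.

```r
# Hypothetical helper (not part of jobmonr): pick a per-task override.
# NULL means "no override", the documented default for compute_resources.
resources_for_location <- function(location_id, big_locations) {
  if (location_id %in% big_locations) {
    list(memory = "100G", runtime = "24h")
  } else {
    NULL
  }
}

big_locations <- c(999L)  # illustrative list of expensive locations

resources_for_location(1L, big_locations)    # NULL
resources_for_location(999L, big_locations)  # list(memory = "100G", runtime = "24h")

# Usage inside task creation (not run here):
# jobmonr::task(task_template = model_tt, name = "...",
#               compute_resources = resources_for_location(loc, big_locations), ...)
```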

Troubleshooting

Python Environment Issues

If you see errors about missing Python modules:

# Check which Python is being used
reticulate::py_config()

# Verify jobmon is installed
reticulate::py_module_available("jobmon.client")

Version Mismatch

If you see version mismatch errors, ensure jobmonr and jobmon_client versions are compatible (major.minor versions should match).
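The major.minor comparison can be written with base R's package_version(). In this sketch the version strings are made up for illustration and `same_major_minor()` is a hypothetical helper. The installed jobmonr version is available through utils::packageVersion("jobmonr"); how to obtain the jobmon_client version from the Python side is not covered here.

```r
# Hypothetical helper: TRUE when two versions share major and minor numbers.
same_major_minor <- function(version_a, version_b) {
  a <- unclass(package_version(version_a))[[1]]
  b <- unclass(package_version(version_b))[[1]]
  identical(a[1:2], b[1:2])
}

same_major_minor("3.2.5", "3.2.0")  # TRUE
same_major_minor("3.2.5", "3.3.0")  # FALSE
```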

Connection Errors

Ensure you’re on the correct network and the Jobmon server is accessible.

See Also