client.swarm.state

SwarmState: Centralized state management for workflow runs.

This module provides a consolidated state container for all workflow run state, enabling clearer state mutation boundaries and easier testing.

Attributes

logger

ACTIVE_TASK_STATUSES

SERVER_STOP_STATUSES

TERMINATING_STATUSES

Classes

StateUpdate

Represents changes to apply to SwarmState.

SwarmState

Centralized state container for workflow runs.

Module Contents

client.swarm.state.logger
client.swarm.state.ACTIVE_TASK_STATUSES: tuple[str, ...]
client.swarm.state.SERVER_STOP_STATUSES: frozenset[str]
client.swarm.state.TERMINATING_STATUSES: tuple[str, ...]
class client.swarm.state.StateUpdate

Represents changes to apply to SwarmState.

StateUpdate is an immutable record of state changes that can be:
  • Created from server responses

  • Merged together to combine multiple updates

  • Applied atomically to SwarmState

This pattern provides clear boundaries for state mutations and makes it easy to track what changed.
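A minimal sketch of how an immutable StateUpdate could be modeled, assuming a frozen dataclass whose dict fields default to empty dicts. The class below is a stand-in written for illustration, not the jobmon source; in particular, treating a lone sync_time as a change in is_empty() is an assumption. Note that frozen=True only blocks attribute reassignment: the dicts themselves stay mutable, so immutability here also relies on callers not editing them in place.

```python
from __future__ import annotations

import dataclasses
import datetime
from dataclasses import dataclass, field


@dataclass(frozen=True)
class StateUpdate:
    """Illustrative stand-in mirroring the documented fields (not the jobmon source)."""

    task_statuses: dict[int, str] = field(default_factory=dict)
    max_concurrently_running: int | None = None
    array_limits: dict[int, int] = field(default_factory=dict)
    workflow_run_status: str | None = None
    sync_time: datetime.datetime | None = None

    @classmethod
    def empty(cls) -> StateUpdate:
        # Every field left at its "no change" default.
        return cls()

    def is_empty(self) -> bool:
        # Assumption: empty means no dict entries and no scalar set (sync_time included).
        return (
            not self.task_statuses
            and not self.array_limits
            and self.max_concurrently_running is None
            and self.workflow_run_status is None
            and self.sync_time is None
        )


update = StateUpdate(task_statuses={1: "D"})
print(update.is_empty())               # False
print(StateUpdate.empty().is_empty())  # True
try:
    update.workflow_run_status = "R"   # frozen: attribute reassignment is rejected
except dataclasses.FrozenInstanceError:
    print("immutable")
```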

task_statuses: dict[int, str]
max_concurrently_running: int | None = None
array_limits: dict[int, int]
workflow_run_status: str | None = None
sync_time: datetime.datetime | None = None
classmethod empty() → StateUpdate

Create an empty update (no changes).

classmethod from_task_status_response(tasks_by_status: dict[str, list[int]], sync_time: datetime.datetime | None = None) → StateUpdate

Create a StateUpdate from a server task status response.

Parameters:
  • tasks_by_status – Server response format {status: [task_ids]}

  • sync_time – Server sync timestamp

Returns:

StateUpdate with task_statuses populated.
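The server reports tasks grouped by status, while StateUpdate.task_statuses is keyed by task ID, so the conversion amounts to inverting the mapping. A sketch, using a hypothetical helper name invert_task_status_response and assumed status codes "D" and "R":

```python
from __future__ import annotations


def invert_task_status_response(tasks_by_status: dict[str, list[int]]) -> dict[int, str]:
    """Hypothetical helper: turn {status: [task_ids]} into {task_id: status}."""
    task_statuses: dict[int, str] = {}
    for status, task_ids in tasks_by_status.items():
        for task_id in task_ids:
            # Each task ID should appear under exactly one status; if it
            # appears twice, the status seen last wins.
            task_statuses[task_id] = status
    return task_statuses


response = {"D": [1, 2], "R": [3]}
print(invert_task_status_response(response))  # {1: 'D', 2: 'D', 3: 'R'}
```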

merge(other: StateUpdate) → StateUpdate

Combine two updates, with other taking precedence for conflicts.

Parameters:

other – The update to merge in (takes precedence).

Returns:

New StateUpdate with combined changes.
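One plausible reading of "other taking precedence", sketched on a cut-down stand-in class Update that carries only two of the five fields: dict entries from other overwrite this update's entries key by key, and a scalar from other wins only when it is not None. Whether the real merge() treats None this way is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(frozen=True)
class Update:
    """Cut-down stand-in for StateUpdate (two of its five fields)."""

    task_statuses: dict[int, str] = field(default_factory=dict)
    max_concurrently_running: int | None = None

    def merge(self, other: Update) -> Update:
        # Dict fields: union of keys, with other's values overwriting ours.
        # Scalar fields: other's value wins unless other left it unset (None).
        return Update(
            task_statuses={**self.task_statuses, **other.task_statuses},
            max_concurrently_running=(
                other.max_concurrently_running
                if other.max_concurrently_running is not None
                else self.max_concurrently_running
            ),
        )


older = Update(task_statuses={1: "R", 2: "R"}, max_concurrently_running=50)
newer = Update(task_statuses={2: "D"})
merged = older.merge(newer)
print(merged.task_statuses)             # {1: 'R', 2: 'D'}
print(merged.max_concurrently_running)  # 50
print(older.task_statuses)              # {1: 'R', 2: 'R'} (inputs untouched)
```

Building a new object rather than mutating either input keeps both source updates reusable, which fits the immutable-record design described above.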

is_empty() → bool

Check if this update contains any changes.

class client.swarm.state.SwarmState(workflow_id: int, workflow_run_id: int, dag_id: int, max_concurrently_running: int = 10000, status: str = 'B')

Centralized state container for workflow runs.

SwarmState consolidates all workflow run state in one place:
  • Task registry and status tracking

  • Array registry

  • Scheduling queue (ready_to_run)

  • Workflow run metadata

All state queries and mutations go through this class, providing clear boundaries and making the code easier to test and reason about.

Usage:

    state = SwarmState(workflow_id=1, workflow_run_id=10, dag_id=5)
    state.add_task(swarm_task)
    state.apply_update(StateUpdate(task_statuses={1: "D"}))

Initialize the state container.

Parameters:
  • workflow_id – The workflow ID.

  • workflow_run_id – The workflow run ID.

  • dag_id – The DAG ID.

  • max_concurrently_running – Initial workflow concurrency limit.

  • status – Initial workflow run status.

workflow_id
workflow_run_id
dag_id
tasks: dict[int, jobmon.client.swarm.task.SwarmTask]
arrays: dict[int, jobmon.client.swarm.array.SwarmArray]
ready_to_run: collections.deque[jobmon.client.swarm.task.SwarmTask]
status = 'B'
max_concurrently_running = 10000
last_sync: datetime.datetime | None = None
task_resources_cache: dict[int, jobmon.client.task_resources.TaskResources]
num_previously_complete: int = 0
add_task(task: jobmon.client.swarm.task.SwarmTask) → None

Register a task with the state.

Parameters:

task – The SwarmTask to add.

add_array(array: jobmon.client.swarm.array.SwarmArray) → None

Register an array with the state.

Parameters:

array – The SwarmArray to add.

get_task(task_id: int) → jobmon.client.swarm.task.SwarmTask | None

Get a task by ID.

Parameters:

task_id – The task ID to look up.

Returns:

The SwarmTask or None if not found.

property active_task_statuses: tuple[str, ...]

Task statuses representing in-flight work.

get_active_task_count() → int

Count of tasks currently in-flight.

get_ready_to_run_count() → int

Count of tasks ready to be scheduled.

get_done_count() → int

Count of completed tasks.

get_failed_count() → int

Count of fatally failed tasks.

get_done_tasks() → list[jobmon.client.swarm.task.SwarmTask]

List of completed tasks.

get_failed_tasks() → list[jobmon.client.swarm.task.SwarmTask]

List of fatally failed tasks.

get_tasks_by_status(status: str) → set[jobmon.client.swarm.task.SwarmTask]

Get all tasks with a specific status.

Parameters:

status – The TaskStatus to filter by.

Returns:

Set of SwarmTask objects with that status.

all_tasks_final() → bool

True if all tasks are in a terminal state (DONE or ERROR_FATAL).
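A sketch of the terminal-state check, assuming the single-letter codes "D" for DONE and "F" for ERROR_FATAL (only "D" appears in this page's usage example). Because all() over an empty collection is True, this version reports a workflow with no tasks as final; the real method may handle that case differently.

```python
from __future__ import annotations

DONE = "D"          # assumed code for TaskStatus.DONE
ERROR_FATAL = "F"   # assumed code for TaskStatus.ERROR_FATAL
TERMINAL = frozenset({DONE, ERROR_FATAL})


def all_tasks_final(statuses: dict[int, str]) -> bool:
    """True when every tracked task is DONE or ERROR_FATAL."""
    return all(status in TERMINAL for status in statuses.values())


print(all_tasks_final({1: "D", 2: "F"}))  # True
print(all_tasks_final({1: "D", 2: "R"}))  # False
```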

has_pending_work() → bool

True if there’s in-flight or ready-to-run work.

Tasks in ADJUSTING_RESOURCES count as pending: they are transient and will be re-queued on the next scheduling pass. Excluding them causes the orchestrator main loop to exit prematurely when the only in-flight work is adjusting, even though downstream REGISTERING tasks are still waiting on them.
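The ADJUSTING_RESOURCES rule can be illustrated with a minimal sketch. The status codes ("Q", "I", "O", "R" for active states, "A" for ADJUSTING_RESOURCES, "G" for REGISTERING) are assumptions, and whether the real ACTIVE_TASK_STATUSES already contains ADJUSTING_RESOURCES is not stated on this page. The example reproduces the scenario described: the only non-final work is a task that is adjusting resources, with a REGISTERING task waiting downstream.

```python
from __future__ import annotations

from collections import deque

# Assumed single-letter status codes; only "D" appears in this page's example.
QUEUED, INSTANTIATING, LAUNCHED, RUNNING = "Q", "I", "O", "R"
ADJUSTING_RESOURCES = "A"
ACTIVE = (QUEUED, INSTANTIATING, LAUNCHED, RUNNING)


def has_pending_work(statuses: dict[int, str], ready_to_run: deque) -> bool:
    if ready_to_run:
        return True
    # ADJUSTING_RESOURCES is transient: the task is about to be re-queued,
    # so it must keep the main loop alive just like in-flight work.
    return any(s in ACTIVE or s == ADJUSTING_RESOURCES for s in statuses.values())


# Task 1 is adjusting resources; task 2 ("G", REGISTERING) waits on it.
statuses = {1: ADJUSTING_RESOURCES, 2: "G"}
print(has_pending_work(statuses, deque()))  # True: the loop must not exit yet
```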

get_available_capacity() → int

How many more tasks can be queued at the workflow level.

get_array_capacity(array_id: int) → int

How many more tasks can be queued for this array.

Parameters:

array_id – The array to check.

Returns:

Available capacity for that array.
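A sketch of both capacity calculations, assuming capacity is the configured limit minus the number of in-flight tasks, clamped at zero. The Task stand-in, the array_limit argument, and the status codes are hypothetical; the real methods presumably take the limits from SwarmState.max_concurrently_running and from per-array settings.

```python
from __future__ import annotations

from dataclasses import dataclass

ACTIVE = ("Q", "I", "O", "R")  # assumed in-flight status codes


@dataclass
class Task:
    task_id: int
    array_id: int
    status: str


def workflow_capacity(tasks: list[Task], max_concurrently_running: int) -> int:
    active = sum(t.status in ACTIVE for t in tasks)
    # Clamp at zero: a lowered limit can leave more tasks in flight than allowed.
    return max(0, max_concurrently_running - active)


def array_capacity(tasks: list[Task], array_id: int, array_limit: int) -> int:
    active = sum(t.status in ACTIVE for t in tasks if t.array_id == array_id)
    return max(0, array_limit - active)


tasks = [Task(1, 7, "R"), Task(2, 7, "Q"), Task(3, 8, "R"), Task(4, 7, "G")]
print(workflow_capacity(tasks, max_concurrently_running=10))  # 7
print(array_capacity(tasks, array_id=7, array_limit=2))       # 0
print(array_capacity(tasks, array_id=8, array_limit=5))       # 4
```

Under these assumptions a task from a given array can be queued only while both numbers are positive; how the scheduler combines the two limits is not specified here.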

get_percent_done() → float

Calculate completion percentage.
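A hedged sketch of the completion percentage, assuming it is the number of done tasks divided by the number of registered tasks, times 100. The helper name percent_done and the 0.0 result for a workflow with no tasks are assumptions.

```python
def percent_done(done_count: int, total_count: int) -> float:
    """Hypothetical formula: completed tasks as a percentage of all tasks."""
    if total_count == 0:
        return 0.0  # assumed convention for an empty workflow
    return 100.0 * done_count / total_count


print(percent_done(3, 8))  # 37.5
```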

apply_update(update: StateUpdate) → set[jobmon.client.swarm.task.SwarmTask]

Apply a StateUpdate to this state.

This method atomically applies all changes in the update and returns the set of tasks whose status changed.

Parameters:

update – The StateUpdate to apply.

Returns:

Set of SwarmTask objects whose status changed.
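A minimal sketch of the change-tracking part of apply_update(), assuming a task counts as changed only when its stored status actually differs from the new one, and that unknown task IDs are ignored. MiniState and Task are hypothetical stand-ins, the update is passed as plain arguments instead of a StateUpdate, and handling of array_limits, workflow_run_status, and sync_time is omitted. Tasks use identity hashing (eq=False) so they can be collected in a set, matching the documented set[SwarmTask] return type.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(eq=False)  # identity hashing, so tasks can live in a set
class Task:
    task_id: int
    status: str


@dataclass
class MiniState:
    tasks: dict[int, Task] = field(default_factory=dict)
    max_concurrently_running: int = 10000

    def apply_update(self, task_statuses: dict[int, str],
                     max_concurrently_running: int | None = None) -> set[Task]:
        changed: set[Task] = set()
        for task_id, new_status in task_statuses.items():
            task = self.tasks.get(task_id)
            # Unknown IDs and no-op transitions are skipped, not reported.
            if task is None or task.status == new_status:
                continue
            task.status = new_status
            changed.add(task)
        if max_concurrently_running is not None:
            self.max_concurrently_running = max_concurrently_running
        return changed


state = MiniState(tasks={1: Task(1, "R"), 2: Task(2, "D")})
changed = state.apply_update({1: "D", 2: "D", 99: "R"}, max_concurrently_running=50)
print(sorted(t.task_id for t in changed))  # [1]
print(state.max_concurrently_running)      # 50
```

Returning only the tasks that actually changed lets the caller feed them straight into follow-up steps such as propagate_completions() without re-scanning every task.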

update_task_status(task_id: int, new_status: str) → bool

Update a single task’s status.

Parameters:
  • task_id – The task to update.

  • new_status – The new status.

Returns:

True if the task was found and updated.

propagate_completions(completed_tasks: set[jobmon.client.swarm.task.SwarmTask]) → list[jobmon.client.swarm.task.SwarmTask]

Update downstream dependency counts for completed tasks.

Parameters:

completed_tasks – Tasks that just completed.

Returns:

List of tasks that became ready to run (only REGISTERING tasks).
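The dependency bookkeeping can be sketched as a counter per task: each completion increments num_upstreams_done on every downstream task, and a downstream task is released once the counter reaches its number of upstreams. The upstreams and downstreams fields, the link() helper, and the "G" code for REGISTERING are assumptions for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, field

REGISTERING = "G"  # assumed status code


@dataclass(eq=False)  # identity hashing so tasks can be stored in sets
class Task:
    task_id: int
    status: str = REGISTERING
    upstreams: set[Task] = field(default_factory=set)
    downstreams: set[Task] = field(default_factory=set)
    num_upstreams_done: int = 0


def link(up: Task, down: Task) -> None:
    up.downstreams.add(down)
    down.upstreams.add(up)


def propagate_completions(completed: set[Task]) -> list[Task]:
    newly_ready: list[Task] = []
    for task in completed:
        for child in task.downstreams:
            child.num_upstreams_done += 1
            # Ready only once every upstream is done, and only if the child
            # is still REGISTERING (not already queued, running, or finished).
            if (child.num_upstreams_done == len(child.upstreams)
                    and child.status == REGISTERING):
                newly_ready.append(child)
    return newly_ready


a, b, c = Task(1, status="D"), Task(2, status="D"), Task(3)
link(a, c)
link(b, c)
first = propagate_completions({a})
second = propagate_completions({b})
print(first)                         # []
print([t.task_id for t in second])   # [3]
```

Because the counter is incremented unconditionally, each completed task must be passed in exactly once; passing it twice would over-count its children.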

enqueue_task(task: jobmon.client.swarm.task.SwarmTask, front: bool = False) → None

Add a task to the ready_to_run queue.

Parameters:
  • task – The task to enqueue.

  • front – If True, add to front of queue (higher priority).

dequeue_task() → jobmon.client.swarm.task.SwarmTask | None

Remove and return the next task from the ready_to_run queue.

Returns:

The next task, or None if queue is empty.
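enqueue_task() and dequeue_task() map naturally onto collections.deque, which is the documented type of ready_to_run. A sketch with a hypothetical ReadyQueue wrapper, using strings in place of SwarmTask objects:

```python
from __future__ import annotations

from collections import deque


class ReadyQueue:
    """Illustrative wrapper mirroring enqueue_task/dequeue_task semantics."""

    def __init__(self) -> None:
        self._queue: deque[str] = deque()

    def enqueue(self, task: str, front: bool = False) -> None:
        # appendleft lets a task jump the line in O(1).
        if front:
            self._queue.appendleft(task)
        else:
            self._queue.append(task)

    def dequeue(self) -> str | None:
        # popleft raises IndexError on an empty deque, so check first.
        return self._queue.popleft() if self._queue else None


q = ReadyQueue()
q.enqueue("t1")
q.enqueue("t2")
q.enqueue("retry", front=True)
order = [q.dequeue() for _ in range(4)]
print(order)  # ['retry', 't1', 't2', None]
```

A deque supports O(1) appends and pops at both ends, so front=True costs no more than a normal enqueue, unlike inserting at the head of a list.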

get_cached_resources(resource_hash: int) → jobmon.client.task_resources.TaskResources | None

Get cached TaskResources by hash.

Parameters:

resource_hash – The hash of the TaskResources.

Returns:

Cached TaskResources or None if not found.

cache_resources(resource_hash: int, task_resources: jobmon.client.task_resources.TaskResources) → jobmon.client.task_resources.TaskResources

Cache TaskResources and return the cached version.

Parameters:
  • resource_hash – The hash of the TaskResources.

  • task_resources – The TaskResources to cache.

Returns:

The cached TaskResources. If an entry for resource_hash already exists, that existing instance is returned instead of task_resources.
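The "return the cached version" behavior matches dict.setdefault(): the first object stored under a hash is kept, and later calls with the same hash get that original instance back. A sketch with a hypothetical ResourceCache class, using plain dicts in place of TaskResources:

```python
from __future__ import annotations


class ResourceCache:
    """Illustrative hash-keyed cache; plain dicts stand in for TaskResources."""

    def __init__(self) -> None:
        self._cache: dict[int, dict] = {}

    def get(self, resource_hash: int) -> dict | None:
        return self._cache.get(resource_hash)

    def cache(self, resource_hash: int, resources: dict) -> dict:
        # setdefault stores `resources` only if the hash is new, and always
        # returns the instance that ends up in the cache.
        return self._cache.setdefault(resource_hash, resources)


cache = ResourceCache()
first = {"memory": "2G"}
duplicate = {"memory": "2G"}
print(cache.cache(123, first) is first)      # True: newly cached
print(cache.cache(123, duplicate) is first)  # True: existing instance returned
print(cache.get(456))                        # None
```

Callers should keep using the returned object rather than the one they passed in, so that tasks with identical resources share a single instance.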

compute_initial_upstream_done_counts() → None

Compute initial num_upstreams_done for all tasks.

Call this after all tasks and their relationships are registered.
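A sketch of the initial count, assuming num_upstreams_done starts as the number of upstream tasks already in DONE (for example, tasks completed during an earlier attempt of a resumed run). The Task stand-in, its upstreams field, and the "D"/"G" codes are assumptions. Under this reading the ordering requirement follows directly: an upstream task or edge registered afterwards would not be reflected in the count.

```python
from __future__ import annotations

from dataclasses import dataclass, field

DONE = "D"  # assumed status code


@dataclass(eq=False)
class Task:
    task_id: int
    status: str
    upstreams: list[Task] = field(default_factory=list)
    num_upstreams_done: int = 0


def compute_initial_upstream_done_counts(tasks: dict[int, Task]) -> None:
    for task in tasks.values():
        # Count upstreams that are already DONE before scheduling starts.
        task.num_upstreams_done = sum(up.status == DONE for up in task.upstreams)


a = Task(1, DONE)
b = Task(2, "G")
c = Task(3, "G", upstreams=[a, b])
tasks = {t.task_id: t for t in (a, b, c)}
compute_initial_upstream_done_counts(tasks)
print(c.num_upstreams_done)  # 1
```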