client.swarm.state
SwarmState: Centralized state management for workflow runs.
This module provides a consolidated state container for all workflow run state, enabling clearer state mutation boundaries and easier testing.
Attributes
- logger
Classes
- StateUpdate: Represents changes to apply to SwarmState.
- SwarmState: Centralized state container for workflow runs.
Module Contents
- client.swarm.state.logger
- class client.swarm.state.StateUpdate
Represents changes to apply to SwarmState.
StateUpdate is an immutable record of state changes that can be:
  - Created from server responses
  - Merged together to combine multiple updates
  - Applied atomically to SwarmState
This pattern provides clear boundaries for state mutations and makes it easy to track what changed.
- sync_time: datetime.datetime | None = None
- classmethod empty() → StateUpdate
Create an empty update (no changes).
- classmethod from_task_status_response(tasks_by_status: dict[str, list[int]], sync_time: datetime.datetime | None = None) → StateUpdate
Create a StateUpdate from a server task status response.
- Parameters:
tasks_by_status – Server response format {status: [task_ids]}
sync_time – Server sync timestamp
- Returns:
StateUpdate with task_statuses populated.
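The server groups task IDs by status, while an update is naturally keyed by task. A minimal sketch of that inversion, assuming task_statuses is a dict[int, str] mapping task ID to status code (as the StateUpdate(task_statuses={1: "D"}) usage suggests). The helper name invert_task_status_response is hypothetical, and "R" is only a placeholder status:

```python
def invert_task_status_response(
    tasks_by_status: dict[str, list[int]],
) -> dict[int, str]:
    """Turn {status: [task_ids]} into {task_id: status} (hypothetical helper)."""
    task_statuses: dict[int, str] = {}
    for status, task_ids in tasks_by_status.items():
        for task_id in task_ids:
            # If an ID were listed under several statuses, the last one seen wins.
            task_statuses[task_id] = status
    return task_statuses


statuses = invert_task_status_response({"D": [1, 2], "R": [3]})
print(statuses)  # {1: 'D', 2: 'D', 3: 'R'}
```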
- merge(other: StateUpdate) → StateUpdate
Combine two updates, with other taking precedence for conflicts.
- Parameters:
other – The update to merge in (takes precedence).
- Returns:
New StateUpdate with combined changes.
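Because updates are immutable, merging has to build a new record rather than modify either input. A minimal sketch using a frozen dataclass; SketchUpdate is a hypothetical stand-in for StateUpdate, the field layout is assumed, and the rule that other.sync_time wins only when it is set is an assumption rather than documented behavior:

```python
from __future__ import annotations

import datetime
from dataclasses import FrozenInstanceError, dataclass, field


@dataclass(frozen=True)
class SketchUpdate:
    """Hypothetical stand-in for StateUpdate: an immutable record of changes."""

    task_statuses: dict[int, str] = field(default_factory=dict)
    sync_time: datetime.datetime | None = None

    def merge(self, other: SketchUpdate) -> SketchUpdate:
        # Later keys overwrite earlier ones, so `other` wins on conflicts.
        combined = {**self.task_statuses, **other.task_statuses}
        # Assumption: other's sync_time takes precedence only when it is set.
        sync_time = other.sync_time if other.sync_time is not None else self.sync_time
        return SketchUpdate(task_statuses=combined, sync_time=sync_time)


earlier = SketchUpdate({1: "R", 2: "R"}, datetime.datetime(2024, 1, 1))
later = SketchUpdate({2: "D"})
merged = earlier.merge(later)
print(merged.task_statuses)  # {1: 'R', 2: 'D'}
print(merged.sync_time)      # 2024-01-01 00:00:00

try:
    merged.sync_time = None  # frozen: attribute assignment is rejected
except FrozenInstanceError:
    print("updates are immutable")
```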
- class client.swarm.state.SwarmState(workflow_id: int, workflow_run_id: int, dag_id: int, max_concurrently_running: int = 10000, status: str = 'B')
Centralized state container for workflow runs.
SwarmState consolidates all workflow run state in one place:
  - Task registry and status tracking
  - Array registry
  - Scheduling queue (ready_to_run)
  - Workflow run metadata
All state queries and mutations go through this class, providing clear boundaries and making the code easier to test and reason about.
- Usage:
    state = SwarmState(workflow_id=1, workflow_run_id=10, dag_id=5)
    state.add_task(swarm_task)
    state.apply_update(StateUpdate(task_statuses={1: "D"}))
Initialize the state container.
- Parameters:
workflow_id – The workflow ID.
workflow_run_id – The workflow run ID.
dag_id – The DAG ID.
max_concurrently_running – Initial workflow concurrency limit.
status – Initial workflow run status.
- workflow_id
- workflow_run_id
- dag_id
- ready_to_run: collections.deque[jobmon.client.swarm.task.SwarmTask]
- status = 'B'
- max_concurrently_running = 10000
- last_sync: datetime.datetime | None = None
- add_task(task: jobmon.client.swarm.task.SwarmTask) → None
Register a task with the state.
- Parameters:
task – The SwarmTask to add.
- add_array(array: jobmon.client.swarm.array.SwarmArray) → None
Register an array with the state.
- Parameters:
array – The SwarmArray to add.
- get_task(task_id: int) → jobmon.client.swarm.task.SwarmTask | None
Get a task by ID.
- Parameters:
task_id – The task ID to look up.
- Returns:
The SwarmTask or None if not found.
- get_tasks_by_status(status: str) → set[jobmon.client.swarm.task.SwarmTask]
Get all tasks with a specific status.
- Parameters:
status – The TaskStatus to filter by.
- Returns:
Set of SwarmTask objects with that status.
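Answering get_tasks_by_status cheaply suggests keeping a per-status index next to the ID registry and moving a task between buckets whenever its status changes. A minimal sketch under that assumption; StatusIndex and FakeTask are hypothetical classes, not the jobmon implementation, and "G"/"D" are placeholder status codes:

```python
from __future__ import annotations

from collections import defaultdict
from dataclasses import dataclass


@dataclass(eq=False)  # eq=False keeps identity hashing, so tasks can sit in sets
class FakeTask:
    """Hypothetical stand-in for SwarmTask."""

    task_id: int
    status: str


class StatusIndex:
    """Task registry plus a status -> tasks index (hypothetical sketch)."""

    def __init__(self) -> None:
        self._tasks: dict[int, FakeTask] = {}
        self._by_status: defaultdict[str, set[FakeTask]] = defaultdict(set)

    def add_task(self, task: FakeTask) -> None:
        self._tasks[task.task_id] = task
        self._by_status[task.status].add(task)

    def get_task(self, task_id: int) -> FakeTask | None:
        return self._tasks.get(task_id)

    def get_tasks_by_status(self, status: str) -> set[FakeTask]:
        # Return a copy so callers cannot mutate the index behind our back.
        return set(self._by_status.get(status, ()))

    def update_task_status(self, task_id: int, new_status: str) -> bool:
        task = self._tasks.get(task_id)
        if task is None:
            return False
        # Move the task between status buckets, then update the task itself.
        self._by_status[task.status].discard(task)
        self._by_status[new_status].add(task)
        task.status = new_status
        return True


index = StatusIndex()
task = FakeTask(task_id=1, status="G")
index.add_task(task)
index.update_task_status(1, "D")
print(len(index.get_tasks_by_status("G")), len(index.get_tasks_by_status("D")))  # 0 1
```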
- has_pending_work() → bool
Return True if there is any in-flight or ready-to-run work.
Tasks in ADJUSTING_RESOURCES count as pending: they are transient and will be re-queued on the next scheduling pass. Excluding them would cause the orchestrator main loop to exit prematurely when the only in-flight work is adjusting, even though downstream REGISTERING tasks are still waiting on them.
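The rule above can be expressed as a small predicate. A sketch assuming statuses are plain strings; the exact set of statuses jobmon treats as in flight is not documented here, so IN_FLIGHT and its readable placeholder names are assumptions:

```python
from __future__ import annotations

from collections import deque

# Assumption: placeholder names for statuses that represent in-flight work.
# The real status codes and the exact in-flight set are not documented here.
IN_FLIGHT = frozenset({"QUEUED", "LAUNCHED", "RUNNING", "ADJUSTING_RESOURCES"})


def has_pending_work(ready_to_run: deque, task_statuses: dict[int, str]) -> bool:
    """True if anything is queued or any task is still in flight."""
    if ready_to_run:
        return True
    # ADJUSTING_RESOURCES is deliberately in IN_FLIGHT: such tasks are re-queued
    # on the next pass, so the loop must not exit while they exist.
    return any(status in IN_FLIGHT for status in task_statuses.values())


# Only an adjusting task and a task waiting on it remain: the loop should continue.
print(has_pending_work(deque(), {1: "ADJUSTING_RESOURCES", 2: "REGISTERING"}))  # True
```

Dropping ADJUSTING_RESOURCES from IN_FLIGHT makes that call return False, which is exactly the premature exit described above.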
- get_array_capacity(array_id: int) → int
Return how many more tasks can currently be queued for this array.
- Parameters:
array_id – The array to check.
- Returns:
Available capacity for that array.
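How the capacity is computed is not spelled out here. One plausible rule, shown only as a hypothetical sketch: a task can be queued only if it fits under both a per-array limit (assumed) and the workflow-level max_concurrently_running limit (documented, default 10000), so the capacity is the smaller remaining headroom, floored at zero. The function and all parameter names are illustrative:

```python
def array_capacity(
    array_limit: int,
    array_active: int,
    workflow_limit: int,
    workflow_active: int,
) -> int:
    """Hypothetical capacity rule: remaining headroom under both limits."""
    # Assumption: "active" counts tasks of this array (or workflow) that are
    # already queued or running and therefore occupy a concurrency slot.
    array_room = array_limit - array_active
    workflow_room = workflow_limit - workflow_active
    # Never report negative capacity, even if a limit was lowered mid-run.
    return max(0, min(array_room, workflow_room))


print(array_capacity(array_limit=100, array_active=40, workflow_limit=10000, workflow_active=9990))  # 10
```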
- apply_update(update: StateUpdate) → set[jobmon.client.swarm.task.SwarmTask]
Apply a StateUpdate to this state.
This method atomically applies all changes in the update and returns the set of tasks whose status changed.
- Parameters:
update – The StateUpdate to apply.
- Returns:
Set of SwarmTask objects whose status changed.
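The useful part of the return value is that it contains only tasks whose status actually changed, so a caller can react to transitions rather than to every reported status. A minimal sketch of that filtering; apply_status_update and FakeTask are hypothetical, the handling of unknown IDs is an assumption, and the sketch does not attempt to reproduce the atomicity guarantee:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(eq=False)  # identity hashing so tasks can be collected in a set
class FakeTask:
    """Hypothetical stand-in for SwarmTask."""

    task_id: int
    status: str


def apply_status_update(
    tasks: dict[int, FakeTask], task_statuses: dict[int, str]
) -> set[FakeTask]:
    """Apply {task_id: status} and return only tasks whose status changed."""
    changed: set[FakeTask] = set()
    for task_id, new_status in task_statuses.items():
        task = tasks.get(task_id)
        # Assumption: IDs this client does not track are skipped, and a
        # status that is already current is not reported as a change.
        if task is None or task.status == new_status:
            continue
        task.status = new_status
        changed.add(task)
    return changed


tasks = {1: FakeTask(1, "R"), 2: FakeTask(2, "D")}
changed = apply_status_update(tasks, {1: "D", 2: "D", 3: "D"})
print(sorted(t.task_id for t in changed))  # [1]
```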
- update_task_status(task_id: int, new_status: str) → bool
Update a single task’s status.
- Parameters:
task_id – The task to update.
new_status – The new status.
- Returns:
True if the task was found and updated.
- propagate_completions(completed_tasks: set[jobmon.client.swarm.task.SwarmTask]) → list[jobmon.client.swarm.task.SwarmTask]
Update downstream dependency counts for completed tasks.
- Parameters:
completed_tasks – Tasks that just completed.
- Returns:
List of tasks that became ready to run (only REGISTERING tasks).
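Dependency propagation of this kind is usually done with per-task counters: each completion bumps the counter on every downstream task, and a task becomes ready once all of its upstreams are done. A minimal sketch of that counting scheme; Node, its fields, and the readable status strings are hypothetical, not SwarmTask's actual attributes:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass(eq=False)  # identity hashing so nodes can be placed in sets
class Node:
    """Hypothetical stand-in for SwarmTask with dependency bookkeeping."""

    name: str
    status: str = "REGISTERING"
    num_upstreams: int = 0
    num_upstreams_done: int = 0
    downstreams: list[Node] = field(default_factory=list)


def propagate_completions(completed: set[Node]) -> list[Node]:
    """Bump upstream-done counters and return tasks that just became ready."""
    newly_ready: list[Node] = []
    for task in completed:
        for child in task.downstreams:
            child.num_upstreams_done += 1
            # Equality (not >=) reports a child exactly once, on the
            # completion that satisfies its last dependency.
            if child.status == "REGISTERING" and child.num_upstreams_done == child.num_upstreams:
                newly_ready.append(child)
    return newly_ready


a, b = Node("a", status="DONE"), Node("b", status="DONE")
c = Node("c", num_upstreams=2)
a.downstreams.append(c)
b.downstreams.append(c)
print([n.name for n in propagate_completions({a})])  # []
print([n.name for n in propagate_completions({b})])  # ['c']
```

Only tasks still in REGISTERING are returned, matching the note above; a downstream task already in some other status has its counter updated but is not re-queued.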
- enqueue_task(task: jobmon.client.swarm.task.SwarmTask, front: bool = False) → None
Add a task to the ready_to_run queue.
- Parameters:
task – The task to enqueue.
front – If True, add to front of queue (higher priority).
- dequeue_task() → jobmon.client.swarm.task.SwarmTask | None
Remove and return the next task from the ready_to_run queue.
- Returns:
The next task, or None if queue is empty.
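Since ready_to_run is a collections.deque, both operations map onto O(1) deque methods: append or appendleft to enqueue, popleft to dequeue. A minimal sketch; the ReadyQueue class is hypothetical and only mirrors the two documented methods:

```python
from __future__ import annotations

from collections import deque
from typing import Any


class ReadyQueue:
    """Hypothetical wrapper around a ready_to_run deque."""

    def __init__(self) -> None:
        self.ready_to_run: deque[Any] = deque()

    def enqueue_task(self, task: Any, front: bool = False) -> None:
        if front:
            self.ready_to_run.appendleft(task)  # jump ahead of queued work
        else:
            self.ready_to_run.append(task)  # normal FIFO order

    def dequeue_task(self) -> Any | None:
        # popleft() raises IndexError on an empty deque, so check first.
        return self.ready_to_run.popleft() if self.ready_to_run else None


q = ReadyQueue()
q.enqueue_task("a")
q.enqueue_task("b")
q.enqueue_task("urgent", front=True)
print([q.dequeue_task() for _ in range(4)])  # ['urgent', 'a', 'b', None]
```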
- get_cached_resources(resource_hash: int) → jobmon.client.task_resources.TaskResources | None
Get cached TaskResources by hash.
- Parameters:
resource_hash – The hash of the TaskResources.
- Returns:
Cached TaskResources or None if not found.
- cache_resources(resource_hash: int, task_resources: jobmon.client.task_resources.TaskResources) → jobmon.client.task_resources.TaskResources
Cache TaskResources and return the cached version.
- Parameters:
resource_hash – The hash of the TaskResources.
task_resources – The TaskResources to cache.
- Returns:
The cached TaskResources: the instance already stored under resource_hash if there is one, otherwise task_resources itself.
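Returning the already-cached object is an interning pattern: tasks with identical resources end up sharing one instance. dict.setdefault expresses this directly. A minimal sketch; ResourceCache and FakeResources (with its queue and memory_gb fields) are hypothetical stand-ins, and the hash values in the usage are arbitrary:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class FakeResources:
    """Hypothetical stand-in for TaskResources."""

    queue: str
    memory_gb: int


class ResourceCache:
    def __init__(self) -> None:
        self._cache: dict[int, FakeResources] = {}

    def get_cached_resources(self, resource_hash: int) -> FakeResources | None:
        return self._cache.get(resource_hash)

    def cache_resources(
        self, resource_hash: int, task_resources: FakeResources
    ) -> FakeResources:
        # setdefault keeps the first object stored under this hash and returns
        # it, so later equal resources are replaced by the shared instance.
        return self._cache.setdefault(resource_hash, task_resources)


cache = ResourceCache()
r1 = FakeResources("all.q", 4)
r2 = FakeResources("all.q", 4)
shared = cache.cache_resources(42, r2 if False else r1)
again = cache.cache_resources(42, r2)
print(again is r1)  # True
```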