client.swarm.state
==================

.. py:module:: client.swarm.state

.. autoapi-nested-parse::

   SwarmState: Centralized state management for workflow runs.

   This module provides a consolidated state container for all workflow run
   state, enabling clearer state mutation boundaries and easier testing.


Attributes
----------

.. autoapisummary::

   client.swarm.state.logger
   client.swarm.state.ACTIVE_TASK_STATUSES
   client.swarm.state.SERVER_STOP_STATUSES
   client.swarm.state.TERMINATING_STATUSES


Classes
-------

.. autoapisummary::

   client.swarm.state.StateUpdate
   client.swarm.state.SwarmState


Module Contents
---------------

.. py:data:: logger

.. py:data:: ACTIVE_TASK_STATUSES
   :type: tuple[str, Ellipsis]

.. py:data:: SERVER_STOP_STATUSES
   :type: frozenset[str]

.. py:data:: TERMINATING_STATUSES
   :type: tuple[str, Ellipsis]

.. py:class:: StateUpdate

   Represents changes to apply to SwarmState.

   StateUpdate is an immutable record of state changes that can be:

   - Created from server responses
   - Merged together to combine multiple updates
   - Applied atomically to SwarmState

   This pattern provides clear boundaries for state mutations and makes it
   easy to track what changed.

   .. py:attribute:: task_statuses
      :type: dict[int, str]

   .. py:attribute:: max_concurrently_running
      :type: Optional[int]
      :value: None

   .. py:attribute:: array_limits
      :type: dict[int, int]

   .. py:attribute:: workflow_run_status
      :type: Optional[str]
      :value: None

   .. py:attribute:: sync_time
      :type: Optional[datetime.datetime]
      :value: None

   .. py:method:: empty() -> StateUpdate
      :classmethod:

      Create an empty update (no changes).

   .. py:method:: from_task_status_response(tasks_by_status: dict[str, list[int]], sync_time: Optional[datetime.datetime] = None) -> StateUpdate
      :classmethod:

      Create a StateUpdate from a server task status response.

      :param tasks_by_status: Server response in the form ``{status: [task_ids]}``.
      :param sync_time: Server sync timestamp.
      :returns: StateUpdate with ``task_statuses`` populated.

   .. py:method:: merge(other: StateUpdate) -> StateUpdate

      Combine two updates, with ``other`` taking precedence on conflicts.

      :param other: The update to merge in (takes precedence).
      :returns: A new StateUpdate with the combined changes.

   .. py:method:: is_empty() -> bool

      Check whether this update contains any changes.

.. py:class:: SwarmState(workflow_id: int, workflow_run_id: int, dag_id: int, max_concurrently_running: int = 10000, status: str = 'B')

   Centralized state container for workflow runs.

   SwarmState consolidates all workflow run state in one place:

   - Task registry and status tracking
   - Array registry
   - Scheduling queue (``ready_to_run``)
   - Workflow run metadata

   All state queries and mutations go through this class, providing clear
   boundaries and making the code easier to test and reason about.

   Usage::

      # swarm_task is an existing SwarmTask instance
      state = SwarmState(workflow_id=1, workflow_run_id=10, dag_id=5)
      state.add_task(swarm_task)
      state.apply_update(StateUpdate(task_statuses={1: "D"}))

   Initialize the state container.

   :param workflow_id: The workflow ID.
   :param workflow_run_id: The workflow run ID.
   :param dag_id: The DAG ID.
   :param max_concurrently_running: Initial workflow concurrency limit.
   :param status: Initial workflow run status.

   .. py:attribute:: workflow_id

   .. py:attribute:: workflow_run_id

   .. py:attribute:: dag_id

   .. py:attribute:: tasks
      :type: dict[int, jobmon.client.swarm.task.SwarmTask]

   .. py:attribute:: arrays
      :type: dict[int, jobmon.client.swarm.array.SwarmArray]

   .. py:attribute:: ready_to_run
      :type: collections.deque[jobmon.client.swarm.task.SwarmTask]

   .. py:attribute:: status
      :value: 'B'

   .. py:attribute:: max_concurrently_running
      :value: 10000

   .. py:attribute:: last_sync
      :type: Optional[datetime.datetime]
      :value: None

   .. py:attribute:: task_resources_cache
      :type: dict[int, jobmon.client.task_resources.TaskResources]

   .. py:attribute:: num_previously_complete
      :type: int
      :value: 0

   .. py:method:: add_task(task: jobmon.client.swarm.task.SwarmTask) -> None

      Register a task with the state.

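
      A minimal sketch of the registry behaviour that ``add_task`` and
      ``get_task`` describe. ``ToyTask`` and ``ToyRegistry`` are hypothetical
      stand-ins, not part of this module, and keying tasks by ``task_id`` is an
      assumption inferred from the ``tasks: dict[int, SwarmTask]`` attribute
      and the ``get_task(task_id)`` signature:

      .. code-block:: python

         from dataclasses import dataclass
         from typing import Optional


         @dataclass(eq=False)  # identity-based hashing so instances can go in sets
         class ToyTask:
             """Hypothetical stand-in for SwarmTask."""

             task_id: int
             status: str = "G"  # assumed REGISTERING code; check TaskStatus


         class ToyRegistry:
             """Hypothetical stand-in for the task-registry part of SwarmState."""

             def __init__(self) -> None:
                 self.tasks: dict[int, ToyTask] = {}

             def add_task(self, task: ToyTask) -> None:
                 # Assumption: tasks are keyed by their task_id.
                 self.tasks[task.task_id] = task

             def get_task(self, task_id: int) -> Optional[ToyTask]:
                 # dict.get returns None for unknown IDs, mirroring get_task's contract.
                 return self.tasks.get(task_id)

             def get_tasks_by_status(self, status: str) -> set[ToyTask]:
                 # Linear scan over the registry; returns a set, as the API documents.
                 return {t for t in self.tasks.values() if t.status == status}


         registry = ToyRegistry()
         registry.add_task(ToyTask(1))
         registry.add_task(ToyTask(2, status="D"))  # "D" is DONE, as in the usage example
         print(registry.get_task(99))  # None
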

      :param task: The SwarmTask to add.

   .. py:method:: add_array(array: jobmon.client.swarm.array.SwarmArray) -> None

      Register an array with the state.

      :param array: The SwarmArray to add.

   .. py:method:: get_task(task_id: int) -> Optional[jobmon.client.swarm.task.SwarmTask]

      Get a task by ID.

      :param task_id: The task ID to look up.
      :returns: The SwarmTask, or None if not found.

   .. py:property:: active_task_statuses
      :type: tuple[str, Ellipsis]

      Task statuses representing in-flight work.

   .. py:method:: get_active_task_count() -> int

      Count of tasks currently in flight.

   .. py:method:: get_ready_to_run_count() -> int

      Count of tasks ready to be scheduled.

   .. py:method:: get_done_count() -> int

      Count of completed tasks.

   .. py:method:: get_failed_count() -> int

      Count of fatally failed tasks.

   .. py:method:: get_done_tasks() -> list[jobmon.client.swarm.task.SwarmTask]

      List of completed tasks.

   .. py:method:: get_failed_tasks() -> list[jobmon.client.swarm.task.SwarmTask]

      List of fatally failed tasks.

   .. py:method:: get_tasks_by_status(status: str) -> set[jobmon.client.swarm.task.SwarmTask]

      Get all tasks with a specific status.

      :param status: The TaskStatus to filter by.
      :returns: Set of SwarmTask objects with that status.

   .. py:method:: all_tasks_final() -> bool

      True if every task is in a terminal state (DONE or ERROR_FATAL).

   .. py:method:: has_pending_work() -> bool

      True if there is any in-flight or ready-to-run work.

   .. py:method:: get_available_capacity() -> int

      Number of additional tasks that can be queued at the workflow level.

   .. py:method:: get_array_capacity(array_id: int) -> int

      Number of additional tasks that can be queued for the given array.

      :param array_id: The array to check.
      :returns: Available capacity for that array.

   .. py:method:: get_percent_done() -> float

      Calculate the completion percentage.

   .. py:method:: apply_update(update: StateUpdate) -> set[jobmon.client.swarm.task.SwarmTask]

      Apply a StateUpdate to this state.

      This method atomically applies all changes in the update and returns the
      set of tasks whose status changed.

      :param update: The StateUpdate to apply.
      :returns: Set of SwarmTask objects whose status changed.

   .. py:method:: update_task_status(task_id: int, new_status: str) -> bool

      Update a single task's status.

      :param task_id: The task to update.
      :param new_status: The new status.
      :returns: True if the task was found and updated.

   .. py:method:: propagate_completions(completed_tasks: set[jobmon.client.swarm.task.SwarmTask]) -> list[jobmon.client.swarm.task.SwarmTask]

      Update downstream dependency counts for completed tasks.

      :param completed_tasks: Tasks that just completed.
      :returns: List of tasks that became ready to run. Only tasks still in
         REGISTERING status are returned.

   .. py:method:: enqueue_task(task: jobmon.client.swarm.task.SwarmTask, front: bool = False) -> None

      Add a task to the ``ready_to_run`` queue.

      :param task: The task to enqueue.
      :param front: If True, add the task to the front of the queue (higher priority).

   .. py:method:: dequeue_task() -> Optional[jobmon.client.swarm.task.SwarmTask]

      Remove and return the next task from the ``ready_to_run`` queue.

      :returns: The next task, or None if the queue is empty.

   .. py:method:: get_cached_resources(resource_hash: int) -> Optional[jobmon.client.task_resources.TaskResources]

      Get cached TaskResources by hash.

      :param resource_hash: The hash of the TaskResources.
      :returns: The cached TaskResources, or None if not found.

   .. py:method:: cache_resources(resource_hash: int, task_resources: jobmon.client.task_resources.TaskResources) -> jobmon.client.task_resources.TaskResources

      Cache TaskResources and return the cached version.

      :param resource_hash: The hash of the TaskResources.
      :param task_resources: The TaskResources to cache.
      :returns: The cached TaskResources. If an entry for ``resource_hash``
         already exists, that existing instance is returned instead.

   .. py:method:: compute_initial_upstream_done_counts() -> None

      Compute the initial ``num_upstreams_done`` for all tasks.

      Call this after all tasks and their dependency relationships have been
      registered.
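
      A minimal sketch of the dependency bookkeeping behind this method and
      ``propagate_completions``, assuming each task tracks its upstream and
      downstream tasks plus a ``num_upstreams_done`` counter. ``ToyTask``,
      ``upstreams``, ``downstreams``, and the REGISTERING code ``"G"`` are
      hypothetical; only ``"D"`` for DONE appears in the SwarmState usage
      example.

      .. code-block:: python

         from dataclasses import dataclass, field

         DONE = "D"         # DONE code, as in the SwarmState usage example
         REGISTERING = "G"  # assumed REGISTERING code; check TaskStatus


         @dataclass(eq=False)  # identity-based hashing so tasks can go in sets
         class ToyTask:
             """Hypothetical stand-in for SwarmTask's dependency fields."""

             task_id: int
             status: str = REGISTERING
             upstreams: set = field(default_factory=set)
             downstreams: set = field(default_factory=set)
             num_upstreams_done: int = 0


         def compute_initial_upstream_done_counts(tasks: dict) -> None:
             # Count upstreams that are already DONE (e.g. finished in an earlier run).
             for task in tasks.values():
                 task.num_upstreams_done = sum(1 for up in task.upstreams if up.status == DONE)


         def propagate_completions(completed_tasks: set) -> list:
             newly_ready = []
             for task in completed_tasks:
                 for down in task.downstreams:
                     down.num_upstreams_done += 1
                     # Ready once the last upstream finishes, but only if still REGISTERING.
                     if down.status == REGISTERING and down.num_upstreams_done == len(down.upstreams):
                         newly_ready.append(down)
             return newly_ready


         a, b, c = ToyTask(1), ToyTask(2), ToyTask(3)
         c.upstreams = {a, b}  # c depends on a and b
         a.downstreams = {c}
         b.downstreams = {c}
         a.status = DONE  # a finished in an earlier run

         compute_initial_upstream_done_counts({t.task_id: t for t in (a, b, c)})
         print(c.num_upstreams_done)  # 1

         b.status = DONE  # b finishes during this run
         ready = propagate_completions({b})
         print([t.task_id for t in ready])  # [3]

      In this sketch, counting once up front and then incrementing per
      completion keeps each completion proportional to the number of
      downstream tasks, rather than rescanning every task's upstreams on
      each sync.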