client.swarm.builder
====================

.. py:module:: client.swarm.builder

.. autoapi-nested-parse::

   SwarmBuilder: Service for building workflow run state.

   This module provides the SwarmBuilder service, which handles all
   initialization of workflow run state, including:

   - Building from an in-memory Workflow object (new runs)
   - Building from database state (resume scenarios)
   - Fetching tasks and dependencies in chunks with periodic heartbeats



Attributes
----------

.. autoapisummary::

   client.swarm.builder.logger


Classes
-------

.. autoapisummary::

   client.swarm.builder.SwarmBuilder


Module Contents
---------------

.. py:data:: logger

.. py:class:: SwarmBuilder(requester: jobmon.core.requester.Requester, workflow_run_id: int, heartbeat_interval: float = 30.0, heartbeat_report_by_buffer: float = 1.5, initial_status: str = WorkflowRunStatus.BOUND)

   Builds workflow run state from Workflow objects or the database.

   This service handles all initialization logic, extracting it from the
   WorkflowRun class. It builds a fully initialized SwarmState that can be
   passed directly to the WorkflowRunOrchestrator.

   Usage::

       # For new runs (from a Workflow object):
       builder = SwarmBuilder(requester, workflow_run_id)
       builder.build_from_workflow(workflow)

       # For resumed runs (from the database):
       builder = SwarmBuilder(requester, workflow_run_id)
       builder.build_from_workflow_id(workflow_id)

       # Then pass the state and gateway to the orchestrator:
       orchestrator = WorkflowRunOrchestrator(builder.state, builder._gateway, config)
       result = await orchestrator.run(distributor_alive_callable)

   Initialize the builder.

   :param requester: The Requester instance for HTTP communication.
   :param workflow_run_id: The workflow run ID.
   :param heartbeat_interval: Interval between heartbeats, in seconds.
   :param heartbeat_report_by_buffer: Multiplier for the next report time.
   :param initial_status: Initial workflow run status.


   .. py:attribute:: requester


   .. py:attribute:: workflow_run_id


   .. py:attribute:: heartbeat_interval
      :value: 30.0


   .. py:attribute:: heartbeat_report_by_buffer
      :value: 1.5


   .. py:attribute:: initial_status
      :value: 'B'


   .. py:method:: build_from_workflow(workflow: jobmon.client.workflow.Workflow) -> None

      Build state from an in-memory Workflow object.

      This is used for new workflow runs where the Workflow object is
      already constructed in memory. After calling this, access
      ``builder.state`` and ``builder._gateway`` to get the built state.

      :param workflow: The Workflow object containing tasks and arrays.


   .. py:method:: build_from_workflow_id(workflow_id: int, edge_chunk_size: int = 500) -> None

      Build state by fetching it from the database.

      This is used for resume scenarios, where the workflow state must be
      reconstructed from the database. After calling this, access
      ``builder.state`` and ``builder._gateway`` to get the built state.

      :param workflow_id: The workflow ID to fetch.
      :param edge_chunk_size: Number of edges to fetch per chunk.


   .. py:property:: state
      :type: Optional[jobmon.client.swarm.state.SwarmState]

      Get the SwarmState (available after a build).


   .. py:property:: tasks
      :type: dict[int, jobmon.client.swarm.task.SwarmTask]

      Get the built tasks dictionary.


   .. py:property:: arrays
      :type: dict[int, jobmon.client.swarm.array.SwarmArray]

      Get the built arrays dictionary.


   .. py:property:: task_status_map
      :type: dict[str, set[jobmon.client.swarm.task.SwarmTask]]

      Get the task status map.


   .. py:property:: workflow_id
      :type: Optional[int]

      Get the workflow ID (set after a build).


   .. py:property:: dag_id
      :type: Optional[int]

      Get the DAG ID (set after a build).


   .. py:property:: max_concurrently_running
      :type: int

      Get the max concurrently running limit.


   .. py:property:: last_sync
      :type: Optional[datetime.datetime]

      Get the last sync time.


   .. py:property:: status
      :type: str

      Get the current status.


   .. py:property:: num_previously_complete
      :type: int

      Get the number of tasks that were already complete.
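
The module summary says the builder fetches tasks and dependencies in chunks with periodic heartbeats, but this page does not document how that loop works. The following is a minimal, hypothetical sketch of the pattern, not the jobmon implementation. It assumes a heartbeat is sent whenever ``heartbeat_interval`` seconds have elapsed since the last one, and that the report-by window is ``heartbeat_interval * heartbeat_report_by_buffer``. The names ``chunked``, ``fetch_in_chunks``, ``fetch_chunk``, and ``send_heartbeat`` are illustrative only and are not part of the jobmon API.

.. code-block:: python

   import time
   from typing import Callable, Iterator


   def chunked(ids: list[int], chunk_size: int) -> Iterator[list[int]]:
       """Yield successive slices of ``ids`` with at most ``chunk_size`` items."""
       for start in range(0, len(ids), chunk_size):
           yield ids[start:start + chunk_size]


   def fetch_in_chunks(
       task_ids: list[int],
       fetch_chunk: Callable[[list[int]], dict[int, list[int]]],
       send_heartbeat: Callable[[float], None],
       edge_chunk_size: int = 500,
       heartbeat_interval: float = 30.0,
       heartbeat_report_by_buffer: float = 1.5,
       clock: Callable[[], float] = time.monotonic,
   ) -> dict[int, list[int]]:
       """Hypothetical chunked fetch that heartbeats between chunks."""
       edges: dict[int, list[int]] = {}
       last_heartbeat = clock()
       for chunk in chunked(task_ids, edge_chunk_size):
           # One request per chunk keeps each payload bounded in size.
           edges.update(fetch_chunk(chunk))
           now = clock()
           if now - last_heartbeat >= heartbeat_interval:
               # Assumption: the report-by window is interval * buffer.
               send_heartbeat(heartbeat_interval * heartbeat_report_by_buffer)
               last_heartbeat = now
       return edges


   # Demo: a fake clock that advances 20 s per call, and a fake fetcher
   # that returns no upstream edges for any task.
   _ticks = iter(range(0, 1000, 20))
   heartbeats: list[float] = []
   result = fetch_in_chunks(
       task_ids=list(range(1200)),
       fetch_chunk=lambda chunk: {tid: [] for tid in chunk},
       send_heartbeat=heartbeats.append,
       clock=lambda: float(next(_ticks)),
   )
   print(len(result), heartbeats)  # 1200 [45.0]

Checking the elapsed time after each chunk, rather than running a background timer, keeps the sketch single-threaded; the real builder may schedule heartbeats differently.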