client.swarm.builder
SwarmBuilder: Service for building workflow run state.
This module provides the SwarmBuilder service, which handles all initialization of workflow run state, including:
- Building from an in-memory Workflow object (new runs)
- Building from database state (resume scenarios)
- Fetching tasks and dependencies in chunks with periodic heartbeats
Attributes
Classes
- SwarmBuilder: Builds workflow run state from Workflow objects or database.
Module Contents
- client.swarm.builder.logger
- class client.swarm.builder.SwarmBuilder(requester: jobmon.core.requester.Requester, workflow_run_id: int, heartbeat_interval: float = 30.0, heartbeat_report_by_buffer: float = 1.5, initial_status: str = WorkflowRunStatus.BOUND)
Builds workflow run state from Workflow objects or database.
This service handles all initialization logic, extracting it from the WorkflowRun class. It builds a fully initialized SwarmState that can be passed directly to the WorkflowRunOrchestrator.
- Usage:
    # For new runs (from Workflow object):
    builder = SwarmBuilder(requester, workflow_run_id)
    builder.build_from_workflow(workflow)

    # For resume runs (from database):
    builder = SwarmBuilder(requester, workflow_run_id)
    builder.build_from_workflow_id(workflow_id)

    # Then pass state and gateway to orchestrator:
    orchestrator = WorkflowRunOrchestrator(builder.state, builder._gateway, config)
    result = await orchestrator.run(distributor_alive_callable)
Initialize the builder.
- Parameters:
requester – The Requester instance for HTTP communication.
workflow_run_id – The workflow run ID.
heartbeat_interval – Interval between heartbeats in seconds.
heartbeat_report_by_buffer – Multiplier for next report time.
initial_status – Initial workflow run status.
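As a rough illustration of how the two heartbeat parameters might combine, the sketch below assumes the next report deadline is the interval multiplied by the buffer. That formula is an assumption inferred from the parameter descriptions, and `next_report_increment` is a hypothetical helper, not part of SwarmBuilder.

```python
def next_report_increment(
    heartbeat_interval: float = 30.0,
    heartbeat_report_by_buffer: float = 1.5,
) -> float:
    """Seconds until the next heartbeat is due (assumed: interval * buffer)."""
    return heartbeat_interval * heartbeat_report_by_buffer


# With the documented defaults, the run would have 45 seconds to report again.
print(next_report_increment())  # 45.0
```

Under this assumption, a buffer greater than 1 gives the run some slack beyond the nominal interval before a heartbeat counts as missed.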
- requester
- workflow_run_id
- heartbeat_interval = 30.0
- heartbeat_report_by_buffer = 1.5
- initial_status = 'B'
- build_from_workflow(workflow: jobmon.client.workflow.Workflow) -> None
Build state from an in-memory Workflow object.
This is used for new workflow runs where the Workflow object is already constructed in memory. After calling this, access builder.state and builder._gateway to get the built state.
- Parameters:
workflow – The Workflow object containing tasks and arrays.
- build_from_workflow_id(workflow_id: int, edge_chunk_size: int = 500) -> None
Build state by fetching from database.
This is used for resume scenarios where we need to reconstruct the workflow state from the database. After calling this, access builder.state and builder._gateway to get the built state.
- Parameters:
workflow_id – The workflow ID to fetch.
edge_chunk_size – Number of edges to fetch per chunk.
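The chunked fetch with periodic heartbeats described for this method could look roughly like the sketch below. It is not the SwarmBuilder implementation: `fetch_edges_in_chunks`, `fetch_page`, `heartbeat`, and the `Edge` tuple shape are all hypothetical names chosen for illustration, and only `edge_chunk_size` and `heartbeat_interval` come from the documented signatures.

```python
import time
from typing import Callable, List, Sequence, Tuple

# Hypothetical edge shape: (upstream_task_id, downstream_task_id).
Edge = Tuple[int, int]


def fetch_edges_in_chunks(
    fetch_page: Callable[[int, int], Sequence[Edge]],
    heartbeat: Callable[[], None],
    edge_chunk_size: int = 500,
    heartbeat_interval: float = 30.0,
    clock: Callable[[], float] = time.monotonic,
) -> List[Edge]:
    """Collect all edges page by page, sending a heartbeat when one is due."""
    edges: List[Edge] = []
    offset = 0
    last_heartbeat = clock()
    while True:
        # fetch_page(offset, limit) stands in for one database/HTTP round trip.
        page = fetch_page(offset, edge_chunk_size)
        edges.extend(page)
        # Check between chunks so a long fetch does not look like a dead run.
        if clock() - last_heartbeat >= heartbeat_interval:
            heartbeat()
            last_heartbeat = clock()
        if len(page) < edge_chunk_size:
            break  # a short or empty page means nothing is left
        offset += edge_chunk_size
    return edges
```

A smaller `edge_chunk_size` means more round trips but more frequent chances to send a heartbeat; a larger one reduces overhead at the cost of longer gaps between checks.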
- property state: jobmon.client.swarm.state.SwarmState | None
Get the SwarmState (available after build).
- property task_status_map: dict[str, set[jobmon.client.swarm.task.SwarmTask]]
Get the task status map.
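To make the `dict[str, set[...]]` shape concrete, here is a minimal sketch of a status-keyed map. `FakeTask` is a hypothetical stand-in for SwarmTask, `group_by_status` is an illustrative helper, and the status codes `"D"` and `"G"` are placeholders rather than confirmed values.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, Set


@dataclass(frozen=True)  # frozen makes instances hashable, so they can go in a set
class FakeTask:
    """Hypothetical stand-in for SwarmTask."""

    task_id: int
    status: str


def group_by_status(tasks: Iterable[FakeTask]) -> Dict[str, Set[FakeTask]]:
    """Group tasks into sets keyed by their status string."""
    status_map: Dict[str, Set[FakeTask]] = defaultdict(set)
    for task in tasks:
        status_map[task.status].add(task)
    return dict(status_map)


tasks = [FakeTask(1, "D"), FakeTask(2, "G"), FakeTask(3, "D")]
status_map = group_by_status(tasks)
print(sorted(t.task_id for t in status_map["D"]))  # [1, 3]
```

Keying by status lets a caller look up all tasks in a given state directly, without scanning every task.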
- property last_sync: datetime.datetime | None
Get last sync time.