client.swarm.builder

SwarmBuilder: Service for building workflow run state.

This module provides the SwarmBuilder service, which handles all initialization of workflow run state, including:
  • Building from an in-memory Workflow object (new runs)

  • Building from database state (resume scenarios)

  • Fetching tasks and dependencies in chunks with periodic heartbeats

Attributes

logger

Classes

SwarmBuilder

Builds workflow run state from Workflow objects or database.

Module Contents

client.swarm.builder.logger
class client.swarm.builder.SwarmBuilder(requester: jobmon.core.requester.Requester, workflow_run_id: int, heartbeat_interval: float = 30.0, heartbeat_report_by_buffer: float = 1.5, initial_status: str = WorkflowRunStatus.BOUND)

Builds workflow run state from Workflow objects or database.

This service handles all initialization logic, extracting it from the WorkflowRun class. It builds a fully initialized SwarmState that can be passed directly to the WorkflowRunOrchestrator.

Usage:

    # For new runs (from a Workflow object):
    builder = SwarmBuilder(requester, workflow_run_id)
    builder.build_from_workflow(workflow)

    # For resume runs (from the database):
    builder = SwarmBuilder(requester, workflow_run_id)
    builder.build_from_workflow_id(workflow_id)

    # Then pass state and gateway to the orchestrator:
    orchestrator = WorkflowRunOrchestrator(builder.state, builder._gateway, config)
    result = await orchestrator.run(distributor_alive_callable)

Initialize the builder.

Parameters:
  • requester – The Requester instance for HTTP communication.

  • workflow_run_id – The workflow run ID.

  • heartbeat_interval – Interval between heartbeats in seconds.

  • heartbeat_report_by_buffer – Multiplier applied to the heartbeat interval when computing the next report-by time.

  • initial_status – Initial workflow run status.
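
The relationship between the two heartbeat parameters can be illustrated with a small sketch. It assumes that the next report-by deadline is the current time plus heartbeat_interval multiplied by heartbeat_report_by_buffer; that formula and the helper name next_report_by are assumptions made for illustration, not part of the documented API.

```python
from datetime import datetime, timedelta


def next_report_by(
    now: datetime,
    heartbeat_interval: float = 30.0,
    heartbeat_report_by_buffer: float = 1.5,
) -> datetime:
    """Return the assumed deadline for the next heartbeat.

    Hypothetical helper: the buffer stretches the interval so that a
    slightly late heartbeat is not treated as a missed one.
    """
    grace_seconds = heartbeat_interval * heartbeat_report_by_buffer
    return now + timedelta(seconds=grace_seconds)


start = datetime(2024, 1, 1, 12, 0, 0)
deadline = next_report_by(start)
print(deadline - start)  # 0:00:45 with the default values
```

With the defaults shown in the signature (30.0 and 1.5), this reading gives a 45-second window before a heartbeat would count as overdue.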

requester
workflow_run_id
heartbeat_interval = 30.0
heartbeat_report_by_buffer = 1.5
initial_status = 'B'
build_from_workflow(workflow: jobmon.client.workflow.Workflow) → None

Build state from an in-memory Workflow object.

This is used for new workflow runs where the Workflow object is already constructed in memory. After calling this, access builder.state and builder._gateway to get the built state.

Parameters:
  • workflow – The Workflow object containing tasks and arrays.

build_from_workflow_id(workflow_id: int, edge_chunk_size: int = 500) → None

Build state by fetching from database.

This is used for resume scenarios where we need to reconstruct the workflow state from the database. After calling this, access builder.state and builder._gateway to get the built state.

Parameters:
  • workflow_id – The workflow ID to fetch.

  • edge_chunk_size – Number of edges to fetch per chunk.
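
The chunked fetch with periodic heartbeats described for this module can be sketched as follows. This is a minimal, self-contained illustration of the pattern, not the builder's actual implementation: fetch_in_chunks, send_heartbeat, and the injectable clock are hypothetical names, and an in-memory list stands in for the database.

```python
import time
from typing import Callable, Iterator, List, Sequence


def fetch_in_chunks(
    items: Sequence[int],
    chunk_size: int,
    send_heartbeat: Callable[[], None],
    heartbeat_interval: float,
    clock: Callable[[], float] = time.monotonic,
) -> Iterator[Sequence[int]]:
    """Yield items in chunks, sending a heartbeat when the interval has elapsed."""
    last_beat = clock()
    for start in range(0, len(items), chunk_size):
        # In a real client this slice would be one request to the server.
        yield items[start:start + chunk_size]
        # Between chunks, check whether a heartbeat is due.
        if clock() - last_beat >= heartbeat_interval:
            send_heartbeat()
            last_beat = clock()


# Usage with a fake clock that advances 20 seconds on every call.
ticks = iter(range(0, 1000, 20))
beats: List[int] = []
chunks = list(
    fetch_in_chunks(
        list(range(1200)),
        chunk_size=500,
        send_heartbeat=lambda: beats.append(1),
        heartbeat_interval=30.0,
        clock=lambda: next(ticks),
    )
)
```

A smaller chunk size gives more opportunities to send a heartbeat during a long fetch, at the cost of more round trips.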

property state: jobmon.client.swarm.state.SwarmState | None

Get the SwarmState (available after build).

property tasks: dict[int, jobmon.client.swarm.task.SwarmTask]

Get the built tasks dictionary.

property arrays: dict[int, jobmon.client.swarm.array.SwarmArray]

Get the built arrays dictionary.

property task_status_map: dict[str, set[jobmon.client.swarm.task.SwarmTask]]

Get the task status map.
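
The shape of this mapping (status string to set of tasks) can be shown with stand-in objects. FakeTask and build_status_map are hypothetical and exist only for this sketch, the status codes "D" and "G" are placeholders, and how SwarmBuilder actually populates the map is not specified here.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Set


@dataclass(frozen=True)  # frozen makes instances hashable, so they can live in sets
class FakeTask:
    """Stand-in for SwarmTask, used only for illustration."""

    task_id: int
    status: str


def build_status_map(tasks: Dict[int, FakeTask]) -> Dict[str, Set[FakeTask]]:
    """Group tasks by their status string."""
    status_map: Dict[str, Set[FakeTask]] = defaultdict(set)
    for task in tasks.values():
        status_map[task.status].add(task)
    return dict(status_map)


tasks = {1: FakeTask(1, "D"), 2: FakeTask(2, "G"), 3: FakeTask(3, "D")}
status_map = build_status_map(tasks)

# Assumption: a count of already-finished tasks could be read off the map.
done_count = len(status_map.get("D", set()))
```

Keeping a set per status makes "all tasks in status X" a direct lookup rather than a scan over every task.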

property workflow_id: int | None

Get the workflow ID (set after build).

property dag_id: int | None

Get the DAG ID (set after build).

property max_concurrently_running: int

Get max concurrently running limit.

property last_sync: datetime.datetime | None

Get last sync time.

property status: str

Get current status.

property num_previously_complete: int

Get count of tasks that were already complete.