client.swarm.orchestrator

WorkflowRunOrchestrator: Main event loop coordinator for workflow runs.

This module provides the orchestrator that coordinates all workflow run services (heartbeat, synchronization, scheduling) and manages the main execution loop.

Attributes

logger

Classes

OrchestratorResult

Complete result of orchestrator execution.

WorkflowRunConfig

Configuration for WorkflowRun execution.

OrchestratorConfig

Configuration for the WorkflowRunOrchestrator.

WorkflowRunOrchestrator

Thin coordinator - manages the main event loop.

Module Contents

client.swarm.orchestrator.logger
class client.swarm.orchestrator.OrchestratorResult

Complete result of orchestrator execution.

Contains all information needed by callers after execution, eliminating the need to access mutable state post-run.

final_status: str
elapsed_time: float
total_tasks: int
done_count: int
failed_count: int
num_previously_complete: int
task_final_statuses: dict[int, str]
done_task_ids: frozenset[int]
failed_task_ids: frozenset[int]
error: BaseException | None = None
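Because the result is meant to be read after the run without touching live state, a caller can build reports from it alone. The sketch below is a minimal illustration, not the library's class: `OrchestratorResultSketch` is a hypothetical stand-in that mirrors the fields listed above, the frozen dataclass is an assumption, `summarize` is an invented helper, and the status strings are placeholders.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class OrchestratorResultSketch:
    """Hypothetical stand-in mirroring the documented OrchestratorResult fields."""

    final_status: str
    elapsed_time: float
    total_tasks: int
    done_count: int
    failed_count: int
    num_previously_complete: int
    task_final_statuses: dict[int, str]
    done_task_ids: frozenset[int]
    failed_task_ids: frozenset[int]
    error: BaseException | None = None


def summarize(result: OrchestratorResultSketch) -> str:
    """Build a one-line report using only the result object (invented helper)."""
    return (
        f"{result.final_status}: {result.done_count}/{result.total_tasks} done, "
        f"{result.failed_count} failed in {result.elapsed_time:.1f}s"
    )


# Placeholder values; real status strings may differ.
example = OrchestratorResultSketch(
    final_status="DONE",
    elapsed_time=12.5,
    total_tasks=3,
    done_count=2,
    failed_count=1,
    num_previously_complete=0,
    task_final_statuses={1: "DONE", 2: "DONE", 3: "FAILED"},
    done_task_ids=frozenset({1, 2}),
    failed_task_ids=frozenset({3}),
)
print(summarize(example))
```
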
class client.swarm.orchestrator.WorkflowRunConfig

Configuration for WorkflowRun execution.

Consolidates all configuration parameters into a single object for cleaner construction and easier defaults management.

This is the user-facing configuration class. It maps to OrchestratorConfig internally but provides a simpler interface.

Example:

# Simple usage with defaults
config = WorkflowRunConfig()

# Custom configuration
config = WorkflowRunConfig(
    fail_fast=True,
    heartbeat_interval=60,
)

# Use with run_workflow()
result = run_workflow(workflow, workflow_run_id, distributor.alive, config=config)
heartbeat_interval: int | None = None
heartbeat_report_by_buffer: float | None = None
fail_fast: bool = False
wedged_workflow_sync_interval: int = 600
fail_after_n_executions: int = 1000000000
classmethod from_defaults() → WorkflowRunConfig

Create config with all defaults from JobmonConfig.

Returns:

WorkflowRunConfig with default values.

class client.swarm.orchestrator.OrchestratorConfig

Configuration for the WorkflowRunOrchestrator.

heartbeat_interval: float = 30.0
heartbeat_report_by_buffer: float = 1.5
wedged_workflow_sync_interval: float = 600.0
fail_fast: bool = False
timeout: int = 36000
fail_after_n_executions: int | None = None
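The text above says WorkflowRunConfig maps to OrchestratorConfig internally but does not show how. One plausible shape for that mapping is sketched here under stated assumptions: `UserConfigSketch` and `InternalConfigSketch` are stand-ins that only copy the documented fields and defaults, `to_internal` is a hypothetical function, and the rule that `None` falls back to the internal default is an assumption (the real code may resolve defaults from JobmonConfig instead).

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class UserConfigSketch:
    """Stand-in copying the documented WorkflowRunConfig fields."""

    heartbeat_interval: int | None = None
    heartbeat_report_by_buffer: float | None = None
    fail_fast: bool = False
    wedged_workflow_sync_interval: int = 600
    fail_after_n_executions: int = 1_000_000_000


@dataclass
class InternalConfigSketch:
    """Stand-in copying the documented OrchestratorConfig fields."""

    heartbeat_interval: float = 30.0
    heartbeat_report_by_buffer: float = 1.5
    wedged_workflow_sync_interval: float = 600.0
    fail_fast: bool = False
    timeout: int = 36000
    fail_after_n_executions: int | None = None


def to_internal(user: UserConfigSketch) -> InternalConfigSketch:
    """Hypothetical mapping: unset (None) user values keep the internal default."""
    defaults = InternalConfigSketch()
    interval = user.heartbeat_interval
    buffer = user.heartbeat_report_by_buffer
    return InternalConfigSketch(
        heartbeat_interval=(
            defaults.heartbeat_interval if interval is None else float(interval)
        ),
        heartbeat_report_by_buffer=(
            defaults.heartbeat_report_by_buffer if buffer is None else buffer
        ),
        wedged_workflow_sync_interval=float(user.wedged_workflow_sync_interval),
        fail_fast=user.fail_fast,
        fail_after_n_executions=user.fail_after_n_executions,
    )


internal = to_internal(UserConfigSketch(fail_fast=True, heartbeat_interval=60))
```

Keeping the user-facing object small and resolving defaults in one place means the orchestrator never has to handle `None` for the heartbeat settings.
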
class client.swarm.orchestrator.WorkflowRunOrchestrator(state: jobmon.client.swarm.state.SwarmState, gateway: jobmon.client.swarm.gateway.ServerGateway, config: OrchestratorConfig)

Thin coordinator - manages the main event loop.

The orchestrator coordinates all workflow run services:

  • HeartbeatService: Periodic heartbeat logging

  • Synchronizer: State sync with server

  • Scheduler: Task batching and queueing

It manages the main execution loop, including:

  • Initialization (set initial fringe, transition to RUNNING)

  • Main loop with timeout, distributor alive checks, status-based flow control

  • State propagation (downstream readiness, resource adjustments)

  • Termination handling (resume signals, task draining)

  • Error handling with proper cleanup
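The loop structure described above (timeout, distributor alive checks, exit when the work is finished) can be sketched roughly as follows. This is an illustration only and not the orchestrator's implementation: `run_loop_sketch`, `is_finished`, `poll_interval`, and the returned strings are all hypothetical, and the real loop may raise exceptions rather than return status strings.

```python
import asyncio
import time
from typing import Callable


async def run_loop_sketch(
    distributor_alive: Callable[[], bool],
    is_finished: Callable[[], bool],
    timeout: float,
    poll_interval: float = 0.01,
) -> str:
    """Poll until the work finishes, the distributor dies, or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while not is_finished():
        # Stop early if the process that launches tasks is gone.
        if not distributor_alive():
            return "distributor_dead"
        # Enforce the overall time budget.
        if time.monotonic() >= deadline:
            return "timed_out"
        # Yield to other coroutines (heartbeat, sync) between iterations.
        await asyncio.sleep(poll_interval)
    return "done"


# Example: the work reports finished on the third check.
checks = iter([False, False, True])
outcome = asyncio.run(run_loop_sketch(lambda: True, lambda: next(checks), timeout=5.0))
```
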

Usage:

orchestrator = WorkflowRunOrchestrator(state, gateway, config)
result = await orchestrator.run(distributor_alive_callable)

Initialize the orchestrator.

Parameters:
  • state – SwarmState containing all workflow run state.

  • gateway – ServerGateway for server communication.

  • config – OrchestratorConfig with settings.

async run(distributor_alive_callable: Callable[..., bool]) → OrchestratorResult

Execute the main workflow run event loop.

Parameters:

distributor_alive_callable – Callable that returns True if distributor is alive.

Returns:

OrchestratorResult with execution summary.

Raises:

Various exceptions from constraint violations or workflow errors.
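Since run() is a coroutine and its exceptions reach the caller, synchronous code needs both an event loop and its own error handling. The sketch below shows one way to do that with a stand-in class: `StubOrchestrator`, `run_blocking`, the `RuntimeError`, and the returned strings are hypothetical and only mimic the documented shape of run(); the real exception types are not specified above.

```python
import asyncio
from typing import Callable


class StubOrchestrator:
    """Hypothetical stand-in exposing the same run() shape as documented above."""

    async def run(self, distributor_alive_callable: Callable[..., bool]) -> str:
        # Placeholder failure; the real orchestrator's exceptions are not listed.
        if not distributor_alive_callable():
            raise RuntimeError("distributor is not alive")
        await asyncio.sleep(0)
        return "finished"


def run_blocking(orchestrator: StubOrchestrator, alive: Callable[..., bool]) -> str:
    """Drive the coroutine from synchronous code and report failures as a string."""
    try:
        return asyncio.run(orchestrator.run(alive))
    except RuntimeError as exc:
        return f"failed: {exc}"


ok = run_blocking(StubOrchestrator(), lambda: True)
bad = run_blocking(StubOrchestrator(), lambda: False)
```
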