client.swarm.orchestrator
=========================

.. py:module:: client.swarm.orchestrator

.. autoapi-nested-parse::

   WorkflowRunOrchestrator: Main event loop coordinator for workflow runs.

   This module provides the orchestrator that coordinates all workflow run
   services (heartbeat, synchronization, scheduling) and manages the main
   execution loop.


Attributes
----------

.. autoapisummary::

   client.swarm.orchestrator.logger


Classes
-------

.. autoapisummary::

   client.swarm.orchestrator.OrchestratorResult
   client.swarm.orchestrator.WorkflowRunConfig
   client.swarm.orchestrator.OrchestratorConfig
   client.swarm.orchestrator.WorkflowRunOrchestrator


Module Contents
---------------

.. py:data:: logger

.. py:class:: OrchestratorResult

   Complete result of orchestrator execution.

   Contains all information needed by callers after execution, eliminating
   the need to access mutable state post-run.


   .. py:attribute:: final_status
      :type:  str


   .. py:attribute:: elapsed_time
      :type:  float


   .. py:attribute:: total_tasks
      :type:  int


   .. py:attribute:: done_count
      :type:  int


   .. py:attribute:: failed_count
      :type:  int


   .. py:attribute:: num_previously_complete
      :type:  int


   .. py:attribute:: task_final_statuses
      :type:  dict[int, str]


   .. py:attribute:: done_task_ids
      :type:  frozenset[int]


   .. py:attribute:: failed_task_ids
      :type:  frozenset[int]


   .. py:attribute:: error
      :type:  Optional[BaseException]
      :value: None


.. py:class:: WorkflowRunConfig

   Configuration for WorkflowRun execution.

   Consolidates all configuration parameters into a single object for
   cleaner construction and easier defaults management.

   This is the user-facing configuration class. It maps to
   OrchestratorConfig internally but provides a simpler interface.

   Example::

       # Simple usage with defaults
       config = WorkflowRunConfig()

       # Custom configuration
       config = WorkflowRunConfig(
           fail_fast=True,
           heartbeat_interval=60,
       )

       # Use with run_workflow()
       result = run_workflow(workflow, workflow_run_id, distributor.alive, config=config)


   .. py:attribute:: heartbeat_interval
      :type:  Optional[int]
      :value: None


   .. py:attribute:: heartbeat_report_by_buffer
      :type:  Optional[float]
      :value: None


   .. py:attribute:: fail_fast
      :type:  bool
      :value: False


   .. py:attribute:: wedged_workflow_sync_interval
      :type:  int
      :value: 600


   .. py:attribute:: fail_after_n_executions
      :type:  int
      :value: 1000000000


   .. py:method:: from_defaults() -> WorkflowRunConfig
      :classmethod:


      Create config with all defaults from JobmonConfig.

      :returns: WorkflowRunConfig with default values.


.. py:class:: OrchestratorConfig

   Configuration for the WorkflowRunOrchestrator.


   .. py:attribute:: heartbeat_interval
      :type:  float
      :value: 30.0


   .. py:attribute:: heartbeat_report_by_buffer
      :type:  float
      :value: 1.5


   .. py:attribute:: wedged_workflow_sync_interval
      :type:  float
      :value: 600.0


   .. py:attribute:: fail_fast
      :type:  bool
      :value: False


   .. py:attribute:: timeout
      :type:  int
      :value: 36000


   .. py:attribute:: fail_after_n_executions
      :type:  Optional[int]
      :value: None


.. py:class:: WorkflowRunOrchestrator(state: jobmon.client.swarm.state.SwarmState, gateway: jobmon.client.swarm.gateway.ServerGateway, config: OrchestratorConfig)

   Thin coordinator that manages the main event loop.

   The orchestrator coordinates all workflow run services:

   - HeartbeatService: Periodic heartbeat logging
   - Synchronizer: State sync with server
   - Scheduler: Task batching and queueing

   It manages the main execution loop, including:

   - Initialization (set initial fringe, transition to RUNNING)
   - Main loop with timeout, distributor alive checks, status-based flow control
   - State propagation (downstream readiness, resource adjustments)
   - Termination handling (resume signals, task draining)
   - Error handling with proper cleanup

   Usage::

       orchestrator = WorkflowRunOrchestrator(state, gateway, config)
       result = await orchestrator.run(distributor_alive_callable)

   Initialize the orchestrator.

   :param state: SwarmState containing all workflow run state.
   :param gateway: ServerGateway for server communication.
   :param config: OrchestratorConfig with settings.


   .. py:method:: run(distributor_alive_callable: Callable[..., bool]) -> OrchestratorResult
      :async:


      Execute the main workflow run event loop.

      :param distributor_alive_callable: Callable that returns True if the distributor is alive.

      :returns: OrchestratorResult with execution summary.

      Raises various exceptions from constraint violations or workflow errors.
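
As a rough illustration of how a caller might await ``run`` and read the returned result without touching mutable state, the sketch below uses stand-ins rather than the real classes. ``ResultSketch`` only mirrors the attribute names documented for ``OrchestratorResult``; ``StubOrchestrator``, ``summarize``, the frozen dataclass layout, and the ``"DONE"`` / ``"ERROR"`` status strings are all assumptions made for this example and are not taken from the library.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class ResultSketch:
    """Stand-in mirroring the documented OrchestratorResult attributes."""

    final_status: str
    elapsed_time: float
    total_tasks: int
    done_count: int
    failed_count: int
    num_previously_complete: int
    task_final_statuses: dict[int, str]
    done_task_ids: frozenset[int]
    failed_task_ids: frozenset[int]
    error: Optional[BaseException] = None


class StubOrchestrator:
    """Hypothetical stub with the same ``run`` shape as the documented method."""

    async def run(self, distributor_alive_callable: Callable[..., bool]) -> ResultSketch:
        # A real orchestrator would loop until completion or timeout;
        # this stub checks distributor liveness once and returns.
        if not distributor_alive_callable():
            return ResultSketch(
                final_status="ERROR",  # assumed status string
                elapsed_time=0.0,
                total_tasks=2,
                done_count=0,
                failed_count=0,
                num_previously_complete=0,
                task_final_statuses={},
                done_task_ids=frozenset(),
                failed_task_ids=frozenset(),
                error=RuntimeError("distributor is not alive"),
            )
        return ResultSketch(
            final_status="DONE",  # assumed status string
            elapsed_time=0.0,
            total_tasks=2,
            done_count=2,
            failed_count=0,
            num_previously_complete=0,
            task_final_statuses={1: "DONE", 2: "DONE"},
            done_task_ids=frozenset({1, 2}),
            failed_task_ids=frozenset(),
        )


def summarize(result: ResultSketch) -> str:
    """Build a one-line summary from the immutable result only."""
    return (
        f"{result.final_status}: {result.done_count}/{result.total_tasks} done, "
        f"{result.failed_count} failed in {result.elapsed_time:.1f}s"
    )


if __name__ == "__main__":
    outcome = asyncio.run(StubOrchestrator().run(lambda: True))
    print(summarize(outcome))
```

Because everything the caller needs is carried on the returned object, the summary helper takes only the result and never reaches back into the orchestrator or its state.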