client.swarm.workflow_run
=========================

.. py:module:: client.swarm.workflow_run

.. autoapi-nested-parse::

   Workflow Run is a distributor instance of a declared workflow.

   This module contains the core scheduling loop that drives task execution,
   heartbeat management, and state synchronization with the Jobmon server.


Attributes
----------

.. autoapisummary::

   client.swarm.workflow_run.logger
   client.swarm.workflow_run.ACTIVE_TASK_STATUSES
   client.swarm.workflow_run.SERVER_STOP_STATUSES
   client.swarm.workflow_run.TERMINATING_STATUSES


Classes
-------

.. autoapisummary::

   client.swarm.workflow_run.SwarmCommand
   client.swarm.workflow_run.WorkflowRun


Module Contents
---------------

.. py:data:: logger

.. py:data:: ACTIVE_TASK_STATUSES
   :type: tuple[str, Ellipsis]

.. py:data:: SERVER_STOP_STATUSES
   :type: frozenset[str]

.. py:data:: TERMINATING_STATUSES
   :type: tuple[str, Ellipsis]

.. py:class:: SwarmCommand(func: Callable[Ellipsis, Awaitable[None]], *args: list[jobmon.client.swarm.swarm_task.SwarmTask], **kwargs: Any)

   .. py:attribute:: _func

   .. py:attribute:: _args
      :value: ()

   .. py:attribute:: _kwargs

   .. py:method:: __call__() -> None
      :async:

.. py:class:: WorkflowRun(workflow_run_id: int, workflow_run_heartbeat_interval: Optional[int] = None, heartbeat_report_by_buffer: Optional[float] = None, fail_fast: bool = False, wedged_workflow_sync_interval: int = 600, fail_after_n_executions: int = 1000000000, status: Optional[str] = None, requester: Optional[jobmon.core.requester.Requester] = None)

   WorkflowRun enables tracking for multiple runs of a single Workflow.

   A Workflow may be started, paused, and resumed multiple times. Each start
   or resume represents a new WorkflowRun.

   For a Workflow to be deemed DONE (successfully), it must have one or more
   WorkflowRuns. In the current implementation, a Workflow Job may belong to
   one or more WorkflowRuns, but once the Job reaches a DONE state, it will no
   longer be added to a subsequent WorkflowRun.
   However, this is not enforced via any database constraints.

   .. py:attribute:: workflow_run_id

   .. py:attribute:: tasks
      :type: dict[int, jobmon.client.swarm.swarm_task.SwarmTask]

   .. py:attribute:: arrays
      :type: dict[int, jobmon.client.swarm.swarm_array.SwarmArray]

   .. py:attribute:: ready_to_run
      :type: collections.deque[jobmon.client.swarm.swarm_task.SwarmTask]

   .. py:attribute:: _task_status_map
      :type: dict[str, set[jobmon.client.swarm.swarm_task.SwarmTask]]

   .. py:attribute:: _task_resources
      :type: dict[int, jobmon.client.task_resources.TaskResources]

   .. py:attribute:: _status
      :value: None

   .. py:attribute:: _last_heartbeat_time

   .. py:attribute:: fail_fast
      :value: False

   .. py:attribute:: wedged_workflow_sync_interval
      :value: 600

   .. py:attribute:: _val_fail_after_n_executions
      :value: 1000000000

   .. py:attribute:: _n_executions
      :value: 0

   .. py:attribute:: requester
      :value: None

   .. py:attribute:: _terminated
      :value: False

   .. py:attribute:: initialized
      :value: False

   .. py:attribute:: _session
      :type: Optional[aiohttp.ClientSession]
      :value: None

   .. py:attribute:: _stop_event
      :type: Optional[asyncio.Event]
      :value: None

   .. py:attribute:: _heartbeat_task
      :type: Optional[asyncio.Task[None]]
      :value: None

   .. py:property:: status
      :type: Optional[str]

      Status of the workflow run.

   .. py:property:: done_tasks
      :type: list[jobmon.client.swarm.swarm_task.SwarmTask]

   .. py:property:: failed_tasks
      :type: list[jobmon.client.swarm.swarm_task.SwarmTask]

   .. py:property:: active_tasks
      :type: bool

      Return True if the workflow run has in-flight or ready-to-run work.

      Short-circuits to False when the run is already terminal (ERROR or
      TERMINATED).

   .. py:method:: _get_active_tasks_count() -> int

      Return the number of tasks currently in flight.

   .. py:method:: _get_ready_to_run_count() -> int

      Return the number of tasks ready to run.

   .. py:method:: _decide_run_loop_continue(time_since_last_full_sync: float) -> tuple[bool, float]

      Decide whether the run loop should continue based on task states.

   .. py:method:: _has_active_or_ready_tasks() -> bool

   .. py:method:: _all_tasks_final() -> bool

   .. py:method:: from_workflow(workflow: jobmon.client.workflow.Workflow) -> None

   .. py:method:: from_workflow_id(workflow_id: int, edge_chunk_size: int = 500) -> None

   .. py:method:: set_workflow_metadata(workflow_id: int) -> None

      Fetch the dag_id and max_concurrently_running parameters of this workflow.

   .. py:method:: set_tasks_from_db(chunk_size: int = 500) -> None

      Pull the tasks associated with this workflow that still need to run,
      i.e. all tasks that are not in DONE state.

   .. py:method:: set_downstreams_from_db(chunk_size: int = 500) -> None

      Pull the downstream edges associated with this workflow from the database.

   .. py:method:: run(distributor_alive_callable: Callable[Ellipsis, bool], seconds_until_timeout: int = 36000, initialize: bool = True) -> None

      Execute the workflow-run event loop.

   .. py:method:: _run_async(distributor_alive_callable: Callable[Ellipsis, bool], seconds_until_timeout: int, initialize: bool) -> None
      :async:

      Async workflow runner that manages the scheduler and heartbeat loops.

   .. py:method:: _heartbeat_loop() -> None
      :async:

      Background task that ensures workflow-run heartbeats are logged.

   .. py:method:: set_initial_fringe() -> None

      Populate ready_to_run with tasks whose upstreams are satisfied.

   .. py:method:: get_swarm_commands() -> Generator[SwarmCommand, None, None]

      Yield batched queue commands that respect workflow- and array-level
      concurrency limits.

   .. py:method:: _get_time_till_next_heartbeat(timeout: Union[int, float], loop_start: float) -> tuple[float, Union[int, float]]

      Calculate the time until the next heartbeat.

      This method is used to test a bug in FHS in which the timeout is not
      updated.

      :param timeout: Time until processing stops; -1 means process until no
         work remains.
      :param loop_start: The time at which the loop started.

   .. py:method:: process_commands(timeout: Union[int, float] = -1) -> None

   .. py:method:: process_commands_async(timeout: Union[int, float] = -1) -> None
      :async:

      Process swarm commands until all work is done or the timeout is reached.

      :param timeout: Time until processing stops; -1 means process until no
         work remains.

   .. py:method:: synchronize_state(full_sync: bool = False) -> None

   .. py:method:: synchronize_state_async(full_sync: bool = False) -> None
      :async:

   .. py:method:: _refresh_task_status_map(updated_tasks: set[jobmon.client.swarm.swarm_task.SwarmTask]) -> None

      Re-bucket tasks whose statuses changed and propagate downstream readiness.

   .. py:method:: _set_status_for_triaging_async() -> None
      :async:

      Request that the server triage overdue task instances.

   .. py:method:: _ensure_session() -> aiohttp.ClientSession
      :async:

   .. py:method:: _close_session() -> None
      :async:

   .. py:method:: _request_async(app_route: str, message: dict[str, Any], request_type: str, tenacious: bool = True) -> tuple[int, Any]
      :async:

   .. py:method:: _run_coroutine(coro: Awaitable[Any], cleanup: bool = True) -> Any

   .. py:method:: _teardown_async() -> None
      :async:

   .. py:method:: _log_heartbeat() -> None

   .. py:method:: _log_heartbeat_async() -> None
      :async:

   .. py:method:: _apply_heartbeat_response(response: dict[str, Any]) -> None

   .. py:method:: _update_status(status: str) -> None

      Update the status of the workflow run to the given status.

   .. py:method:: _update_status_async(status: str) -> None
      :async:

      Async status update helper.

   .. py:method:: _validate_status_transition(desired_status: str, new_status: str) -> None

   .. py:method:: _terminate_task_instances_async() -> None
      :async:

      Signal the server to terminate all task instances for this workflow run.

   .. py:method:: _check_fail_after_n_executions() -> None

      Raise the test hook exception when the execution threshold is met.

   .. py:method:: _set_fail_after_n_executions(n: int) -> None

      For use during testing.

      Force the TaskDag to 'fall over' after n executions, so that the resume
      case can be tested.
      In every non-test case, ``_val_fail_after_n_executions`` keeps its
      default of 1000000000, so the 'fall over' is effectively never triggered
      in production.

   .. py:method:: _get_current_time() -> datetime.datetime

   .. py:method:: _task_status_updates_async(full_sync: bool = False) -> None
      :async:

      Fetch task status changes from the server and update local state.

   .. py:method:: _apply_task_status_updates(response: dict[str, Any]) -> None

   .. py:method:: _synchronize_max_concurrently_running_async() -> None
      :async:

      Refresh the workflow-level max_concurrently_running limit from the server.

   .. py:method:: _synchronize_array_max_concurrently_running_async() -> None
      :async:

      Refresh the per-array max_concurrently_running limits from the server.

   .. py:method:: queue_task_batch_async(tasks: list[jobmon.client.swarm.swarm_task.SwarmTask]) -> None
      :async:

   .. py:method:: _set_validated_task_resources(task: jobmon.client.swarm.swarm_task.SwarmTask) -> None

   .. py:method:: _set_adjusted_task_resources(task: jobmon.client.swarm.swarm_task.SwarmTask) -> None

      Adjust the swarm task's parameters.

      Use the cluster API to generate the new resources, then bind them to the
      input SwarmTask.
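
``get_swarm_commands`` is documented as yielding batched queue commands that respect workflow- and array-level concurrency limits, and ``SwarmCommand`` wraps an async callable together with its arguments. The following is a minimal, self-contained sketch of that pattern under stated assumptions, not Jobmon's implementation. ``Command``, ``Task``, ``batched_commands``, ``array_limits``, and ``active_per_array`` are hypothetical names, and the choice to emit one command per array batch is an assumption made for illustration.

.. code-block:: python

   from __future__ import annotations

   import asyncio
   from collections import deque
   from dataclasses import dataclass
   from typing import Any, Awaitable, Callable, Generator


   class Command:
       """Defer an async call; shaped like SwarmCommand (func, *args, **kwargs)."""

       def __init__(self, func: Callable[..., Awaitable[None]], *args: Any, **kwargs: Any) -> None:
           self._func = func
           self._args = args
           self._kwargs = kwargs

       async def __call__(self) -> None:
           await self._func(*self._args, **self._kwargs)


   @dataclass(frozen=True)
   class Task:
       """Hypothetical stand-in for SwarmTask with only the fields the sketch needs."""

       task_id: int
       array_id: int


   def batched_commands(
       ready: deque[Task],
       queue_batch: Callable[[list[Task]], Awaitable[None]],
       workflow_limit: int,
       array_limits: dict[int, int],
       active_total: int,
       active_per_array: dict[int, int],
   ) -> Generator[Command, None, None]:
       """Yield one queue command per array without exceeding either limit.

       Arrays missing from ``array_limits`` are capped only by the workflow limit.
       """
       capacity = workflow_limit - active_total
       batches: dict[int, list[Task]] = {}
       deferred: deque[Task] = deque()
       while ready and capacity > 0:
           task = ready.popleft()
           in_use = active_per_array.get(task.array_id, 0) + len(batches.get(task.array_id, []))
           if in_use >= array_limits.get(task.array_id, workflow_limit):
               deferred.append(task)  # array is saturated; retry on a later pass
               continue
           batches.setdefault(task.array_id, []).append(task)
           capacity -= 1
       # Put skipped tasks back at the front, preserving their original order.
       ready.extendleft(reversed(deferred))
       for batch in batches.values():
           yield Command(queue_batch, batch)

Tasks from a saturated array are pushed back onto the front of the deque in their original order, so they are retried first on the next pass instead of being dropped. Deferring the actual queue call inside a command object lets the caller decide when, and how concurrently, the batches are sent.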
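
The parameters of ``_get_time_till_next_heartbeat`` point to a per-pass calculation: how long the loop may wait before the next heartbeat is due, plus a timeout that must shrink as processing continues (-1 meaning no deadline). The function below is a hypothetical sketch of such a calculation. The explicit ``last_heartbeat``, ``heartbeat_interval``, and ``now`` parameters, and the decision to cap the wait at the remaining timeout, are assumptions made so the example is testable; they are not taken from Jobmon.

.. code-block:: python

   from __future__ import annotations

   import time
   from typing import Optional, Union

   Number = Union[int, float]


   def time_till_next_heartbeat(
       timeout: Number,
       loop_start: float,
       last_heartbeat: float,
       heartbeat_interval: float,
       now: Optional[float] = None,
   ) -> tuple[float, Number]:
       """Return (seconds to wait before the next heartbeat, remaining timeout).

       A timeout of -1 means "process until no work remains" and is passed through.
       """
       if now is None:
           now = time.monotonic()
       # Time left before the heartbeat interval elapses (0 if already overdue).
       till_heartbeat = max(0.0, heartbeat_interval - (now - last_heartbeat))
       if timeout == -1:
           return till_heartbeat, -1
       # Recompute the remaining budget from loop_start on every call so it shrinks.
       remaining = max(0.0, timeout - (now - loop_start))
       # Never wait past the processing deadline.
       return min(till_heartbeat, remaining), remaining

Passing ``now`` explicitly keeps the function deterministic for tests; omitting it falls back to ``time.monotonic()``. Recomputing the remaining timeout from ``loop_start`` on every call, rather than carrying a stale value between passes, is the property a timeout-not-updated regression test would check.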