client.swarm.workflow_run
A WorkflowRun is a distributor instance of a declared workflow.
This module contains the core scheduling loop that drives task execution, heartbeat management, and state synchronization with the Jobmon server.
Classes
- WorkflowRun: Enables tracking for multiple runs of a single Workflow.
Module Contents
- class client.swarm.workflow_run.SwarmCommand(func: Callable[..., Awaitable[None]], *args: list[jobmon.client.swarm.swarm_task.SwarmTask], **kwargs: Any)
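SwarmCommand's signature pairs an async callable with positional lists of SwarmTasks. That suggests a deferred command: the scheduler builds it now and awaits it later. The sketch below illustrates only that pattern. `SwarmCommandSketch`, its `__call__` method, and the `queue_tasks` coroutine are hypothetical names, not part of jobmon.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SwarmCommandSketch:
    """Hypothetical stand-in for SwarmCommand: defers an async call until awaited."""

    def __init__(self, func: Callable[..., Awaitable[None]], *args: Any, **kwargs: Any) -> None:
        # Store the coroutine function and its arguments; nothing runs yet.
        self._func = func
        self._args = args
        self._kwargs = kwargs

    async def __call__(self) -> None:
        # Create and await the coroutine only when the scheduler executes the command.
        await self._func(*self._args, **self._kwargs)


async def queue_tasks(tasks: list[str], log: list[str]) -> None:
    # Hypothetical command body: pretend to queue a batch of tasks.
    log.extend(tasks)


log: list[str] = []
cmd = SwarmCommandSketch(queue_tasks, ["t1", "t2"], log)
asyncio.run(cmd())  # the batch is only "queued" at this point
```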
- class client.swarm.workflow_run.WorkflowRun(workflow_run_id: int, workflow_run_heartbeat_interval: int | None = None, heartbeat_report_by_buffer: float | None = None, fail_fast: bool = False, wedged_workflow_sync_interval: int = 600, fail_after_n_executions: int = 1000000000, status: str | None = None, requester: jobmon.core.requester.Requester | None = None)
WorkflowRun enables tracking for multiple runs of a single Workflow.
A Workflow may be started, paused, and resumed multiple times. Each start or resume represents a new WorkflowRun.
For a Workflow to be deemed DONE (successfully), it must have one or more WorkflowRuns. In the current implementation, a Workflow Job may belong to one or more WorkflowRuns, but once the Job reaches a DONE state, it will no longer be added to a subsequent WorkflowRun. However, this is not enforced via any database constraints.
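The rule that a DONE task is never carried into a later WorkflowRun can be modeled in a few lines. This is a minimal sketch under stated assumptions. `TaskRecord`, `RunRecord`, `start_run`, and the `"DONE"`/`"PENDING"` status strings are hypothetical. In jobmon the selection happens against the database rather than an in-memory list.

```python
from dataclasses import dataclass, field

DONE = "DONE"  # illustrative status string, not jobmon's status code


@dataclass
class TaskRecord:
    # Hypothetical minimal task record.
    task_id: int
    status: str


@dataclass
class RunRecord:
    # Hypothetical record of one WorkflowRun and the task ids it picked up.
    run_id: int
    task_ids: list[int] = field(default_factory=list)


def start_run(run_id: int, tasks: list[TaskRecord]) -> RunRecord:
    # Each start or resume creates a new run that only takes tasks not yet DONE.
    return RunRecord(run_id, [t.task_id for t in tasks if t.status != DONE])


tasks = [TaskRecord(1, "PENDING"), TaskRecord(2, "PENDING"), TaskRecord(3, "PENDING")]
first = start_run(1, tasks)
# Suppose tasks 1 and 2 finish before the workflow is paused.
tasks[0].status = DONE
tasks[1].status = DONE
second = start_run(2, tasks)
```

Task 3 is the only task the second run picks up, because tasks 1 and 2 reached DONE during the first run.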
- ready_to_run: collections.deque[jobmon.client.swarm.swarm_task.SwarmTask]
- _stop_event: asyncio.Event | None = None
- _heartbeat_task: asyncio.Task[None] | None = None
- property active_tasks: bool
Return True if the workflow run has in-flight or ready-to-run work.
Short-circuits to False when the run is already terminal (ERROR/TERMINATED).
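Here is a minimal sketch of the `active_tasks` check. It assumes the run tracks a `ready_to_run` deque plus a hypothetical `in_flight` set of submitted tasks. `RunStateSketch` and the `"RUNNING"` status string are illustrative and are not jobmon's implementation.

```python
from collections import deque


class RunStateSketch:
    """Hypothetical model of the active_tasks check; not jobmon's implementation."""

    TERMINAL = {"ERROR", "TERMINATED"}  # terminal statuses named in the docstring

    def __init__(self, status: str) -> None:
        self.status = status
        self.ready_to_run: deque[int] = deque()  # tasks waiting to be queued
        self.in_flight: set[int] = set()  # hypothetical: tasks already submitted

    @property
    def active_tasks(self) -> bool:
        # A terminal run reports no active work, whatever is still tracked locally.
        if self.status in self.TERMINAL:
            return False
        return bool(self.ready_to_run) or bool(self.in_flight)
```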
- _decide_run_loop_continue(time_since_last_full_sync: float) -> tuple[bool, float]
Decide whether the run loop should continue based on task states.
- set_workflow_metadata(workflow_id: int) -> None
Fetch the dag_id and max_concurrently_running parameters of this workflow.
- set_tasks_from_db(chunk_size: int = 500) -> None
Pull the tasks associated with this workflow that still need to be run, i.e. all tasks not in the DONE state.
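The `chunk_size` parameter suggests tasks are pulled from the server in batches rather than in one request. The sketch below shows offset-based paging under that assumption. `fetch_in_chunks` and the `fetch_page(offset, limit)` callable are hypothetical, and the real method may page by task id instead of by offset.

```python
from typing import Callable, Iterator


def fetch_in_chunks(
    fetch_page: Callable[[int, int], list[int]], chunk_size: int = 500
) -> Iterator[int]:
    """Yield task ids page by page until the source returns a short page.

    fetch_page(offset, limit) is a hypothetical stand-in for a server request.
    """
    offset = 0
    while True:
        page = fetch_page(offset, chunk_size)
        yield from page
        if len(page) < chunk_size:
            # A short (or empty) page means there is nothing left to fetch.
            break
        offset += chunk_size
```

With 1,203 tasks and `chunk_size=500`, the loop issues three requests and stops on the short final page.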
- set_downstreams_from_db(chunk_size: int = 500) -> None
Pull downstream edges from the database associated with the workflow.
- run(distributor_alive_callable: Callable[..., bool], seconds_until_timeout: int = 36000, initialize: bool = True) -> None
Execute the workflow-run event loop.
- async _run_async(distributor_alive_callable: Callable[..., bool], seconds_until_timeout: int, initialize: bool) -> None
Async workflow runner that manages scheduler and heartbeat loops.
- async _heartbeat_loop() -> None
Background task that ensures workflow-run heartbeats are logged.
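`run`, `_run_async`, and `_heartbeat_loop`, together with the `_stop_event` and `_heartbeat_task` attributes, point to a common asyncio pattern:

- a synchronous entry point calls `asyncio.run`;
- the async runner starts a background heartbeat task;
- an `asyncio.Event` tells that task to exit.

The sketch below is a hedged illustration of that pattern. `HeartbeatRunnerSketch`, `work_seconds`, and the heartbeat counter are hypothetical stand-ins for the scheduler loop and the server call.

```python
import asyncio


class HeartbeatRunnerSketch:
    """Hypothetical sync-wrapper-plus-heartbeat pattern; not jobmon's code."""

    def __init__(self, heartbeat_interval: float) -> None:
        self.heartbeat_interval = heartbeat_interval
        self.heartbeats = 0
        self._stop_event: asyncio.Event | None = None
        self._heartbeat_task: asyncio.Task[None] | None = None

    def run(self, work_seconds: float) -> None:
        # Synchronous entry point drives the async runner to completion.
        asyncio.run(self._run_async(work_seconds))

    async def _run_async(self, work_seconds: float) -> None:
        # Create the Event inside the running loop, then start the heartbeat task.
        self._stop_event = asyncio.Event()
        self._heartbeat_task = asyncio.create_task(self._heartbeat_loop())
        try:
            await asyncio.sleep(work_seconds)  # stand-in for the scheduler loop
        finally:
            # Signal the heartbeat loop to exit and wait for it to finish.
            self._stop_event.set()
            await self._heartbeat_task

    async def _heartbeat_loop(self) -> None:
        assert self._stop_event is not None
        while not self._stop_event.is_set():
            self.heartbeats += 1  # stand-in for logging a heartbeat to the server
            try:
                # Sleep one interval, but wake immediately if stop is requested.
                await asyncio.wait_for(
                    self._stop_event.wait(), timeout=self.heartbeat_interval
                )
            except asyncio.TimeoutError:
                pass  # interval elapsed without a stop request; beat again
```

Waiting on the event with `asyncio.wait_for` instead of a plain `asyncio.sleep` lets the heartbeat task stop as soon as the scheduler finishes, rather than after a full interval.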
- get_swarm_commands() -> Generator[SwarmCommand, None, None]
Yield batched queue commands respecting workflow/array concurrency limits.
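The sketch below shows one way to drain the ready queue while honoring both a workflow-wide limit and per-array limits. It assumes the caller already knows the remaining free slots for the workflow and for each array. `ReadyTask` and `batch_commands` are hypothetical. The sketch yields `(array_id, task_ids)` tuples, whereas jobmon yields SwarmCommand objects.

```python
from collections import deque
from dataclasses import dataclass
from typing import Iterator


@dataclass
class ReadyTask:
    # Hypothetical minimal view of a ready task.
    task_id: int
    array_id: int


def batch_commands(
    ready: deque[ReadyTask],
    workflow_capacity: int,
    array_capacity: dict[int, int],
) -> Iterator[tuple[int, list[int]]]:
    """Yield (array_id, task_ids) batches without exceeding either limit.

    Capacities are remaining free slots; tasks that do not fit stay in `ready`.
    """
    batches: dict[int, list[int]] = {}
    deferred: deque[ReadyTask] = deque()
    while ready and workflow_capacity > 0:
        task = ready.popleft()
        if array_capacity.get(task.array_id, 0) <= 0:
            deferred.append(task)  # this array is full; keep the task for later
            continue
        batches.setdefault(task.array_id, []).append(task.task_id)
        array_capacity[task.array_id] -= 1
        workflow_capacity -= 1
    # Return deferred tasks to the front, preserving their original order.
    ready.extendleft(reversed(deferred))
    yield from batches.items()
```

Tasks whose array is already at capacity go back onto the front of the queue in their original order, so they are first in line on the next pass.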
- _get_time_till_next_heartbeat(timeout: int | float, loop_start: float) -> tuple[float, int | float]
Calculate the time until the next heartbeat.
This method is used to test a bug in FHS where the timeout is not updated.
- Parameters:
timeout – time until processing stops; -1 means process until no work remains
loop_start – the time the loop started
- async process_commands_async(timeout: int | float = -1) -> None
Process swarm commands until all work is done or the timeout is reached.
- Parameters:
timeout – time until processing stops; -1 means process until no work remains
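The timeout convention above (`-1` means run until the queue is empty) can be sketched as a simple async drain loop. `process_commands` and its queue of zero-argument coroutine functions are hypothetical. The sketch omits status synchronization and heartbeat handling.

```python
import asyncio
import time
from collections import deque
from typing import Awaitable, Callable


async def process_commands(
    commands: deque[Callable[[], Awaitable[None]]], timeout: float = -1
) -> int:
    """Run queued async commands until the queue is empty or the timeout expires.

    timeout=-1 means keep going until there is no more work.
    Returns the number of commands executed.
    """
    start = time.monotonic()
    executed = 0
    while commands:
        if timeout != -1 and time.monotonic() - start >= timeout:
            break  # deadline reached; leave remaining commands queued
        command = commands.popleft()
        await command()
        executed += 1
    return executed
```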
- _refresh_task_status_map(updated_tasks: set[jobmon.client.swarm.swarm_task.SwarmTask]) -> None
Re-bucket tasks whose statuses changed and propagate downstream readiness.
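Downstream readiness is commonly propagated by counting each task's unfinished upstreams. The sketch below assumes the edges pulled by `set_downstreams_from_db` have been turned into a `downstreams` map and a `pending_upstreams` counter. `propagate_done` and both structures are hypothetical and are not jobmon's internals.

```python
from collections import deque


def propagate_done(
    done_task: int,
    downstreams: dict[int, set[int]],
    pending_upstreams: dict[int, int],
    ready_to_run: deque[int],
) -> None:
    """Hypothetical sketch: release downstream tasks whose upstreams are all done."""
    # Sort for a deterministic release order.
    for child in sorted(downstreams.get(done_task, ())):
        pending_upstreams[child] -= 1
        if pending_upstreams[child] == 0:
            ready_to_run.append(child)  # last blocker cleared; task is ready
```

In a DAG with edges 1 → 3, 2 → 3, and 1 → 4, finishing task 1 releases task 4 immediately. Task 3 waits until task 2 is also done.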
- async _set_status_for_triaging_async() -> None
Request server to triage overdue task instances.
- async _request_async(app_route: str, message: dict[str, Any], request_type: str, tenacious: bool = True) -> tuple[int, Any]
- _update_status(status: str) -> None
Update the status of the workflow run to the given status.
- async _terminate_task_instances_async() -> None
Signal the server to terminate all task instances for this workflow run.
- _check_fail_after_n_executions() -> None
Raise the test hook exception when the execution threshold is met.
- _set_fail_after_n_executions(n: int) -> None
For use during testing.
Force the TaskDag to ‘fall over’ after n executions so that the resume case can be tested.
In every non-test case, self.fail_after_n_executions keeps its constructor default of 1000000000, so the ‘fall over’ is effectively never triggered in production.
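The test hook amounts to an execution counter compared against a threshold. This is a minimal sketch that assumes the counter is incremented once per execution. `ExecutionCounterSketch`, `record_execution`, and `FallOverSketch` are hypothetical names, and the sketch does not reproduce the exception jobmon actually raises.

```python
class FallOverSketch(Exception):
    """Hypothetical exception raised by the test hook."""


class ExecutionCounterSketch:
    def __init__(self, fail_after_n_executions: int = 1_000_000_000) -> None:
        # The very large default means the hook never fires in normal use.
        self.fail_after_n_executions = fail_after_n_executions
        self.n_executions = 0

    def _set_fail_after_n_executions(self, n: int) -> None:
        # Tests lower the threshold to force an early failure.
        self.fail_after_n_executions = n

    def record_execution(self) -> None:
        self.n_executions += 1
        # Raise once the threshold is met so a test can exercise the resume path.
        if self.n_executions >= self.fail_after_n_executions:
            raise FallOverSketch(f"fell over after {self.n_executions} executions")
```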
- _get_current_time() -> datetime.datetime
- async _task_status_updates_async(full_sync: bool = False) -> None
Fetch task status changes from the server and update local state.
- async _synchronize_max_concurrently_running_async() -> None
Refresh workflow-level max_concurrently_running from server.