client.swarm.workflow_run

Workflow Run is a distributor instance of a declared workflow.

This module contains the core scheduling loop that drives task execution, heartbeat management, and state synchronization with the Jobmon server.

Attributes

logger

ACTIVE_TASK_STATUSES

SERVER_STOP_STATUSES

TERMINATING_STATUSES

Classes

SwarmCommand

WorkflowRun

WorkflowRun enables tracking for multiple runs of a single Workflow.

Module Contents

client.swarm.workflow_run.logger
client.swarm.workflow_run.ACTIVE_TASK_STATUSES: tuple[str, ...]
client.swarm.workflow_run.SERVER_STOP_STATUSES: frozenset[str]
client.swarm.workflow_run.TERMINATING_STATUSES: tuple[str, ...]
class client.swarm.workflow_run.SwarmCommand(func: Callable[..., Awaitable[None]], *args: list[jobmon.client.swarm.swarm_task.SwarmTask], **kwargs: Any)
_func
_args = ()
_kwargs
async __call__() -> None
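SwarmCommand stores a coroutine function (`_func`) together with its positional and keyword arguments (`_args`, `_kwargs`) and exposes an async `__call__`. The sketch below assumes `__call__` simply awaits `func(*args, **kwargs)`. `DeferredCall` and `fake_queue_batch` are hypothetical names, and plain strings stand in for SwarmTask objects.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


class DeferredCall:
    """Hypothetical stand-in for SwarmCommand: an async call stored for later."""

    def __init__(self, func: Callable[..., Awaitable[None]], *args: Any, **kwargs: Any) -> None:
        # Keep the coroutine function and its arguments; nothing runs yet.
        self._func = func
        self._args = args
        self._kwargs = kwargs

    async def __call__(self) -> None:
        # Run the stored coroutine function and wait for it to finish.
        await self._func(*self._args, **self._kwargs)


queued_batches: List[List[str]] = []


async def fake_queue_batch(tasks: List[str]) -> None:
    # Hypothetical substitute for WorkflowRun.queue_task_batch_async.
    queued_batches.append(tasks)


command = DeferredCall(fake_queue_batch, ["task-1", "task-2"])
asyncio.run(command())
```

Separating "what to run" from "when to run it" lets a scheduler build a list of commands first and await them later in its own loop.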
class client.swarm.workflow_run.WorkflowRun(workflow_run_id: int, workflow_run_heartbeat_interval: int | None = None, heartbeat_report_by_buffer: float | None = None, fail_fast: bool = False, wedged_workflow_sync_interval: int = 600, fail_after_n_executions: int = 1000000000, status: str | None = None, requester: jobmon.core.requester.Requester | None = None)

WorkflowRun enables tracking for multiple runs of a single Workflow.

A Workflow may be started, paused, and resumed multiple times. Each start or resume represents a new WorkflowRun.

For a Workflow to be deemed DONE (successfully), it must have one or more WorkflowRuns. In the current implementation, a Workflow Job may belong to one or more WorkflowRuns, but once the Job reaches a DONE state, it is no longer added to subsequent WorkflowRuns. However, this is not enforced by any database constraint.
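The resume rule can be illustrated with a small sketch: each new run binds only the tasks that have not yet reached DONE. `RunRecord` and `start_run` are hypothetical, and the status strings are readable labels chosen for illustration, not necessarily the codes Jobmon stores.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class RunRecord:
    """Hypothetical record of one WorkflowRun and the task ids bound to it."""

    run_id: int
    task_ids: List[int] = field(default_factory=list)


def start_run(run_id: int, task_status: Dict[int, str]) -> RunRecord:
    # Only tasks that have not reached DONE are bound to the new run.
    pending = [tid for tid, status in sorted(task_status.items()) if status != "DONE"]
    return RunRecord(run_id=run_id, task_ids=pending)


statuses = {1: "REGISTERING", 2: "REGISTERING", 3: "REGISTERING"}
first_run = start_run(1, statuses)  # binds tasks 1, 2, 3

# Tasks 1 and 2 finish before the run is paused.
statuses[1] = "DONE"
statuses[2] = "DONE"
second_run = start_run(2, statuses)  # binds only task 3
```

Task 3 ends up belonging to both runs while tasks 1 and 2 belong only to the first. In this sketch the rule lives entirely in the filter, mirroring the point that no database constraint enforces it.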

workflow_run_id
tasks: dict[int, jobmon.client.swarm.swarm_task.SwarmTask]
arrays: dict[int, jobmon.client.swarm.swarm_array.SwarmArray]
ready_to_run: collections.deque[jobmon.client.swarm.swarm_task.SwarmTask]
_task_status_map: dict[str, set[jobmon.client.swarm.swarm_task.SwarmTask]]
_task_resources: dict[int, jobmon.client.task_resources.TaskResources]
_status = None
_last_heartbeat_time
fail_fast = False
wedged_workflow_sync_interval = 600
_val_fail_after_n_executions = 1000000000
_n_executions = 0
requester = None
_terminated = False
initialized = False
_session: aiohttp.ClientSession | None = None
_stop_event: asyncio.Event | None = None
_heartbeat_task: asyncio.Task[None] | None = None
property status: str | None

Status of the workflow run.

property done_tasks: list[jobmon.client.swarm.swarm_task.SwarmTask]
property failed_tasks: list[jobmon.client.swarm.swarm_task.SwarmTask]
property active_tasks: bool

Return True if the workflow run has in-flight or ready-to-run work.

Short-circuits to False when the run is already terminal (ERROR/TERMINATED).
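One way to read `active_tasks` is as a combination of the two counters documented below (`_get_active_tasks_count` and `_get_ready_to_run_count`) with the terminal-status short-circuit. A minimal sketch, assuming that composition; `has_active_work` is a hypothetical name and the status strings are illustrative labels.

```python
from typing import Optional

# Assumed labels for the terminal run states named in the docstring.
TERMINAL_RUN_STATUSES = frozenset({"ERROR", "TERMINATED"})


def has_active_work(run_status: Optional[str], in_flight: int, ready_to_run: int) -> bool:
    """Sketch of the active_tasks check from two precomputed counts."""
    # A terminal run never reports active work, whatever the counts say.
    if run_status in TERMINAL_RUN_STATUSES:
        return False
    return in_flight > 0 or ready_to_run > 0
```

Checking the terminal status first means a run that has already been stopped cannot keep the scheduling loop alive through stale task counts.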

_get_active_tasks_count() -> int

Return the number of tasks currently in-flight.

_get_ready_to_run_count() -> int

Return the number of tasks ready to run.

_decide_run_loop_continue(time_since_last_full_sync: float) -> tuple[bool, float]

Decide whether the run loop should continue based on task states.

_has_active_or_ready_tasks() -> bool
_all_tasks_final() -> bool
from_workflow(workflow: jobmon.client.workflow.Workflow) -> None
from_workflow_id(workflow_id: int, edge_chunk_size: int = 500) -> None
set_workflow_metadata(workflow_id: int) -> None

Fetch the dag_id and max_concurrently_running parameters of this workflow.

set_tasks_from_db(chunk_size: int = 500) -> None

Pull the tasks that need to be run associated with this workflow.

That is, all tasks that are not in the DONE state.
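The `chunk_size` parameter (default 500) suggests that tasks are pulled in pages rather than in one large request. A minimal sketch of keyset-style paging under that assumption: `fake_fetch_page` is a hypothetical stand-in for the server route, and the paging scheme (ordered ids, "everything after this id") is an assumption, not Jobmon's documented protocol.

```python
from typing import Dict, List, Tuple


def fake_fetch_page(task_status: Dict[int, str], after_id: int, limit: int,
                    calls: List[int]) -> Dict[int, str]:
    """Hypothetical server route: up to `limit` not-DONE tasks with id > after_id."""
    calls.append(after_id)
    ids = sorted(tid for tid, status in task_status.items()
                 if tid > after_id and status != "DONE")
    return {tid: task_status[tid] for tid in ids[:limit]}


def load_unfinished_tasks(task_status: Dict[int, str],
                          chunk_size: int = 500) -> Tuple[Dict[int, str], int]:
    """Page through not-DONE tasks; return them and the number of requests made."""
    calls: List[int] = []
    tasks: Dict[int, str] = {}
    last_id = 0
    while True:
        page = fake_fetch_page(task_status, last_id, chunk_size, calls)
        tasks.update(page)
        # A short page means the server has nothing more to send.
        if len(page) < chunk_size:
            return tasks, len(calls)
        last_id = max(page)


all_tasks = {i: "DONE" if i % 4 == 0 else "QUEUED" for i in range(1, 1201)}
loaded, n_requests = load_unfinished_tasks(all_tasks)  # 900 tasks in 2 requests
```

Bounding each request keeps individual payloads small for very large DAGs, at the cost of a few extra round trips.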

set_downstreams_from_db(chunk_size: int = 500) -> None

Pull downstream edges from the database associated with the workflow.

run(distributor_alive_callable: Callable[..., bool], seconds_until_timeout: int = 36000, initialize: bool = True) -> None

Execute the workflow-run event loop.

async _run_async(distributor_alive_callable: Callable[..., bool], seconds_until_timeout: int, initialize: bool) -> None

Async workflow runner that manages scheduler and heartbeat loops.

async _heartbeat_loop() -> None

Background task that ensures workflow-run heartbeats are logged.
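The attributes `_stop_event` (an `asyncio.Event`) and `_heartbeat_task` (an `asyncio.Task`) suggest a background coroutine that logs heartbeats until it is told to stop. A minimal sketch of that pattern, assuming the interval corresponds to `workflow_run_heartbeat_interval`; `heartbeat_loop`, `record_beat`, and `demo` are hypothetical names.

```python
import asyncio
import time
from typing import Awaitable, Callable, List


async def heartbeat_loop(stop_event: asyncio.Event, interval: float,
                         log_heartbeat: Callable[[], Awaitable[None]]) -> None:
    """Log a heartbeat, then wait until the interval elapses or a stop is requested."""
    while not stop_event.is_set():
        await log_heartbeat()
        try:
            # Wakes early if stop_event is set; otherwise times out after `interval`.
            await asyncio.wait_for(stop_event.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def demo() -> List[float]:
    beats: List[float] = []

    async def record_beat() -> None:
        beats.append(time.monotonic())

    stop = asyncio.Event()
    task = asyncio.create_task(heartbeat_loop(stop, 0.01, record_beat))
    await asyncio.sleep(0.05)
    stop.set()
    await task
    return beats


beats = asyncio.run(demo())
```

Waiting on the event with a timeout, rather than calling `asyncio.sleep(interval)`, lets a stop request end the loop immediately instead of after a full interval.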

set_initial_fringe() -> None

Populate ready_to_run with tasks whose upstreams are satisfied.
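The initial fringe is the set of not-yet-finished tasks whose upstream tasks have all completed. A minimal sketch over plain dictionaries; `initial_fringe` is hypothetical, and treating an upstream that is absent from the local map as DONE is an assumption based on `set_tasks_from_db` loading only tasks that are not DONE.

```python
from collections import deque
from typing import Deque, Dict, Set


def initial_fringe(upstreams: Dict[int, Set[int]], status: Dict[int, str]) -> Deque[int]:
    """Return not-DONE tasks whose upstream tasks are all DONE, in id order."""
    ready: Deque[int] = deque()
    for tid in sorted(upstreams):
        if status[tid] == "DONE":
            continue
        # Assumption: an upstream missing from `status` was not loaded because it is already DONE.
        if all(status.get(up, "DONE") == "DONE" for up in upstreams[tid]):
            ready.append(tid)
    return ready


# Diamond DAG: 1 -> {2, 3} -> 4, with task 1 already finished.
upstreams = {1: set(), 2: {1}, 3: {1}, 4: {2, 3}}
status = {1: "DONE", 2: "REGISTERING", 3: "REGISTERING", 4: "REGISTERING"}
fringe = initial_fringe(upstreams, status)  # tasks 2 and 3
```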

get_swarm_commands() -> Generator[SwarmCommand, None, None]

Yield batched queue commands respecting workflow/array concurrency limits.
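Respecting both limits means a task can be queued only if the workflow has spare capacity and its own array has not hit its `max_concurrently_running`. The sketch below shows one way to do that bookkeeping; it returns plain task-id batches keyed by array id instead of SwarmCommand objects, and `plan_batches` and all of its parameters are hypothetical.

```python
from collections import deque
from typing import Deque, Dict, List


def plan_batches(ready: Deque[int], array_of: Dict[int, int], workflow_limit: int,
                 workflow_active: int, array_limits: Dict[int, int],
                 array_active: Dict[int, int]) -> Dict[int, List[int]]:
    """Group ready tasks into per-array batches without exceeding either limit."""
    capacity = workflow_limit - workflow_active
    batches: Dict[int, List[int]] = {}
    deferred: List[int] = []
    while ready and capacity > 0:
        tid = ready.popleft()
        array_id = array_of[tid]
        in_use = array_active.get(array_id, 0) + len(batches.get(array_id, []))
        if in_use >= array_limits[array_id]:
            deferred.append(tid)  # this array is full; retry on a later pass
            continue
        batches.setdefault(array_id, []).append(tid)
        capacity -= 1
    # Put deferred tasks back at the front of the queue, preserving their order.
    ready.extendleft(reversed(deferred))
    return batches


queue = deque([1, 2, 3, 4, 5])
plan = plan_batches(queue, {1: 10, 2: 10, 3: 20, 4: 10, 5: 20},
                    workflow_limit=4, workflow_active=0,
                    array_limits={10: 2, 20: 5}, array_active={})
```

Task 4 is skipped because array 10 is full, so it does not consume workflow capacity; task 5 from array 20 takes the last slot instead, and task 4 stays at the head of the queue for the next pass.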

_get_time_till_next_heartbeat(timeout: int | float, loop_start: float) -> tuple[float, int | float]

Calculate the time until the next heartbeat.

This method is used to test a bug in FHS where the timeout is not updated.

Parameters:
  • timeout – time until processing stops; -1 means process until no work remains

  • loop_start – the time the loop started
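The docstring names the inputs (`timeout`, `loop_start`) and a two-element return value but not the formula. One plausible reading, shown as a sketch: wait until whichever comes first, the next heartbeat or the deadline, and report the remaining timeout recomputed from `loop_start`. The extra `now`, `last_heartbeat`, and `heartbeat_interval` parameters are hypothetical, added so the function is deterministic; the actual method may read equivalents from the clock and the instance.

```python
from typing import Tuple, Union

Number = Union[int, float]


def time_till_next_heartbeat(timeout: Number, loop_start: float, now: float,
                             last_heartbeat: float,
                             heartbeat_interval: float) -> Tuple[float, Number]:
    """Return (seconds to wait, remaining timeout); a timeout of -1 means no deadline."""
    until_heartbeat = max(0.0, heartbeat_interval - (now - last_heartbeat))
    if timeout == -1:
        return until_heartbeat, -1
    # Recompute the remaining budget from loop_start on every call.
    remaining = max(0.0, timeout - (now - loop_start))
    return min(until_heartbeat, remaining), remaining


with_deadline = time_till_next_heartbeat(20, loop_start=105.0, now=110.0,
                                         last_heartbeat=100.0, heartbeat_interval=30.0)
no_deadline = time_till_next_heartbeat(-1, loop_start=105.0, now=110.0,
                                       last_heartbeat=100.0, heartbeat_interval=30.0)
```

Here the heartbeat is due in 20 s but only 15 s of the 20 s budget remain, so the first call returns `(15.0, 15.0)`; without a deadline the second returns `(20.0, -1)`.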

process_commands(timeout: int | float = -1) -> None
async process_commands_async(timeout: int | float = -1) -> None

Process swarm commands until all work is done or the timeout is reached.

Parameters:

timeout – time until processing stops; -1 means process until no work remains
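The timeout contract can be illustrated with a loop over zero-argument async callables, the shape SwarmCommand exposes through `__call__`. A minimal sketch: `process_commands` here is a simplified, hypothetical stand-in, and the heartbeat and state-synchronization work that the module description places in the same scheduling loop is left out.

```python
import asyncio
import time
from typing import Awaitable, Callable, Iterable, Union


async def process_commands(commands: Iterable[Callable[[], Awaitable[None]]],
                           timeout: Union[int, float] = -1) -> int:
    """Await each command until none remain or the timeout elapses; return the count run."""
    start = time.monotonic()
    processed = 0
    for command in commands:
        # With timeout == -1 there is no deadline: keep going until the work runs out.
        if timeout != -1 and time.monotonic() - start >= timeout:
            break
        await command()
        processed += 1
    return processed


async def noop() -> None:
    return None


ran_all = asyncio.run(process_commands([noop, noop, noop]))
ran_none = asyncio.run(process_commands([noop, noop, noop], timeout=0))
```

Using `-1` as a sentinel keeps a single numeric parameter while still allowing "no deadline"; the check runs before each command, so a command that has started is always allowed to finish.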

synchronize_state(full_sync: bool = False) -> None
async synchronize_state_async(full_sync: bool = False) -> None
_refresh_task_status_map(updated_tasks: set[jobmon.client.swarm.swarm_task.SwarmTask]) -> None

Re-bucket tasks whose statuses changed and propagate downstream readiness.
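Re-bucketing moves each updated task into the set for its new status; propagation then releases downstream tasks whose upstreams are now all DONE. A minimal sketch over integer task ids; `refresh_status_map` is hypothetical, and the `"PENDING"` label for a not-yet-queued task is an illustrative assumption.

```python
from collections import deque
from typing import Deque, Dict, Iterable, Set


def refresh_status_map(status_map: Dict[str, Set[int]], updated: Iterable[int],
                       current_status: Dict[int, str], upstreams: Dict[int, Set[int]],
                       downstreams: Dict[int, Set[int]], ready: Deque[int]) -> None:
    """Move updated tasks to their new status bucket, then release newly unblocked tasks."""
    changed = sorted(updated)
    for tid in changed:
        for bucket in status_map.values():
            bucket.discard(tid)
        status_map.setdefault(current_status[tid], set()).add(tid)
    for tid in changed:
        if current_status[tid] != "DONE":
            continue
        for child in sorted(downstreams.get(tid, set())):
            if (current_status[child] == "PENDING" and child not in ready
                    and all(current_status[up] == "DONE" for up in upstreams[child])):
                ready.append(child)


status_map = {"DONE": {1}, "RUNNING": {2}, "PENDING": {3}}
current = {1: "DONE", 2: "DONE", 3: "PENDING"}  # task 2 just finished
ready_queue: Deque[int] = deque()
refresh_status_map(status_map, {2}, current, {3: {1, 2}}, {1: {3}, 2: {3}}, ready_queue)
```

Only the downstreams of tasks that changed are examined, so each sync touches the neighborhood of the updates instead of rescanning the whole DAG.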

async _set_status_for_triaging_async() -> None

Request server to triage overdue task instances.

async _ensure_session() -> aiohttp.ClientSession
async _close_session() -> None
async _request_async(app_route: str, message: dict[str, Any], request_type: str, tenacious: bool = True) -> tuple[int, Any]
_run_coroutine(coro: Awaitable[Any], cleanup: bool = True) -> Any
async _teardown_async() -> None
_log_heartbeat() -> None
async _log_heartbeat_async() -> None
_apply_heartbeat_response(response: dict[str, Any]) -> None
_update_status(status: str) -> None

Update the status of the workflow_run with whatever status is passed.

async _update_status_async(status: str) -> None

Async status update helper.

_validate_status_transition(desired_status: str, new_status: str) -> None
async _terminate_task_instances_async() -> None

Signal the server to terminate all task instances for this workflow run.

_check_fail_after_n_executions() -> None

Raise the test hook exception when the execution threshold is met.

_set_fail_after_n_executions(n: int) -> None

For use during testing.

Force the TaskDag to ‘fall over’ after n executions, so that the resume case can be tested.

In non-test runs, fail_after_n_executions keeps its constructor default of 1000000000, so the ‘fall over’ is effectively never triggered in production.
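The hook counts executions (`_n_executions`) and raises once the count meets the threshold (`_val_fail_after_n_executions`). A minimal sketch of that check; `ExecutionCounter` and `FallOverForTesting` are hypothetical names, and using `>=` rather than `>` is an assumption based on the wording "when the execution threshold is met".

```python
class FallOverForTesting(RuntimeError):
    """Hypothetical exception raised by the test hook."""


class ExecutionCounter:
    def __init__(self, fail_after_n_executions: int = 1_000_000_000) -> None:
        self._limit = fail_after_n_executions
        self._n_executions = 0

    def record_execution(self) -> None:
        self._n_executions += 1
        # Raise once the count reaches the configured threshold.
        if self._n_executions >= self._limit:
            raise FallOverForTesting(f"fell over after {self._n_executions} executions")


counter = ExecutionCounter(fail_after_n_executions=3)
counter.record_execution()
counter.record_execution()
try:
    counter.record_execution()
    fell_over = False
except FallOverForTesting:
    fell_over = True
```

A test can lower the threshold to crash a run partway through and then exercise the resume path; with the default threshold the exception is unreachable in practice.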

_get_current_time() -> datetime.datetime
async _task_status_updates_async(full_sync: bool = False) -> None

Fetch task status changes from the server and update local state.

_apply_task_status_updates(response: dict[str, Any]) -> None
async _synchronize_max_concurrently_running_async() -> None

Refresh workflow-level max_concurrently_running from server.

async _synchronize_array_max_concurrently_running_async() -> None

Refresh per-array max_concurrently_running limits from server.

async queue_task_batch_async(tasks: list[jobmon.client.swarm.swarm_task.SwarmTask]) -> None
_set_validated_task_resources(task: jobmon.client.swarm.swarm_task.SwarmTask) -> None
_set_adjusted_task_resources(task: jobmon.client.swarm.swarm_task.SwarmTask) -> None

Adjust the swarm task’s parameters.

Use the cluster API to generate the new resources, then bind them to the input SwarmTask.