client.swarm.workflow_run
Workflow Run is a distributor instance of a declared workflow.
Attributes
Classes
WorkflowRun enables tracking for multiple runs of a single Workflow.
Module Contents
- class client.swarm.workflow_run.SwarmCommand(func: Callable[Ellipsis, None], *args: List[jobmon.client.swarm.swarm_task.SwarmTask], **kwargs: Any)[source]
- class client.swarm.workflow_run.WorkflowRun(workflow_run_id: int, workflow_run_heartbeat_interval: int | None = None, heartbeat_report_by_buffer: float | None = None, fail_fast: bool = False, wedged_workflow_sync_interval: int = 600, fail_after_n_executions: int = 1000000000, status: str | None = None, requester: jobmon.core.requester.Requester | None = None)[source]
WorkflowRun enables tracking for multiple runs of a single Workflow.
A Workflow may be started, paused, and resumed multiple times. Each start or resume represents a new WorkflowRun.
For a Workflow to be deemed DONE (successfully), it must have one or more WorkflowRuns. In the current implementation, a Workflow Job may belong to one or more WorkflowRuns, but once the Job reaches a DONE state, it is no longer added to any subsequent WorkflowRun. However, this is not enforced via any database constraints.
- ready_to_run: collections.deque[jobmon.client.swarm.swarm_task.SwarmTask][source]
- property active_tasks: bool[source]
Whether the workflow run still has work to do, based on the task status map.
If there are no tasks in active states, the fringe is empty, and therefore we should error out.
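As a rough illustration of the check described for active_tasks, the sketch below tests a status map for any task in an active state. The function name, the status strings, and the shape of the map (status to set of task ids) are assumptions made for this example, not jobmon's actual constants or data structures.

```python
from typing import Dict, Set

# Hypothetical set of "active" statuses; the real status names may differ.
ACTIVE_STATES = {"QUEUED", "RUNNING"}


def has_active_tasks(task_status_map: Dict[str, Set[int]]) -> bool:
    """Return True if any task id is recorded under an active status."""
    return any(task_status_map.get(state) for state in ACTIVE_STATES)
```

An empty set under an active status counts as no work, so a map holding only DONE tasks yields False.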
- set_workflow_metadata(workflow_id: int) None[source]
Fetch the dag_id and max_concurrently_running parameters of this workflow.
- set_tasks_from_db(chunk_size: int = 500) None[source]
Pull the tasks that need to be run associated with this workflow.
I.e. all tasks that aren’t in DONE state.
- set_downstreams_from_db(chunk_size: int = 500) None[source]
Pull downstream edges from the database associated with the workflow.
- run(distributor_alive_callable: Callable[Ellipsis, bool], seconds_until_timeout: int = 36000, initialize: bool = True) None[source]
Take a concrete DAG and queue all the Tasks that are not DONE.
Uses forward chaining from the initial fringe, hence out-of-date is not applied transitively backwards through the graph. It could also use backward chaining from an identified goal node; the effect is identical.
Conceptually:
all tasks in registering state with finished upstreams are ready_to_run
put tasks in Adjusting state on the ready_to_run queue
while there are tasks ready_to_run or currently running tasks:
    queue all tasks that are ready_to_run
    wait for some jobs to complete and add downstreams to the ready_to_run queue
    rinse and repeat
- Parameters:
distributor_alive_callable – callable that checks whether or not the distributor service is still alive.
seconds_until_timeout – how long to block while waiting for the next task to finish before raising an error.
initialize – whether to initialize (update WorkflowRun to RUNNING and set fringe)
- Returns:
None
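The forward-chaining idea behind run can be sketched on a toy graph. This is a minimal, standalone illustration and not jobmon's implementation: forward_chain, the upstreams mapping (task name to the set of its upstream task names), and the assumption that every queued task finishes successfully are all hypothetical.

```python
from collections import deque
from typing import Dict, List, Set


def forward_chain(upstreams: Dict[str, Set[str]], done: Set[str]) -> List[str]:
    """Return the order in which not-yet-DONE tasks would be queued."""
    # Invert the upstream map so each task knows its downstreams.
    downstreams: Dict[str, Set[str]] = {}
    for task, ups in upstreams.items():
        for up in ups:
            downstreams.setdefault(up, set()).add(task)

    finished = set(done)
    # Initial fringe: unfinished tasks whose upstreams are all finished.
    ready_to_run = deque(
        task
        for task in sorted(upstreams)
        if task not in finished and upstreams[task] <= finished
    )
    order: List[str] = []
    while ready_to_run:
        task = ready_to_run.popleft()
        order.append(task)  # stand-in for queueing and running the task
        finished.add(task)  # assumption: the task completes successfully
        for down in sorted(downstreams.get(task, ())):
            if (
                down not in finished
                and down not in ready_to_run
                and upstreams[down] <= finished
            ):
                ready_to_run.append(down)
    return order
```

Tasks already in done are never queued again, which mirrors the resume case: only the unfinished part of the DAG is walked.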
- get_swarm_commands() Generator[SwarmCommand, None, None][source]
Generator to get next chunk of work to be done. Must be idempotent.
- process_commands(timeout: int | float = -1) None[source]
Processes swarm commands until all work is done or timeout is reached.
- Parameters:
timeout – time limit after which processing stops; -1 means process until no work remains.
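The interplay between an idempotent command generator and a timeout-bounded processing loop can be sketched as follows. This is an illustration under stated assumptions, not the library's code: the standalone process_commands function, its return value, the get_commands callable, and the treatment of timeout as seconds are all hypothetical.

```python
import time
from typing import Callable, Iterator


def process_commands(
    get_commands: Callable[[], Iterator[Callable[[], None]]],
    timeout: float = -1,
) -> int:
    """Run commands until none remain or the time budget is spent.

    Returns the number of commands executed (an assumption for this sketch).
    """
    start = time.monotonic()
    processed = 0
    keep_going = True
    while keep_going:
        keep_going = False
        # Re-request the commands on every pass; because the generator is
        # idempotent, it yields only the work that is still outstanding.
        for command in get_commands():
            if timeout >= 0 and time.monotonic() - start >= timeout:
                return processed
            command()
            processed += 1
            keep_going = True
    return processed
```

Idempotency matters here because the loop may abandon a generator partway through; asking again must produce the remaining work without duplicating what has already run.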
- _refresh_task_status_map(updated_tasks: Set[jobmon.client.swarm.swarm_task.SwarmTask]) None[source]
- _update_status(status: str) None[source]
Update the status of the workflow_run with whatever status is passed.
- _set_fail_after_n_executions(n: int) None[source]
For use during testing.
Force the TaskDag to ‘fall over’ after n executions, so that the resume case can be tested.
In every non-test case, self.fail_after_n_executions will be None, and so the ‘fall over’ will not be triggered in production.
- _get_current_time() datetime.datetime[source]