client.swarm.workflow_run
=========================

.. py:module:: client.swarm.workflow_run

.. autoapi-nested-parse::

   Workflow Run is a distributor instance of a declared workflow.


Attributes
----------

.. autoapisummary::

   client.swarm.workflow_run.logger


Classes
-------

.. autoapisummary::

   client.swarm.workflow_run.SwarmCommand
   client.swarm.workflow_run.WorkflowRun


Module Contents
---------------

.. py:data:: logger

.. py:class:: SwarmCommand(func: Callable[Ellipsis, None], *args: List[jobmon.client.swarm.swarm_task.SwarmTask], **kwargs: Any)

   .. py:attribute:: _func

   .. py:attribute:: _args
      :value: ()

   .. py:attribute:: _kwargs

   .. py:method:: __call__() -> None


.. py:class:: WorkflowRun(workflow_run_id: int, workflow_run_heartbeat_interval: Optional[int] = None, heartbeat_report_by_buffer: Optional[float] = None, fail_fast: bool = False, wedged_workflow_sync_interval: int = 600, fail_after_n_executions: int = 1000000000, status: Optional[str] = None, requester: Optional[jobmon.core.requester.Requester] = None)

   WorkflowRun enables tracking for multiple runs of a single Workflow.

   A Workflow may be started, paused, and resumed multiple times. Each start
   or resume represents a new WorkflowRun.

   For a Workflow to be deemed DONE (successfully), it must have one or more
   WorkflowRuns. In the current implementation, a Workflow Job may belong to
   one or more WorkflowRuns, but once the Job reaches a DONE state, it will no
   longer be added to a subsequent WorkflowRun. However, this is not enforced
   via any database constraints.


   .. py:attribute:: workflow_run_id

   .. py:attribute:: _active_states

   .. py:attribute:: tasks
      :type: Dict[int, jobmon.client.swarm.swarm_task.SwarmTask]

   .. py:attribute:: arrays
      :type: Dict[int, jobmon.client.swarm.swarm_array.SwarmArray]

   .. py:attribute:: ready_to_run
      :type: collections.deque[jobmon.client.swarm.swarm_task.SwarmTask]

   .. py:attribute:: _task_status_map
      :type: Dict[str, Set[jobmon.client.swarm.swarm_task.SwarmTask]]

   .. py:attribute:: _task_resources
      :type: Dict[int, jobmon.client.task_resources.TaskResources]

   .. py:attribute:: _status
      :value: None

   .. py:attribute:: _last_heartbeat_time

   .. py:attribute:: fail_fast
      :value: False

   .. py:attribute:: wedged_workflow_sync_interval
      :value: 600

   .. py:attribute:: _val_fail_after_n_executions
      :value: 1000000000

   .. py:attribute:: _n_executions
      :value: 0

   .. py:attribute:: requester
      :value: None

   .. py:attribute:: _terminated
      :value: False

   .. py:attribute:: initialized
      :value: False

   .. py:property:: status
      :type: Optional[str]

      Status of the workflow run.


   .. py:property:: done_tasks
      :type: List[jobmon.client.swarm.swarm_task.SwarmTask]

   .. py:property:: failed_tasks
      :type: List[jobmon.client.swarm.swarm_task.SwarmTask]

   .. py:property:: active_tasks
      :type: bool

      Whether the workflow run has more work to do, based on the task status map.

      If there are no tasks in active states, the fringe is empty, and
      therefore we should error out.


   .. py:method:: _get_active_tasks_count() -> int

      Return the number of active tasks.


   .. py:method:: _get_ready_to_run_count() -> int

      Return the number of tasks ready to run.


   .. py:method:: _decide_run_loop_continue(time_since_last_full_sync: float) -> tuple[bool, float]

      Decide whether the run loop should continue based on task states.

      :returns: (should_continue, updated_time_since_last_full_sync)
      :rtype: tuple


   .. py:method:: from_workflow(workflow: jobmon.client.workflow.Workflow) -> None

   .. py:method:: from_workflow_id(workflow_id: int, edge_chunk_size: int = 500) -> None

   .. py:method:: set_workflow_metadata(workflow_id: int) -> None

      Fetch the dag_id and max_concurrently_running parameters of this workflow.


   .. py:method:: set_tasks_from_db(chunk_size: int = 500) -> None

      Pull the tasks associated with this workflow that still need to be run,
      i.e. all tasks that aren't in DONE state.


   .. py:method:: set_downstreams_from_db(chunk_size: int = 500) -> None

      Pull downstream edges from the database associated with the workflow.


   .. py:method:: run(distributor_alive_callable: Callable[Ellipsis, bool], seconds_until_timeout: int = 36000, initialize: bool = True) -> None

      Take a concrete DAG and queue all the Tasks that are not DONE.

      Uses forward chaining from the initial fringe, hence out-of-date is not
      applied transitively backwards through the graph. It could also use
      backward chaining from an identified goal node; the effect is identical.

      Conceptually::

         all tasks in registering state w/ finished upstreams are ready_to_run
         Put tasks in Adjusting state on the ready_to_run queue

         while there are tasks ready_to_run or currently running tasks:
             queue all tasks that are ready_to_run
             wait for some jobs to complete and add downstreams to the ready_to_run queue
             rinse and repeat

      :param distributor_alive_callable: callable that checks whether or not the
                                         distributor service is still alive.
      :param seconds_until_timeout: how long to block while waiting for the next task
                                    to finish before raising an error.
      :param initialize: whether to initialize (update WorkflowRun to RUNNING and
                         set fringe)

      :returns: None


   .. py:method:: set_initial_fringe() -> None

      Set initial fringe.


   .. py:method:: get_swarm_commands() -> Generator[SwarmCommand, None, None]

      Generator to get next chunk of work to be done. Must be idempotent.


   .. py:method:: _get_time_till_next_heartbeat(timeout: Union[int, float], loop_start: float) -> Tuple[float, Union[int, float]]

      Calculate the time until the next heartbeat.

      This method is used to test a bug in FHS where the timeout is not updated.

      :param timeout: time until we stop processing. -1 means process till no more work
      :param loop_start: the time the loop started


   .. py:method:: process_commands(timeout: Union[int, float] = -1) -> None

      Process swarm commands until all work is done or the timeout is reached.

      :param timeout: time until we stop processing. -1 means process till no more work


   .. py:method:: synchronize_state(full_sync: bool = False) -> None

   .. py:method:: _refresh_task_status_map(updated_tasks: Set[jobmon.client.swarm.swarm_task.SwarmTask]) -> None

   .. py:method:: _set_status_for_triaging() -> None

   .. py:method:: _log_heartbeat() -> None

   .. py:method:: _update_status(status: str) -> None

      Update the status of the workflow_run with whatever status is passed.


   .. py:method:: _terminate_task_instances() -> None

      Terminate the workflow run.


   .. py:method:: _set_fail_after_n_executions(n: int) -> None

      For use during testing.

      Force the TaskDag to 'fall over' after n executions, so that the resume case
      can be tested.

      In every non-test case, self.fail_after_n_executions will be None, and
      so the 'fall over' will not be triggered in production.


   .. py:method:: _get_current_time() -> datetime.datetime

   .. py:method:: _task_status_updates(full_sync: bool = False) -> None

      Update internal state of tasks to match the database.

      If no tasks are specified, get all tasks.


   .. py:method:: _synchronize_max_concurrently_running() -> None

   .. py:method:: _synchronize_array_max_concurrently_running() -> None

   .. py:method:: queue_task_batch(tasks: List[jobmon.client.swarm.swarm_task.SwarmTask]) -> None

   .. py:method:: _set_validated_task_resources(task: jobmon.client.swarm.swarm_task.SwarmTask) -> None

   .. py:method:: _set_adjusted_task_resources(task: jobmon.client.swarm.swarm_task.SwarmTask) -> None

      Adjust the swarm task's parameters.

      Use the cluster API to generate the new resources, then bind them to the
      input SwarmTask.
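
The forward-chaining loop described in the ``WorkflowRun.run`` docstring can be illustrated with a minimal, single-process sketch. This is not jobmon's implementation: ``forward_chain``, ``upstreams``, and ``execute`` are hypothetical names introduced here, tasks run synchronously rather than being queued to a distributor, and every upstream is assumed to appear as a key in the input mapping.

```python
from collections import deque
from typing import Callable, Dict, List, Set


def forward_chain(
    upstreams: Dict[str, Set[str]],
    execute: Callable[[str], None],
) -> List[str]:
    """Run each task once all of its upstream tasks have finished.

    Hypothetical helper for illustration only; not part of jobmon.
    """
    # Invert the upstream map so finishing a task reveals its downstreams.
    downstreams: Dict[str, Set[str]] = {task: set() for task in upstreams}
    for task, ups in upstreams.items():
        for up in ups:
            downstreams[up].add(task)

    # Initial fringe: tasks without upstreams are ready immediately.
    ready_to_run = deque(sorted(t for t, ups in upstreams.items() if not ups))
    done: Set[str] = set()
    order: List[str] = []

    while ready_to_run:
        task = ready_to_run.popleft()
        execute(task)  # stands in for queueing the task and waiting for it
        done.add(task)
        order.append(task)
        # A downstream joins the queue once all of its upstreams are done.
        for child in sorted(downstreams[task]):
            if upstreams[child] <= done:
                ready_to_run.append(child)

    return order


# Diamond-shaped DAG: "a" feeds "b" and "c", which both feed "d".
dag = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
order = forward_chain(dag, execute=lambda task: None)
```

In this sketch a task enters ``ready_to_run`` only when its last upstream finishes, so it is queued at most once. Tasks that sit on a cycle never become ready and are simply absent from the returned order.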