server.workflow_reaper.workflow_reaper

Service to monitor and reap dead workflows.

Attributes

logger

Classes

WorkflowReaper

Monitoring and reaping dead workflows.

Module Contents

server.workflow_reaper.workflow_reaper.logger
class server.workflow_reaper.workflow_reaper.WorkflowReaper(poll_interval_seconds: int | None = None, requester: jobmon.core.requester.Requester | None = None, wf_notification_sink: Callable[..., None] | None = None)

Bases: object

Monitoring and reaping dead workflows.
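The wf_notification_sink parameter is typed Callable[..., None], so any callable that accepts arbitrary arguments and returns nothing fits. The sketch below shows one such sink, which records every call in memory (useful in tests, for example). The class name CollectingSink and the example message are hypothetical. This page does not say what arguments the reaper passes to the sink, so the sink accepts any positional and keyword arguments.

```python
from typing import Any, Callable, Dict, List, Tuple


class CollectingSink:
    """Hypothetical notification sink that records every call it receives."""

    def __init__(self) -> None:
        # Each entry is (positional args, keyword args) from one call.
        self.calls: List[Tuple[Tuple[Any, ...], Dict[str, Any]]] = []

    def __call__(self, *args: Any, **kwargs: Any) -> None:
        # Accept anything, matching Callable[..., None], and return nothing.
        self.calls.append((args, kwargs))


sink = CollectingSink()
typed_sink: Callable[..., None] = sink  # satisfies the parameter's type
typed_sink("example message")
```

An instance like this could then be passed as WorkflowReaper(wf_notification_sink=sink). Per the signature, the default is None.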

_version
_current_starting_row = 0
_reaper_message
_poll_interval_seconds = None
_requester = None
_wf_notification_sink = None
monitor_forever() -> None

Run the main loop of the Workflow Reaper.

Check whether any workflow runs should be moved to the ABORTED, SUSPENDED, or ERROR state, then wait and repeat.
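The check-wait-repeat loop described above can be sketched as follows. This is not jobmon's implementation. The check callables stand in for methods such as _halted_state, _aborted_state, and _error_state, each typed to return str | None. The function name monitor, the max_cycles stop condition, and the injectable sleep parameter are hypothetical additions that let the loop end in a test. With max_cycles=None the loop runs forever, like monitor_forever.

```python
import time
from typing import Callable, List, Optional


def monitor(
    checks: List[Callable[[], Optional[str]]],
    poll_interval_seconds: float,
    max_cycles: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> List[str]:
    """Run every check, wait, and repeat (hypothetical sketch of monitor_forever).

    Returns the non-None messages produced by the checks.
    """
    messages: List[str] = []
    cycle = 0
    while max_cycles is None or cycle < max_cycles:
        for check in checks:
            result = check()
            if result is not None:
                messages.append(result)
        cycle += 1
        # Wait one poll interval before the next pass.
        sleep(poll_interval_seconds)
    return messages


# Record the requested sleeps instead of actually sleeping.
slept: List[float] = []
msgs = monitor(
    [lambda: None, lambda: "wfr 7 aborted"],
    poll_interval_seconds=5,
    max_cycles=2,
    sleep=slept.append,
)
```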

_get_wf_name_args(workflow_id: int) -> Tuple[str, str]

Return the workflow name and args associated with a specific workflow_id.

_get_lost_workflow_runs(status: List[str]) -> List[jobmon.server.workflow_reaper.reaper_workflow_run.ReaperWorkflowRun]

Return all lost workflow runs whose status is one of the given statuses.
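The real method presumably obtains its rows from the server through _requester. This sketch filters an in-memory list instead, to show how a status list selects runs. FakeWorkflowRun is a hypothetical stand-in for ReaperWorkflowRun with only the fields used here. The runs are assumed to be already known to be lost.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class FakeWorkflowRun:
    """Hypothetical stand-in for ReaperWorkflowRun."""

    workflow_run_id: int
    status: str


def runs_in_status(runs: Iterable[FakeWorkflowRun], status: List[str]) -> List[FakeWorkflowRun]:
    """Keep the runs whose status code is one of the requested codes."""
    wanted = set(status)
    return [run for run in runs if run.status in wanted]


runs = [FakeWorkflowRun(1, "G"), FakeWorkflowRun(2, "D"), FakeWorkflowRun(3, "L")]
lost = runs_in_status(runs, ["G", "L"])
```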

_halted_state() -> str | None

Check whether a workflow run needs to be transitioned to the terminated state.

_error_state() -> str | None

Find lost workflow runs and move them to the ERROR state.
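Here is a minimal sketch of the error transition, under several assumptions this page does not confirm. The input list already holds the lost runs. "E" is the ERROR status code. The return value is a summary string when something was reaped and None otherwise, which fits the str | None return type. The optional notify callable plays the role of wf_notification_sink. The names reap_to_error and FakeWorkflowRun and the message wording are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class FakeWorkflowRun:
    """Hypothetical stand-in for ReaperWorkflowRun."""

    workflow_run_id: int
    status: str


def reap_to_error(
    lost_runs: List[FakeWorkflowRun],
    notify: Optional[Callable[..., None]] = None,
) -> Optional[str]:
    """Move each lost run to "E" (assumed ERROR code) and optionally notify."""
    if not lost_runs:
        return None  # nothing to reap
    for run in lost_runs:
        run.status = "E"
    ids = ", ".join(str(run.workflow_run_id) for run in lost_runs)
    message = f"Moved workflow runs {ids} to ERROR"
    if notify is not None:
        notify(message)
    return message


sent: List[str] = []
runs = [FakeWorkflowRun(5, "L"), FakeWorkflowRun(6, "G")]
summary = reap_to_error(runs, notify=sent.append)
```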

_aborted_state() -> str | None

Find workflow runs that should be in the ABORTED state.

Get all workflow runs in state G and check whether they should be in state A. Get all lost workflow runs in state L and set them to A.
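The two rules above can be sketched as follows. This page does not describe how a G run is judged to need aborting, so the should_abort predicate is a hypothetical placeholder for that validation. The input is assumed to contain only lost runs. abort_stale_runs and FakeWorkflowRun are illustrative names, not jobmon's.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class FakeWorkflowRun:
    """Hypothetical stand-in for ReaperWorkflowRun."""

    workflow_run_id: int
    status: str


def abort_stale_runs(
    runs: List[FakeWorkflowRun],
    should_abort: Callable[[FakeWorkflowRun], bool],
) -> List[int]:
    """Return IDs of runs moved to "A" (hypothetical sketch of _aborted_state)."""
    aborted: List[int] = []
    for run in runs:
        if run.status == "G" and should_abort(run):
            # G runs move to A only if validation says so.
            run.status = "A"
            aborted.append(run.workflow_run_id)
        elif run.status == "L":
            # Lost L runs move to A unconditionally.
            run.status = "A"
            aborted.append(run.workflow_run_id)
    return aborted


runs = [FakeWorkflowRun(1, "G"), FakeWorkflowRun(2, "G"), FakeWorkflowRun(3, "L")]
aborted = abort_stale_runs(runs, should_abort=lambda run: run.workflow_run_id == 1)
```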

_inconsistent_status(step_size: int) -> None

Find workflows in state F whose tasks are all in state D, and fix their status.
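The step_size parameter, together with the _current_starting_row attribute, suggests that this check walks through workflows in batches and remembers where it stopped. That reading is an assumption. So is treating "fix" as setting the workflow's status to D. The sketch below applies both assumptions to in-memory dictionaries. fix_inconsistent_batch is a hypothetical name. The returned row plays the role of _current_starting_row and wraps to 0 after the last batch.

```python
from typing import Dict, List, Tuple


def fix_inconsistent_batch(
    workflow_status: Dict[int, str],
    task_statuses: Dict[int, List[str]],
    start_row: int,
    step_size: int,
) -> Tuple[List[int], int]:
    """Check one batch of workflows; return (fixed workflow IDs, next start row)."""
    ids = sorted(workflow_status)
    batch = ids[start_row:start_row + step_size]
    fixed: List[int] = []
    for wf_id in batch:
        tasks = task_statuses.get(wf_id, [])
        # A workflow in F whose tasks are all D is treated as inconsistent.
        if workflow_status[wf_id] == "F" and tasks and all(s == "D" for s in tasks):
            workflow_status[wf_id] = "D"
            fixed.append(wf_id)
    next_row = start_row + step_size
    if next_row >= len(ids):
        next_row = 0  # wrap around so the next pass starts from the beginning
    return fixed, next_row


wf = {1: "F", 2: "F", 3: "D"}
tasks = {1: ["D", "D"], 2: ["D", "E"], 3: ["D"]}
fixed, row = fix_inconsistent_batch(wf, tasks, start_row=0, step_size=2)
```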