Source code for server.workflow_reaper.reaper_workflow_run

"""Reaper Behavior for a given Workflow Run."""

from __future__ import annotations

import logging
from typing import Any

from jobmon.core.requester import Requester
from jobmon.core.serializers import SerializeWorkflowRun


# Name the logger after the module's dotted import path rather than its file
# path, so it takes part in the package logger hierarchy and inherits any
# handlers/levels configured on parent loggers.
logger = logging.getLogger(__name__)
class ReaperWorkflowRun:
    """Reaper Behavior for a given Workflow Run.

    Holds the identifiers of one workflow run together with the requester
    used to talk to the server, and exposes the ``reap`` action for it.
    """

    def __init__(
        self, workflow_run_id: int, workflow_id: int, requester: Requester
    ) -> None:
        """Implementing workflow reaper behavior of workflow run.

        Args:
            workflow_run_id (int): id of workflow run object from DB.
            workflow_id (int): id of associated workflow.
            requester (Requester): requester to communicate with Flask.
        """
        self.workflow_run_id = workflow_run_id
        self.workflow_id = workflow_id
        self._requester = requester

    @classmethod
    def from_wire(
        cls: Any, wire_tuple: tuple, requester: Requester
    ) -> ReaperWorkflowRun:
        """Create Reaper Workflow Run object.

        Args:
            wire_tuple (tuple): serialized workflow run, decoded with
                ``SerializeWorkflowRun.kwargs_from_wire``; the decoded mapping
                must provide the ``"id"`` and ``"workflow_id"`` keys.
            requester (Requester): requester handed to the new instance.

        Returns:
            A new instance built from the decoded ids.
        """
        kwargs = SerializeWorkflowRun.kwargs_from_wire(wire_tuple)
        return cls(
            workflow_run_id=kwargs["id"],
            workflow_id=kwargs["workflow_id"],
            requester=requester,
        )

    def reap(self) -> str:
        """Transition workflow run to error.

        Sends a ``put`` request with an empty message to
        ``/workflow_run/<workflow_run_id>/reap``.

        Returns:
            The ``"status"`` entry of the response body.
        """
        app_route = f"/workflow_run/{self.workflow_run_id}/reap"
        # The first element of the returned pair is not used here; only the
        # response body is inspected.
        _, response = self._requester.send_request(
            app_route=app_route, message={}, request_type="put"
        )
        return response["status"]

    def __repr__(self) -> str:
        """Return formatted reaper workflow run data."""
        # The closing parenthesis was previously missing, leaving the
        # representation unbalanced.
        return (
            f"ReaperWorkflowRun(workflow_run_id={self.workflow_run_id}, "
            f"workflow_id={self.workflow_id})"
        )