"""Reaper Behavior for a given Workflow Run."""
from __future__ import annotations
import logging
from typing import Any
from jobmon.core.requester import Requester
from jobmon.core.serializers import SerializeWorkflowRun
[docs]
logger = logging.getLogger(__file__)
[docs]
class ReaperWorkflowRun(object):
"""Reaper Behavior for a given Workflow Run."""
def __init__(
self, workflow_run_id: int, workflow_id: int, requester: Requester
) -> None:
"""Implementing workflow reaper behavior of workflow run.
Args:
workflow_run_id (int): id of workflow run object from DB.
workflow_id (int): id of associated workflow.
requester (Requester): requester to communicate with Flask.
"""
[docs]
self.workflow_run_id = workflow_run_id
[docs]
self.workflow_id = workflow_id
[docs]
self._requester = requester
@classmethod
[docs]
def from_wire(
cls: Any, wire_tuple: tuple, requester: Requester
) -> ReaperWorkflowRun:
"""Create Reaper Workflow Run object."""
kwargs = SerializeWorkflowRun.kwargs_from_wire(wire_tuple)
return cls(
workflow_run_id=kwargs["id"],
workflow_id=kwargs["workflow_id"],
requester=requester,
)
[docs]
def reap(self) -> str:
"""Transition workflow run to error."""
app_route = f"/workflow_run/{self.workflow_run_id}/reap"
_, response = self._requester.send_request(
app_route=app_route, message={}, request_type="put"
)
return response["status"]
[docs]
def __repr__(self) -> str:
"""Return formatted reaper workflow run data."""
return (
f"ReaperWorkflowRun(workflow_run_id={self.workflow_run_id}, "
f"workflow_id={self.workflow_id}"
)