from __future__ import annotations
import structlog
from jobmon.core.constants import WorkflowRunStatus
from jobmon.core.requester import Requester
[docs]
logger = structlog.get_logger(__name__)
[docs]
class DistributorWorkflowRun:
"""Implements workflow level bulk routes and tracks the in memory state on the distributor.
when polling from the database we should work in task space and translate into array
space in memory where appropriate.
when pushing to the database we should work in CommandType (Workflow/Array/Task) space
"""
def __init__(self, workflow_run_id: int, requester: Requester) -> None:
"""Initialization of DistributorWorkflowRun object."""
[docs]
self.workflow_run_id = workflow_run_id
[docs]
self.requester = requester
[docs]
def _update_status(self, status: str) -> None:
"""Update the status of the workflow_run with whatever status is passed."""
app_route = f"/workflow_run/{self.workflow_run_id}/update_status"
self.requester.send_request(
app_route=app_route,
message={"status": status},
request_type="put",
)
self.status = status
[docs]
def transition_to_instantiated(self) -> None:
self._update_status(WorkflowRunStatus.INSTANTIATED)
[docs]
def transition_to_launched(self) -> None:
self._update_status(WorkflowRunStatus.LAUNCHED)
[docs]
def __hash__(self) -> int:
"""Returns the ID of the workflow run."""
return self.workflow_run_id
[docs]
def __eq__(self, other: object) -> bool:
"""Check if the hashes of two WorkflowRuns are equivalent."""
if not isinstance(other, DistributorWorkflowRun):
return False
else:
return hash(self) == hash(other)
[docs]
def __lt__(self, other: DistributorWorkflowRun) -> bool:
"""Check if one hash is less than the hash of another."""
return hash(self) < hash(other)