Source code for distributor.distributor_workflow_run

from __future__ import annotations

import structlog

from jobmon.core.constants import WorkflowRunStatus
from jobmon.core.requester import Requester

# Module-level structured logger, named after this module.
logger = structlog.get_logger(__name__)
class DistributorWorkflowRun:
    """Implements workflow level bulk routes and tracks in-memory distributor state.

    When polling from the database we should work in task space and translate
    into array space in memory where appropriate.

    When pushing to the database we should work in CommandType
    (Workflow/Array/Task) space.

    Instances hash, compare equal and order by ``workflow_run_id``.
    """

    def __init__(self, workflow_run_id: int, requester: Requester) -> None:
        """Initialization of DistributorWorkflowRun object.

        Args:
            workflow_run_id: ID of the workflow run this object represents.
            requester: object whose ``send_request`` method is used to push
                status updates.
        """
        self.workflow_run_id = workflow_run_id
        # Within this class, only _update_status assigns a non-empty status.
        self.status = ""
        self.requester = requester

    def _update_status(self, status: str) -> None:
        """Update the status of the workflow_run with whatever status is passed.

        Sends a ``put`` request to ``/workflow_run/<id>/update_status`` and then
        records the status on this object.

        Args:
            status: the status value to send and store.
        """
        app_route = f"/workflow_run/{self.workflow_run_id}/update_status"
        self.requester.send_request(
            app_route=app_route,
            message={"status": status},
            request_type="put",
        )
        # Assigned only after the request returns, so an exception raised by
        # send_request leaves the in-memory status untouched.
        self.status = status

    def transition_to_instantiated(self) -> None:
        """Update the workflow run status to WorkflowRunStatus.INSTANTIATED."""
        self._update_status(WorkflowRunStatus.INSTANTIATED)

    def transition_to_launched(self) -> None:
        """Update the workflow run status to WorkflowRunStatus.LAUNCHED."""
        self._update_status(WorkflowRunStatus.LAUNCHED)

    def __hash__(self) -> int:
        """Returns the ID of the workflow run."""
        return self.workflow_run_id

    def __eq__(self, other: object) -> bool:
        """Check if two WorkflowRuns refer to the same workflow run ID.

        Objects that are not DistributorWorkflowRun instances are never equal.
        """
        if not isinstance(other, DistributorWorkflowRun):
            return False
        # Compare the IDs themselves rather than their hashes: hash() is lossy
        # for ints (e.g. hash(-1) == hash(-2)), so distinct IDs could otherwise
        # compare equal.
        return self.workflow_run_id == other.workflow_run_id

    def __lt__(self, other: DistributorWorkflowRun) -> bool:
        """Check if this workflow run ID is less than that of another run.

        Returns NotImplemented for objects that are not DistributorWorkflowRun
        instances, so Python raises TypeError for unsupported comparisons.
        """
        if not isinstance(other, DistributorWorkflowRun):
            return NotImplemented
        return self.workflow_run_id < other.workflow_run_id