# Source code for server.web.models.workflow_run

"""Workflow run database table."""

import datetime

import structlog
from sqlalchemy import Column, DateTime, ForeignKey, Index, Integer, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from jobmon.core.exceptions import InvalidStateTransition
from jobmon.core.logging import set_jobmon_context
from jobmon.core.serializers import SerializeWorkflowRun
from jobmon.server import __version__
from jobmon.server.web._compat import add_time
from jobmon.server.web.models import Base
from jobmon.server.web.models.workflow_run_status import WorkflowRunStatus
from jobmon.server.web.models.workflow_status import WorkflowStatus

# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
class WorkflowRun(Base):
    """Database table for recording Workflow Runs."""

    __tablename__ = "workflow_run"

    def to_wire_as_reaper_workflow_run(self) -> tuple:
        """Serialize workflow run (id and workflow_id only) for the reaper."""
        serialized = SerializeWorkflowRun.to_wire(
            id=self.id, workflow_id=self.workflow_id
        )
        return serialized

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    workflow_id: Mapped[int] = mapped_column(Integer, ForeignKey("workflow.id"))
    user = Column(String(150))
    jobmon_version = Column(String(150), default="UNKNOWN")
    # Defaults to the version of the server package that inserted the row.
    jobmon_server_version = Column(String(150), default=__version__)
    status: Mapped[str] = mapped_column(
        String(1),
        ForeignKey("workflow_run_status.id"),
        default=WorkflowRunStatus.REGISTERED,
        nullable=False,
    )
    created_date = Column(DateTime, default=func.now())
    status_date = mapped_column(DateTime, default=func.now())
    heartbeat_date = mapped_column(DateTime, default=func.now())

    workflow = relationship("Workflow", back_populates="workflow_runs", lazy=True)

    __table_args__ = (
        Index(
            "ix_status_version", "status", "jobmon_version", "jobmon_server_version"
        ),
    )

    # Allowed (from_status, to_status) pairs; checked by _validate_transition.
    valid_transitions = [
        # a workflow run is created normally and claims control of the workflow
        (WorkflowRunStatus.REGISTERED, WorkflowRunStatus.LINKING),
        # a workflow run is created normally. All tasks are updated in the db
        # and the workflow run can move to bound state
        (WorkflowRunStatus.LINKING, WorkflowRunStatus.BOUND),
        # a workflow run is created normally. Something goes wrong while the
        # tasks are binding and the workflow run moves to error state
        (WorkflowRunStatus.LINKING, WorkflowRunStatus.ABORTED),
        # a workflow run is bound and then moves to instantiating
        (WorkflowRunStatus.BOUND, WorkflowRunStatus.INSTANTIATED),
        # a workflow run moves from instantiating to launched
        (WorkflowRunStatus.INSTANTIATED, WorkflowRunStatus.LAUNCHED),
        # a workflow run moves from launched to running
        (WorkflowRunStatus.LAUNCHED, WorkflowRunStatus.RUNNING),
        # a workflow run can't be launched for some reason. TODO: implement triaging
        (WorkflowRunStatus.INSTANTIATED, WorkflowRunStatus.ERROR),
        (WorkflowRunStatus.LAUNCHED, WorkflowRunStatus.ERROR),
        # a workflow run is bound and then an error occurs before it starts
        # running
        (WorkflowRunStatus.BOUND, WorkflowRunStatus.ERROR),
        # a workflow run is bound and then a new workflow run is created
        # before the old workflow run moves into running state
        (WorkflowRunStatus.BOUND, WorkflowRunStatus.COLD_RESUME),
        (WorkflowRunStatus.BOUND, WorkflowRunStatus.HOT_RESUME),
        # the workflow starts running normally and finishes successfully
        (WorkflowRunStatus.RUNNING, WorkflowRunStatus.DONE),
        # the workflow starts running normally and the user stops it via a
        # keyboard interrupt
        (WorkflowRunStatus.RUNNING, WorkflowRunStatus.STOPPED),
        # the workflow is running and then a new workflow run is created
        (WorkflowRunStatus.RUNNING, WorkflowRunStatus.COLD_RESUME),
        (WorkflowRunStatus.RUNNING, WorkflowRunStatus.HOT_RESUME),
        # the workflow is running and then its tasks hit errors
        (WorkflowRunStatus.RUNNING, WorkflowRunStatus.ERROR),
        # the workflow is set to resume and then it successfully shuts down
        (WorkflowRunStatus.COLD_RESUME, WorkflowRunStatus.TERMINATED),
        (WorkflowRunStatus.HOT_RESUME, WorkflowRunStatus.TERMINATED),
    ]

    # Self-transitions that are silently ignored (see _is_timely_transition)
    # instead of raising InvalidStateTransition, e.g. repeated heartbeats.
    untimely_transitions = [
        (WorkflowRunStatus.RUNNING, WorkflowRunStatus.RUNNING),
        (WorkflowRunStatus.LINKING, WorkflowRunStatus.LINKING),
    ]

    # Run states that move the parent workflow to FAILED (see transition).
    bound_error_states = [WorkflowRunStatus.STOPPED, WorkflowRunStatus.ERROR]

    active_states = [
        WorkflowRunStatus.BOUND,
        WorkflowRunStatus.RUNNING,
        WorkflowRunStatus.COLD_RESUME,
        WorkflowRunStatus.HOT_RESUME,
    ]

    @property
    def is_alive(self) -> bool:
        """Workflow run is in a state that should be registering heartbeats."""
        return self.status in [
            WorkflowRunStatus.LINKING,
            WorkflowRunStatus.BOUND,
            WorkflowRunStatus.RUNNING,
            WorkflowRunStatus.COLD_RESUME,
            WorkflowRunStatus.HOT_RESUME,
        ]

    @property
    def is_active(self) -> bool:
        """Statuses where Workflow Run is active (bound or running)."""
        # NOTE(review): narrower than ``active_states`` (which also includes the
        # resume states); confirm which set callers expect.
        return self.status in [WorkflowRunStatus.BOUND, WorkflowRunStatus.RUNNING]

    def terminable(self, current_time: datetime.datetime) -> bool:
        """Whether a workflowrun can be terminated.

        A workflowrun can be terminated if it is in Cold/Hot resume state and
        has missed the last reporting heartbeat, i.e. ``heartbeat_date`` is at
        or before ``current_time``.

        Args:
            current_time: the time to compare ``heartbeat_date`` against.

        Returns:
            True if the heartbeat deadline has passed and the run is in
            COLD_RESUME or HOT_RESUME.
        """
        return (self.heartbeat_date <= current_time) and (
            self.status in (WorkflowRunStatus.COLD_RESUME, WorkflowRunStatus.HOT_RESUME)
        )

    def heartbeat(
        self,
        next_report_increment: float,
        transition_status: str = WorkflowRunStatus.RUNNING,
    ) -> None:
        """Register a heartbeat for the Workflow Run to show it is still alive.

        Transitions to ``transition_status`` and pushes ``heartbeat_date`` forward.
        Repeated RUNNING->RUNNING / LINKING->LINKING heartbeats are ignored as
        untimely transitions rather than rejected.

        Args:
            next_report_increment: passed to ``add_time`` to compute the new
                ``heartbeat_date`` (presumably now + increment; confirm units).
            transition_status: status to transition to; defaults to RUNNING.

        Raises:
            InvalidStateTransition: if the transition is not allowed.
        """
        self.transition(transition_status)
        self.heartbeat_date = add_time(next_report_increment)

    def reap(self) -> None:
        """Transition dead workflow runs to a terminal state.

        LINKING -> ABORTED, COLD_RESUME/HOT_RESUME -> TERMINATED,
        RUNNING -> ERROR. Runs in any other state are left unchanged.
        """
        set_jobmon_context(workflow_run_id=self.id, workflow_id=self.workflow_id)
        logger.info("Dead workflow_run will be reaped.")
        # The source states are mutually exclusive, so at most one branch applies.
        if self.status == WorkflowRunStatus.LINKING:
            logger.debug(f"Transitioning wfr {self.id} to ABORTED")
            self.transition(WorkflowRunStatus.ABORTED)
        elif self.status in [WorkflowRunStatus.COLD_RESUME, WorkflowRunStatus.HOT_RESUME]:
            logger.debug(f"Transitioning wfr {self.id} to TERMINATED")
            self.transition(WorkflowRunStatus.TERMINATED)
        elif self.status == WorkflowRunStatus.RUNNING:
            logger.debug(f"Transitioning wfr {self.id} to ERROR")
            self.transition(WorkflowRunStatus.ERROR)
        logger.info(
            f"Workflow run transitioned to {self.status}",
            workflow_run_id=self.id,
            workflow_id=self.workflow_id,
            new_status=self.status,
        )

    def transition(self, new_state: str) -> None:
        """Transition the Workflow Run's state.

        Untimely transitions (listed in ``untimely_transitions``) are ignored.
        Otherwise the transition is validated, ``status`` and ``status_date``
        are updated, and the parent workflow is moved to the matching
        WorkflowStatus.

        Args:
            new_state: the WorkflowRunStatus to move to.

        Raises:
            InvalidStateTransition: if ``(self.status, new_state)`` is not in
                ``valid_transitions``.
        """
        set_jobmon_context(workflow_run_id=self.id, workflow_id=self.workflow_id)
        if not self._is_timely_transition(new_state):
            return
        # Validate before logging so a rejected transition is never reported as
        # having happened.
        self._validate_transition(new_state)
        logger.info(
            f"Workflow run transitioned from {self.status} to {new_state}",
            workflow_run_id=self.id,
            workflow_id=self.workflow_id,
            old_status=self.status,
            new_status=new_state,
        )
        self.status = new_state
        # SQL-side timestamp, evaluated by the database on flush.
        self.status_date = func.now()
        # Keep the parent workflow's status in sync with this run.
        if new_state == WorkflowRunStatus.LINKING:
            self.workflow.transition(WorkflowStatus.REGISTERING)
        elif new_state == WorkflowRunStatus.BOUND:
            self.workflow.transition(WorkflowStatus.QUEUED)
        elif new_state == WorkflowRunStatus.ABORTED:
            self.workflow.transition(WorkflowStatus.ABORTED)
        elif new_state == WorkflowRunStatus.RUNNING:
            self.workflow.transition(WorkflowStatus.RUNNING)
        elif new_state == WorkflowRunStatus.DONE:
            self.workflow.transition(WorkflowStatus.DONE)
        elif new_state == WorkflowRunStatus.TERMINATED:
            self.workflow.transition(WorkflowStatus.HALTED)
        elif new_state in self.bound_error_states:
            self.workflow.transition(WorkflowStatus.FAILED)
        elif new_state == WorkflowRunStatus.INSTANTIATED:
            self.workflow.transition(WorkflowStatus.INSTANTIATING)
        elif new_state == WorkflowRunStatus.LAUNCHED:
            self.workflow.transition(WorkflowStatus.LAUNCHED)

    def hot_reset(self) -> None:
        """Set Workflow Run to Hot Resume.

        Raises:
            InvalidStateTransition: if HOT_RESUME is not reachable from the
                current status.
        """
        set_jobmon_context(workflow_run_id=self.id, workflow_id=self.workflow_id)
        # Transition first so the log line is only emitted once it has succeeded.
        self.transition(WorkflowRunStatus.HOT_RESUME)
        logger.info(
            "Workflow run transitioned to HOT_RESUME",
            workflow_run_id=self.id,
            workflow_id=self.workflow_id,
            new_status="HOT_RESUME",
        )

    def cold_reset(self) -> None:
        """Set Workflow Run to Cold Resume.

        Raises:
            InvalidStateTransition: if COLD_RESUME is not reachable from the
                current status.
        """
        set_jobmon_context(workflow_run_id=self.id, workflow_id=self.workflow_id)
        # Transition first so the log line is only emitted once it has succeeded.
        self.transition(WorkflowRunStatus.COLD_RESUME)
        logger.info(
            "Workflow run transitioned to COLD_RESUME",
            workflow_run_id=self.id,
            workflow_id=self.workflow_id,
            new_status="COLD_RESUME",
        )

    def _validate_transition(self, new_state: str) -> None:
        """Ensure the Workflow Run state transition is valid.

        Raises:
            InvalidStateTransition: if ``(self.status, new_state)`` is not in
                ``valid_transitions``.
        """
        if (self.status, new_state) not in self.valid_transitions:
            raise InvalidStateTransition("WorkflowRun", self.id, self.status, new_state)

    def _is_timely_transition(self, new_state: str) -> bool:
        """Check whether the transition should be applied.

        Returns False (and logs at info level) when ``(self.status, new_state)``
        is listed in ``untimely_transitions``, i.e. a known race-condition
        no-op; returns True otherwise.
        """
        set_jobmon_context(workflow_run_id=self.id, workflow_id=self.workflow_id)
        if (self.status, new_state) in self.untimely_transitions:
            logger.info(
                f"Ignoring transition of workflow_run {self.id} "
                f"from {self.status} to {new_state} (untimely transition)"
            )
            return False
        else:
            logger.debug(
                f"No race condition when transitioning workflow_run from "
                f"{self.status} to {new_state}"
            )
            return True