Source code for server.web.models.workflow

"""Workflow Database Table."""

import datetime
from typing import Tuple

import structlog
from sqlalchemy import VARCHAR, Column, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func

from jobmon.core.exceptions import InvalidStateTransition
from jobmon.core.serializers import SerializeDistributorWorkflow
from jobmon.server.web.models import Base
from jobmon.server.web.models.workflow_run import WorkflowRun
from jobmon.server.web.models.workflow_run_status import WorkflowRunStatus
from jobmon.server.web.models.workflow_status import WorkflowStatus

# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
class Workflow(Base):
    """Workflow Database Table.

    ORM model for the ``workflow`` table. Besides the column definitions it
    carries a small finite-state machine (``valid_transitions`` /
    ``untimely_transitions``) that guards changes to ``status``, plus helpers
    to reset and resume a workflow through its workflow runs.
    """

    __tablename__ = "workflow"

    def to_wire_as_distributor_workflow(self) -> tuple:
        """Serialize workflow object.

        Returns:
            Whatever ``SerializeDistributorWorkflow.to_wire`` produces for this
            workflow's id, dag id and concurrency limit.
        """
        return SerializeDistributorWorkflow.to_wire(
            workflow_id=self.id,
            dag_id=self.dag_id,
            max_concurrently_running=self.max_concurrently_running,
        )

    # --- columns -----------------------------------------------------------
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    tool_version_id = Column(Integer, ForeignKey("tool_version.id"))
    dag_id: Mapped[int] = mapped_column(Integer, ForeignKey("dag.id"))
    workflow_args_hash = Column(VARCHAR(150), index=True, nullable=False)
    task_hash = Column(VARCHAR(150), index=True, nullable=False)
    description = Column(Text)
    name = Column(String(255), nullable=True)
    workflow_args = Column(Text)
    max_concurrently_running: Mapped[int] = mapped_column(Integer)
    # A non-null created_date is what is_resumable treats as "done binding".
    created_date = mapped_column(DateTime, default=None)
    status_date = mapped_column(DateTime, default=func.now())
    status: Mapped[str] = mapped_column(
        String(1),
        ForeignKey("workflow_status.id"),
        default=WorkflowStatus.REGISTERING,
        nullable=False,
    )

    # --- ORM relationships -------------------------------------------------
    dag = relationship("Dag", back_populates="workflow", lazy=True)
    workflow_runs = relationship("WorkflowRun", back_populates="workflow", lazy=True)

    # --- finite-state machine ----------------------------------------------
    # (current_status, new_status) pairs accepted by _validate_transition.
    # Deliberately left un-annotated: a plain annotation on a declarative
    # class would be interpreted by SQLAlchemy as a mapped attribute.
    valid_transitions = [
        # normal progression from registered to a workflow run has been fully bound
        (WorkflowStatus.REGISTERING, WorkflowStatus.QUEUED),
        # workflow encountered an error before a workflow run was created.
        (WorkflowStatus.REGISTERING, WorkflowStatus.ABORTED),
        # a workflow aborted during task creation. new workflow launched, found
        # existing workflow id and is creating a new workflow run
        (WorkflowStatus.ABORTED, WorkflowStatus.REGISTERING),
        # new workflow run created that resumes old failed workflow run
        (WorkflowStatus.FAILED, WorkflowStatus.REGISTERING),
        # new workflow run created that resumes old suspended workflow run
        (WorkflowStatus.HALTED, WorkflowStatus.REGISTERING),
        # Workflow is instantiating
        (WorkflowStatus.QUEUED, WorkflowStatus.INSTANTIATING),
        # Workflow fails before it's instantiated
        (WorkflowStatus.QUEUED, WorkflowStatus.FAILED),
        # workflow is launched. normal happy path
        (WorkflowStatus.INSTANTIATING, WorkflowStatus.LAUNCHED),
        # workflow can't launch for some reason. TODO: Implement triaging
        (WorkflowStatus.INSTANTIATING, WorkflowStatus.FAILED),
        (WorkflowStatus.LAUNCHED, WorkflowStatus.FAILED),
        # workflow runs. normal happy path
        (WorkflowStatus.LAUNCHED, WorkflowStatus.RUNNING),
        # workflow run was running and then got moved to a resume state
        (WorkflowStatus.RUNNING, WorkflowStatus.HALTED),
        # workflow run was bound, and then got moved to a resume state
        (WorkflowStatus.QUEUED, WorkflowStatus.HALTED),
        # workflow run was running and then completed successfully
        (WorkflowStatus.RUNNING, WorkflowStatus.DONE),
        # workflow run was running and then failed with an error
        (WorkflowStatus.RUNNING, WorkflowStatus.FAILED),
    ]

    # Pairs that transition() silently ignores instead of validating.
    untimely_transitions = [(WorkflowStatus.REGISTERING, WorkflowStatus.REGISTERING)]

    def transition(self, new_state: str) -> None:
        """Transition the state of the workflow.

        Untimely transitions (see ``untimely_transitions``) are ignored
        without logging or raising. Any other request is validated against
        ``valid_transitions`` before ``status`` and ``status_date`` change.

        Args:
            new_state: the workflow status to move to.

        Raises:
            InvalidStateTransition: if the (current, new) pair is not listed
                in ``valid_transitions``.
        """
        if not self._is_timely_transition(new_state):
            return

        logger.info(f"Transitioning workflow_id from {self.status} to {new_state}")
        self._validate_transition(new_state)
        self.status = new_state
        self.status_date = func.now()

    def _validate_transition(self, new_state: str) -> None:
        """Ensure the workflow state transition is valid.

        Raises:
            InvalidStateTransition: if moving from the current status to
                ``new_state`` is not listed in ``valid_transitions``.
        """
        if (self.status, new_state) not in self.valid_transitions:
            raise InvalidStateTransition("Workflow", self.id, self.status, new_state)

    def _is_timely_transition(self, new_state: str) -> bool:
        """Check whether the requested transition should be acted on.

        Returns:
            False when (current status, ``new_state``) is listed in
            ``untimely_transitions`` (a request made obsolete by a race
            condition), True otherwise.
        """
        return (self.status, new_state) not in self.untimely_transitions

    def reset(self, current_time: datetime.datetime) -> None:
        """Set a workflow to a resumable state.

        Args:
            current_time: forwarded to ``WorkflowRun.terminable`` to decide
                whether a workflow run may be terminated.
        """
        # Terminate only the first workflow run that reports itself terminable
        # (per the original note: one that is in cold resume state).
        for workflow_run in self.workflow_runs:
            if workflow_run.terminable(current_time=current_time):
                workflow_run.transition(WorkflowRunStatus.TERMINATED)
                break

        # Bypass the FSM, since we should be able to reset workflows in weird states.
        self.status = WorkflowStatus.REGISTERING
        self.status_date = func.now()

    def resume(self, reset_running_jobs: bool) -> None:
        """Resume a workflow.

        Resets the first active workflow run, if there is one.

        Args:
            reset_running_jobs: when True the run gets a ``cold_reset()``,
                otherwise a ``hot_reset()``.
        """
        logger.info("Resume workflow")
        for workflow_run in self.workflow_runs:
            if workflow_run.is_active:
                if reset_running_jobs:
                    workflow_run.cold_reset()
                else:
                    workflow_run.hot_reset()
                # Only the first active run is reset.
                break

    @property
    def is_resumable(self) -> bool:
        """Is this workflow resumable.

        True when ``created_date`` has been set (binding finished) and none of
        the workflow runs is alive.
        """
        wfrs_active = any(wfr.is_alive for wfr in self.workflow_runs)
        done_binding = self.created_date is not None
        return done_binding and not wfrs_active