"""Task Instance Database Table."""
import json
from typing import Tuple
import structlog
from sqlalchemy import Column, DateTime, ForeignKey, Index, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from jobmon.core.exceptions import InvalidStateTransition
from jobmon.core.serializers import SerializeTaskInstance
from jobmon.server.web.models import Base
from jobmon.server.web.models.task_instance_status import TaskInstanceStatus
from jobmon.server.web.models.task_status import TaskStatus
# new structlog logger per flask request context. internally stored as flask.g.logger
[docs]
logger = structlog.get_logger(__name__)
[docs]
class TaskInstance(Base):
"""Task Instance Database Table."""
[docs]
__tablename__ = "task_instance"
[docs]
def to_wire_as_distributor_task_instance(self) -> Tuple:
"""Serialize task instance object."""
return SerializeTaskInstance.to_wire_distributor(
self.id,
self.task_id,
self.workflow_run_id,
self.task.workflow_id,
self.status,
self.distributor_id,
self.task_resources_id,
self.array_id,
self.array_batch_num,
self.array_step_id,
)
[docs]
def to_wire_as_worker_node_task_instance(self) -> Tuple:
"""Serialize task instance object."""
requested_resources = json.loads(self.task_resources.requested_resources)
return SerializeTaskInstance.to_wire_worker_node(
self.id,
self.status,
self.workflow_run_id,
self.task.id,
self.task.name,
self.task.command,
self.task.workflow_id,
requested_resources.get("stdout"),
requested_resources.get("stderr"),
)
[docs]
id: Mapped[int] = mapped_column(Integer, primary_key=True)
[docs]
workflow_run_id: Mapped[int] = mapped_column(Integer, ForeignKey("workflow_run.id"))
[docs]
array_id: Mapped[int] = mapped_column(Integer, ForeignKey("array.id"), default=None)
[docs]
task_id: Mapped[int] = mapped_column(Integer, ForeignKey("task.id"), nullable=False)
[docs]
distributor_id: Mapped[str] = mapped_column(String(20), index=True, nullable=True)
[docs]
task_resources_id: Mapped[int] = mapped_column(
Integer, ForeignKey("task_resources.id"), index=True
)
[docs]
array_batch_num: Mapped[int] = mapped_column(Integer, index=True)
[docs]
array_step_id: Mapped[int] = mapped_column(Integer, index=True)
# usage
[docs]
nodename = Column(String(150))
[docs]
process_group_id = Column(Integer)
[docs]
usage_str = Column(String(250))
[docs]
wallclock = Column(String(50))
[docs]
maxpss = Column(String(50))
[docs]
cpu = Column(String(50))
[docs]
io = Column(String(50))
[docs]
stdout = Column(String(2048))
[docs]
stderr = Column(String(2048))
[docs]
stdout_log = Column(Text)
[docs]
stderr_log = Column(Text)
# status/state
[docs]
status: Mapped[str] = mapped_column(
String(1),
ForeignKey("task_instance_status.id"),
default=TaskInstanceStatus.QUEUED,
nullable=False,
)
[docs]
submitted_date = Column(DateTime)
[docs]
status_date = mapped_column(DateTime, default=func.now())
[docs]
report_by_date = Column(DateTime)
# ORM relationships
[docs]
task = relationship("Task", back_populates="task_instances")
[docs]
errors = relationship("TaskInstanceErrorLog", back_populates="task_instance")
[docs]
task_resources = relationship("TaskResources")
[docs]
__table_args__ = (
Index(
"ix_array_batch_index",
"array_id",
"array_batch_num",
"array_step_id",
),
Index("ix_status_status_date", "status", "status_date"),
)
# finite state machine transition information
[docs]
valid_transitions = [
# task instance is moved from queued to instantiated by distributor
(TaskInstanceStatus.QUEUED, TaskInstanceStatus.INSTANTIATED),
# task instance is queued and waiting to instantiate when a new workflow run starts and
# tells it to die
(TaskInstanceStatus.QUEUED, TaskInstanceStatus.KILL_SELF),
# task instance is launched by distributor
(TaskInstanceStatus.INSTANTIATED, TaskInstanceStatus.LAUNCHED),
# task instance submission hit weird bug and didn't get an distributor_id
(TaskInstanceStatus.INSTANTIATED, TaskInstanceStatus.NO_DISTRIBUTOR_ID),
# task instance is mid submission and a new workflow run starts and
# tells it to die
(TaskInstanceStatus.INSTANTIATED, TaskInstanceStatus.KILL_SELF),
# task instance logs running before submitted due to race condition
(TaskInstanceStatus.INSTANTIATED, TaskInstanceStatus.RUNNING),
# task instance running after transitioning from launched
(TaskInstanceStatus.LAUNCHED, TaskInstanceStatus.RUNNING),
# task instance disappeared from distributor heartbeat and never logged
# running. The distributor has no accounting of why it died
(TaskInstanceStatus.LAUNCHED, TaskInstanceStatus.UNKNOWN_ERROR),
# Heartbeat never logged for a task instance in launched, either the distributor
# is failing to log heartbeats or the worker node failed to start up.
(TaskInstanceStatus.LAUNCHED, TaskInstanceStatus.NO_HEARTBEAT),
# task instance disappeared from distributor heartbeat and never logged
# running. The distributor discovered a resource error exit status.
# This seems unlikely but is valid for the purposes of the FSM
(TaskInstanceStatus.LAUNCHED, TaskInstanceStatus.RESOURCE_ERROR),
# task instance is submitted to the batch distributor waiting to launch.
# new workflow run is created and this task is told to kill
# itself
(TaskInstanceStatus.LAUNCHED, TaskInstanceStatus.KILL_SELF),
# allow task instance to transit to F to immediately fail the task if there is an env
# mismatch
(TaskInstanceStatus.LAUNCHED, TaskInstanceStatus.ERROR_FATAL),
# task instance triaging after transitioning from running
(TaskInstanceStatus.RUNNING, TaskInstanceStatus.TRIAGING),
# task instance hits an application error (happy path)
(TaskInstanceStatus.RUNNING, TaskInstanceStatus.ERROR),
# task instance stops logging heartbeats. reconciler can't find an exit
# status
(TaskInstanceStatus.RUNNING, TaskInstanceStatus.UNKNOWN_ERROR),
# 1) task instance stops logging heartbeats. reconciler discovers a
# resource error.
# 2) worker node detects a resource error
(TaskInstanceStatus.RUNNING, TaskInstanceStatus.RESOURCE_ERROR),
# task instance is running. another workflow run starts and tells it to
# die
(TaskInstanceStatus.RUNNING, TaskInstanceStatus.KILL_SELF),
# task instance finishes normally (happy path)
(TaskInstanceStatus.RUNNING, TaskInstanceStatus.DONE),
# task instance launched after transitioning from triaging
(TaskInstanceStatus.TRIAGING, TaskInstanceStatus.RUNNING),
# task instance resource_error after transitioning from triaging
(TaskInstanceStatus.TRIAGING, TaskInstanceStatus.RESOURCE_ERROR),
# task instance unknown_error after transitioning from triaging
(TaskInstanceStatus.TRIAGING, TaskInstanceStatus.UNKNOWN_ERROR),
# task instance error_fatal after transitioning from triaging
(TaskInstanceStatus.TRIAGING, TaskInstanceStatus.ERROR_FATAL),
# task instance error after transitioning from kill_self
(TaskInstanceStatus.KILL_SELF, TaskInstanceStatus.ERROR_FATAL),
# task is reset by workflow resume and worker finishes gracefully
(TaskInstanceStatus.KILL_SELF, TaskInstanceStatus.DONE),
# transition to a recoverable error after failing to log a heartbeat in launched
(TaskInstanceStatus.NO_HEARTBEAT, TaskInstanceStatus.ERROR),
]
[docs]
untimely_transitions = [
# task instance logs running before the distributor logs submitted due to
# race condition. this is unlikely but happens and is valid for the
# purposes of the FSM
(TaskInstanceStatus.RUNNING, TaskInstanceStatus.LAUNCHED),
# A worker node instance can error before the distributor moves it to launched.
(TaskInstanceStatus.ERROR, TaskInstanceStatus.LAUNCHED),
# task instance stops logging heartbeats and reconciler is looking for
# remote exit status but can't find it so logs an unknown error. task
# finishes with an application error. We can't update state because
# the task may already be running again due to a race with the JIF
(TaskInstanceStatus.ERROR, TaskInstanceStatus.UNKNOWN_ERROR),
# Both the worker node and the distributor try to move a task instance into Error
(TaskInstanceStatus.ERROR, TaskInstanceStatus.ERROR),
# task instance stops logging heartbeats and reconciler can't find exit
# status. Worker tries to finish gracefully but reconciler won the race
(TaskInstanceStatus.UNKNOWN_ERROR, TaskInstanceStatus.DONE),
# task instance stops logging heartbeats and reconciler can't find exit
# status. Worker tries to report an application error but cant' because
# the task could be running again alread and we don't want to update
# task state
(TaskInstanceStatus.UNKNOWN_ERROR, TaskInstanceStatus.ERROR),
# task instance stops logging heartbeats and reconciler can't find exit
# status. Worker tries to report a resource error but cant' because
# the task could be running again alread and we don't want to update
# task state
(TaskInstanceStatus.UNKNOWN_ERROR, TaskInstanceStatus.RESOURCE_ERROR),
# task instance stops logging heartbeats and reconciler can't find exit
# status. Worker reports a resource error before reconciler logs an
# unknown error.
(TaskInstanceStatus.RESOURCE_ERROR, TaskInstanceStatus.UNKNOWN_ERROR),
# task instance stops logging heartbeats and reconciler is looking for
# remote exit status but can't find it so logs an unknown error.
# The worker finishes gracefully before reconciler can log an unknown
# error
(TaskInstanceStatus.DONE, TaskInstanceStatus.UNKNOWN_ERROR),
# task is reset by workflow resume and worker hits an application error
(TaskInstanceStatus.KILL_SELF, TaskInstanceStatus.ERROR),
# task is reset by workflow resume and reconciler or worker node
# discovers resource error
(TaskInstanceStatus.KILL_SELF, TaskInstanceStatus.RESOURCE_ERROR),
# task is reset by workflow resume and reconciler discovers unknown error
(TaskInstanceStatus.KILL_SELF, TaskInstanceStatus.UNKNOWN_ERROR),
]
[docs]
error_states = [
TaskInstanceStatus.NO_DISTRIBUTOR_ID,
TaskInstanceStatus.ERROR,
TaskInstanceStatus.UNKNOWN_ERROR,
TaskInstanceStatus.RESOURCE_ERROR,
]
[docs]
def transition(self, new_state: str) -> None:
"""Transition the TaskInstance status."""
# if the transition is timely, move to new state. Otherwise do nothing
# bind_to_logger(
# workflow_run_id=self.workflow_run_id,
# task_id=self.task_id,
# task_instance_id=self.id,
# )
if self._is_timely_transition(new_state):
self._validate_transition(new_state)
logger.info(
f"Transitioning task_instance from {self.status} to {new_state}"
)
self.status = new_state
self.status_date = func.now()
if new_state == TaskInstanceStatus.QUEUED:
self.task.transition(TaskStatus.QUEUED)
if new_state == TaskInstanceStatus.INSTANTIATED:
self.task.transition(TaskStatus.INSTANTIATING)
if new_state == TaskInstanceStatus.LAUNCHED:
self.task.transition(TaskStatus.LAUNCHED)
if new_state == TaskInstanceStatus.RUNNING:
self.task.transition(TaskStatus.RUNNING)
elif new_state == TaskInstanceStatus.DONE:
self.task.transition(TaskStatus.DONE)
elif new_state in self.error_states:
self.task.transition_after_task_instance_error(new_state)
elif new_state == TaskInstanceStatus.ERROR_FATAL:
# if the task instance is F, the task status should be F too
self.task.transition(TaskStatus.ERROR_RECOVERABLE)
self.task.transition(TaskStatus.ERROR_FATAL)
[docs]
def _validate_transition(self, new_state: str) -> None:
"""Ensure the TaskInstance status transition is valid."""
if (self.status, new_state) not in self.__class__.valid_transitions:
raise InvalidStateTransition(
"TaskInstance", self.id, self.status, new_state
)
[docs]
def _is_timely_transition(self, new_state: str) -> bool:
"""Check if the transition is invalid due to a race condition."""
if (self.status, new_state) in self.__class__.untimely_transitions:
msg = str(
InvalidStateTransition("TaskInstance", self.id, self.status, new_state)
)
msg += (
". This is an untimely transition likely caused by a race "
" condition between the distributor_service and the worker_node."
)
logger.warning(msg)
return False
else:
return True