"""Task Instance object from the distributor's perspective."""
from __future__ import annotations
from typing import TYPE_CHECKING, List, Set, Tuple
import structlog
from jobmon.core.constants import TaskInstanceStatus
from jobmon.core.requester import Requester
if TYPE_CHECKING:
from jobmon.distributor.task_instance_batch import TaskInstanceBatch
# Module-level structlog logger used by the DistributorTaskInstance methods
# below to record task instance transitions.
logger = structlog.get_logger(__name__)


class DistributorTaskInstance:
    """Object used for communicating with JSM from the distributor node.

    Instances hash, compare and order purely by ``task_instance_id``, so two
    objects with the same id are interchangeable inside sets and dict keys.

    Attributes:
        task_instance_id: id of the task instance this object represents.
        workflow_run_id: id of the workflow run passed in at construction.
        status: status passed in at construction; overwritten with
            ``TaskInstanceStatus.LAUNCHED`` by ``transition_to_launched``.
        requester: object whose ``send_request`` is used for every transition.
    """

    def __init__(
        self,
        task_instance_id: int,
        workflow_run_id: int,
        status: str,
        requester: Requester,
    ) -> None:
        """Initialization of distributor task instance.

        Args:
            task_instance_id (int): a task_instance_id
            workflow_run_id (int): a workflow_run_id
            status (str): status of the distributor task instance
            requester (Requester): a requester to communicate with the JSM.
        """
        self.task_instance_id = task_instance_id
        self.workflow_run_id = workflow_run_id
        # Keep the caller-supplied status: __repr__ reads self.status and must
        # work before any transition method has been called.
        self.status = status
        self.requester = requester

    @property
    def submission_name(self) -> str:
        """Returns the batch's submission name, or the TI id if no batch is set.

        The fallback is ``str(self.task_instance_id)`` and is used whenever
        looking up ``self.batch.submission_name`` raises ``AttributeError``
        (for example because ``batch`` has not been assigned yet).
        """
        try:
            return self.batch.submission_name
        except AttributeError:
            return str(self.task_instance_id)

    @property
    def batch(self) -> TaskInstanceBatch:
        """Returns the batch the DistributorTaskInstance is in.

        Raises:
            AttributeError: if the batch has not been set yet.
        """
        return self._batch

    @batch.setter
    def batch(self, val: TaskInstanceBatch) -> None:
        """Sets the batch of the DistributorTaskInstance."""
        self._batch = val

    @property
    def array_step_id(self) -> int:
        """Returns the array step of the TI.

        Raises:
            AttributeError: if the array step id has not been set yet.
        """
        return self._array_step_id

    @array_step_id.setter
    def array_step_id(self, val: int) -> None:
        """Sets the array step of the TI."""
        self._array_step_id = val

    @property
    def distributor_id(self) -> str:
        """Returns the distributor id of the TI.

        Raises:
            AttributeError: if the distributor id has not been set yet.
        """
        return self._distributor_id

    @distributor_id.setter
    def distributor_id(self, val: str) -> None:
        """Sets the distributor id of the TI."""
        self._distributor_id = val

    def transition_to_launched(
        self, distributor_id: str, next_report_increment: float
    ) -> None:
        """Register the submission of a new task instance to a cluster.

        This method is never called by the happy path - only if array submission is not
        implemented on a particular cluster type.

        Args:
            distributor_id: id to record on this object and send to the
                ``log_distributor_id`` route (sent as ``str(distributor_id)``).
            next_report_increment: value forwarded unchanged in the request.
        """
        self.distributor_id = distributor_id
        app_route = f"/task_instance/{self.task_instance_id}/log_distributor_id"
        self.requester.send_request(
            app_route=app_route,
            message={
                "distributor_id": str(distributor_id),
                "next_report_increment": next_report_increment,
            },
            request_type="post",
        )
        # Only updated after the request call returns.
        self.status = TaskInstanceStatus.LAUNCHED

    def transition_to_no_distributor_id(
        self,
        no_id_err_msg: str,
    ) -> None:
        """Register that submission failed with the central service.

        Args:
            no_id_err_msg: The error msg from the executor when failed to obtain distributor
                id. The full message is sent in the request; only the first
                200 characters are written to the log.
        """
        logger.info(
            "Distributor failed to get distributor ID for task instance",
            task_instance_id=self.task_instance_id,
            error_msg=no_id_err_msg[:200],  # Truncate for readability
        )
        app_route = f"/task_instance/{self.task_instance_id}/log_no_distributor_id"
        self.requester.send_request(
            app_route=app_route,
            message={"no_id_err_msg": no_id_err_msg},
            request_type="post",
        )
        logger.info(
            "Task instance transitioned INSTANTIATED → NO_DISTRIBUTOR_ID",
            task_instance_id=self.task_instance_id,
        )

    def _transition_to_error(self, error_message: str, error_state: str) -> None:
        """Transitions the TaskInstance to the specified error state.

        Posts to ``log_unknown_error`` when ``error_state`` equals
        ``TaskInstanceStatus.UNKNOWN_ERROR`` and to ``log_known_error``
        otherwise, then records ``error_state`` on this object.

        Args:
            error_message: message forwarded in the request.
            error_state: the error state to log; must be truthy.

        Raises:
            ValueError: if no distributor id is available (never set, or set
                to ``None``), or if ``error_state`` is empty.
        """
        # Read the backing attribute with a default: the distributor_id
        # property raises AttributeError when unset, which would bypass the
        # ValueError this guard is meant to raise.
        distributor_id = getattr(self, "_distributor_id", None)
        if distributor_id is None:
            raise ValueError("distributor_id cannot be None during log_error")
        if not error_state:
            raise ValueError("cannot log error if error_state isn't set")

        is_unknown = error_state == TaskInstanceStatus.UNKNOWN_ERROR

        # Only log error details not captured by main service
        error_type = "unknown" if is_unknown else "known"
        logger.info(
            f"Task transitioned to {error_type} error",
            task_instance_id=self.task_instance_id,
            error_state=error_state,
            distributor_id=distributor_id,
        )

        if is_unknown:
            app_route = f"/task_instance/{self.task_instance_id}/log_unknown_error"
        else:
            app_route = f"/task_instance/{self.task_instance_id}/log_known_error"
        self.requester.send_request(
            app_route=app_route,
            message={
                "error_state": error_state,
                "error_message": error_message,
                "distributor_id": distributor_id,
            },
            request_type="post",
        )
        self.error_state = error_state

    def transition_to_unknown_error(
        self, error_message: str, error_state: str
    ) -> Tuple[Set[DistributorTaskInstance], List]:
        """Register that an unknown error was discovered during reconciliation.

        The route used is decided by ``error_state`` inside
        ``_transition_to_error``, not by which public method was called.

        Returns:
            A tuple of ``({self}, [])``.
            NOTE(review): presumably consumed by the distributor's processing
            loop - confirm the meaning of both elements with the caller.
        """
        self._transition_to_error(error_message, error_state)
        return {self}, []

    def transition_to_resource_error(
        self, error_message: str, error_state: str
    ) -> Tuple[Set[DistributorTaskInstance], List]:
        """Register that a resource error was discovered during reconciliation.

        Returns:
            A tuple of ``({self}, [])``.
        """
        self._transition_to_error(error_message, error_state)
        return {self}, []

    def transition_to_error(
        self, error_message: str, error_state: str
    ) -> Tuple[Set[DistributorTaskInstance], List]:
        """Register that a known error occurred during reconciliation.

        Returns:
            A tuple of ``({self}, [])``.
        """
        self._transition_to_error(error_message, error_state)
        return {self}, []

    def __hash__(self) -> int:
        """Returns the id of the TaskInstance."""
        return self.task_instance_id

    def __eq__(self, other: object) -> bool:
        """Check if the hashes of two tasks are equivalent.

        Any object that is not a DistributorTaskInstance compares unequal.
        """
        if not isinstance(other, DistributorTaskInstance):
            return False
        else:
            return hash(self) == hash(other)

    def __lt__(self, other: DistributorTaskInstance) -> bool:
        """Check if one hash is less than the hash of another Task."""
        return hash(self) < hash(other)

    def __repr__(self) -> str:
        """Return a short representation string."""
        return (
            f"DistributorTaskInstance(task_instance_id={self.task_instance_id},"
            f"status={self.status})"
        )