Source code for server.web.routes.v1.fsm.task_instance

"""Routes for TaskInstances."""

from http import HTTPStatus as StatusCodes
from typing import Any, cast, Dict

from flask import jsonify, request
from sqlalchemy import select
import structlog

from jobmon.core import constants
from jobmon.core.exceptions import InvalidStateTransition
from jobmon.server.web._compat import add_time
from jobmon.server.web.models.task_instance import TaskInstance
from jobmon.server.web.routes import SessionLocal
from jobmon.server.web.routes.v1 import api_v1_blueprint

[docs] logger = structlog.get_logger(__name__)
@api_v1_blueprint.route( "/task_instance/<task_instance_id>/log_running", methods=["POST"] )
[docs] def log_running(task_instance_id: int) -> Any: """Log a task_instance as running. Args: task_instance_id: id of the task_instance to log as running """ structlog.contextvars.bind_contextvars(task_instance_id=task_instance_id) data = cast(Dict, request.get_json()) session = SessionLocal() with session.begin(): select_stmt = select(TaskInstance).where(TaskInstance.id == task_instance_id) task_instance = session.execute(select_stmt).scalars().one() if data.get("distributor_id", None) is not None: task_instance.distributor_id = data["distributor_id"] if data.get("nodename", None) is not None: task_instance.nodename = data["nodename"] task_instance.process_group_id = data["process_group_id"] try: task_instance.transition(constants.TaskInstanceStatus.RUNNING) task_instance.report_by_date = add_time(data["next_report_increment"]) except InvalidStateTransition as e: if task_instance.status == constants.TaskInstanceStatus.RUNNING: logger.warning(e) elif task_instance.status == constants.TaskInstanceStatus.KILL_SELF: task_instance.transition(constants.TaskInstanceStatus.ERROR_FATAL) else: # Tried to move to an illegal state logger.error(e) wire_format = task_instance.to_wire_as_worker_node_task_instance() resp = jsonify(task_instance=wire_format) resp.status_code = StatusCodes.OK return resp