server.web.routes.v3.fsm.task_instance
======================================

.. py:module:: server.web.routes.v3.fsm.task_instance

.. autoapi-nested-parse::

   Routes for TaskInstances.


Attributes
----------

.. autoapisummary::

   server.web.routes.v3.fsm.task_instance.logger


Functions
---------

.. autoapisummary::

   server.web.routes.v3.fsm.task_instance.get_new_task_status
   server.web.routes.v3.fsm.task_instance.validate_transition
   server.web.routes.v3.fsm.task_instance.transit_ti_and_t
   server.web.routes.v3.fsm.task_instance.log_running
   server.web.routes.v3.fsm.task_instance.log_ti_report_by
   server.web.routes.v3.fsm.task_instance.log_ti_report_by_batch
   server.web.routes.v3.fsm.task_instance.log_done
   server.web.routes.v3.fsm.task_instance.log_error_worker_node
   server.web.routes.v3.fsm.task_instance.get_task_instance_error_log
   server.web.routes.v3.fsm.task_instance.get_array_task_instance_id
   server.web.routes.v3.fsm.task_instance.log_no_distributor_id
   server.web.routes.v3.fsm.task_instance.log_distributor_id
   server.web.routes.v3.fsm.task_instance.log_known_error
   server.web.routes.v3.fsm.task_instance.log_unknown_error
   server.web.routes.v3.fsm.task_instance.instantiate_task_instances


Module Contents
---------------

.. py:data:: logger

.. py:function:: get_new_task_status(task_instance: jobmon.server.web.models.task_instance.TaskInstance, new_state: str) -> tuple[str, str]

   Get the valid statuses to transit to, returned as a
   ``(TaskInstanceStatus, TaskStatus)`` tuple.


.. py:function:: validate_transition(task_instance: jobmon.server.web.models.task_instance.TaskInstance, new_ti_status: str) -> dict | None

   Validate and return the status transition info for a TaskInstance.

   Checks if a TaskInstance can transition to the new status by validating:

   1. The transition is timely (not a race condition)
   2. The TaskInstance transition is in valid_transitions
   3. The corresponding Task transition is in valid_transitions
      (unless Task is already terminal, in which case TI can go to ERROR_FATAL)

   :param task_instance: The TaskInstance to validate transition for
   :param new_ti_status: The desired new TaskInstance status
   :returns: A dict with transition info if valid, None if transition is not allowed


.. py:function:: transit_ti_and_t(task_instance: jobmon.server.web.models.task_instance.TaskInstance, status_dict: dict, db: sqlalchemy.orm.Session, report_by_date: Optional[float] = None, log_message: Optional[str] = None, dialect: Optional[str] = None) -> None

   Transit the task_instance and task to the new status.

   Update task_instance and task in a single transaction to avoid
   inconsistent state. If unable to obtain locks for both, update none.

   :param task_instance: The TaskInstance to update
   :param status_dict: Dictionary with status transition info
   :param db: Database session
   :param report_by_date: Optional seconds to add for report_by_date
   :param log_message: Optional message to log on success
   :param dialect: Database dialect (mysql, sqlite) - required if report_by_date is set


.. py:function:: log_running(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Log a task_instance as running.


.. py:function:: log_ti_report_by(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Log a task_instance as being responsive with a new report_by_date.

   This is done at the worker node heartbeat_interval rate, so it may not
   happen at the same rate at which the reconciler updates batch-submitted
   report_by_dates (in part because having every worker log report_by_dates
   frequently causes far more traffic than running the reconciler frequently).

   :param task_instance_id: id of the task_instance to log
   :param request: fastapi request object
   :param db: The database session.


.. py:function:: log_ti_report_by_batch(request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Log task_instances as being responsive with a new report_by_date.

   This is done at the worker node heartbeat_interval rate, so it may not
   happen at the same rate at which the reconciler updates batch-submitted
   report_by_dates (in part because having every worker log report_by_dates
   frequently causes far more traffic than running the reconciler frequently).

   :param request: fastapi request object
   :param db: The database session.


.. py:function:: log_done(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Log a task_instance as done.


.. py:function:: log_error_worker_node(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Log an error for a task instance.


.. py:function:: get_task_instance_error_log(task_instance_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Route to return all task_instance_error_log entries of the task_instance_id.

   :param task_instance_id: ID of the task instance
   :type task_instance_id: int
   :param db: The database session.

   :returns: jsonified task_instance_error_log result set


.. py:function:: get_array_task_instance_id(array_id: int, batch_num: int, step_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any

   Given an array ID and an index, select a single task instance ID.

   Task instance IDs that are associated with the array are ordered, and
   selected by index. This route will be called once per array task instance
   worker node, so it must be scalable.


.. py:function:: log_no_distributor_id(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Log a task_instance_id that did not get a distributor_id upon submission.


.. py:function:: log_distributor_id(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Log a task_instance's distributor id.


.. py:function:: log_known_error(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Log a task_instance as errored.

   :param task_instance_id: id for task instance.
   :type task_instance_id: int
   :param request: fastapi request object.
   :type request: Request
   :param db: The database session.


.. py:function:: log_unknown_error(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Log a task_instance as errored.

   :param task_instance_id: id for task instance
   :type task_instance_id: int
   :param request: fastapi request object
   :type request: Request
   :param db: The database session.


.. py:function:: instantiate_task_instances(request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
   :async:

   Sync status of given task instance IDs.
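

The three checks listed for ``validate_transition`` can be illustrated with a small, self-contained sketch. It is not the jobmon implementation. The status names, the transition tables, the ``sketch_validate_transition`` helper, and the keys of the returned dict are all hypothetical and exist only to show the order of the checks. In particular, the handling of an already terminal Task is one possible reading of the docstring: the TaskInstance may move to ``ERROR_FATAL`` while the Task status is left unchanged.

```python
from typing import Optional

# Hypothetical transition tables for illustration only; the real
# valid_transitions are defined in the jobmon models, not in this page.
TI_VALID = {
    ("QUEUED", "LAUNCHED"),
    ("LAUNCHED", "RUNNING"),
    ("RUNNING", "DONE"),
    ("RUNNING", "ERROR_FATAL"),
}
# Hypothetical "untimely" transitions: a late message arriving after the
# TaskInstance has already moved on (a race condition).
TI_UNTIMELY = {("RUNNING", "LAUNCHED")}
TASK_VALID = {
    ("QUEUED", "LAUNCHED"),
    ("LAUNCHED", "RUNNING"),
    ("RUNNING", "DONE"),
    ("RUNNING", "ERROR_FATAL"),
}
TASK_TERMINAL = {"DONE", "ERROR_FATAL"}


def sketch_validate_transition(
    ti_status: str, task_status: str, new_ti_status: str
) -> Optional[dict]:
    """Return transition info if all checks pass, otherwise None."""
    move = (ti_status, new_ti_status)

    # 1. The transition must be timely (not a race condition).
    if move in TI_UNTIMELY:
        return None

    # 2. The TaskInstance transition must be a valid one.
    if move not in TI_VALID:
        return None

    # 3. The corresponding Task transition must be valid, unless the Task
    #    is already terminal (assumed reading: only ERROR_FATAL is allowed
    #    for the TaskInstance, and the Task keeps its current status).
    if task_status in TASK_TERMINAL:
        if new_ti_status != "ERROR_FATAL":
            return None
        return {"ti_status": new_ti_status, "task_status": task_status}

    # In this sketch the Task simply mirrors the TaskInstance status.
    new_task_status = new_ti_status
    if (task_status, new_task_status) not in TASK_VALID:
        return None
    return {"ti_status": new_ti_status, "task_status": new_task_status}
```

In this sketch, ``sketch_validate_transition("LAUNCHED", "LAUNCHED", "RUNNING")`` returns a dict, while a late ``LAUNCHED`` message for a TaskInstance that is already ``RUNNING`` returns ``None``. A caller would treat ``None`` as "do nothing" rather than as an error.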