server.web.routes.v3.fsm.task_instance

Routes for TaskInstances.

Attributes

logger

Functions

get_new_task_status(→ tuple[str, str])

Get the valid statuses to transition to, as a (TaskInstanceStatus, TaskStatus) tuple.

validate_transition(→ dict | None)

Validate and return the status transition info for a TaskInstance.

transit_ti_and_t(→ None)

Transit the task_instance and task to the new status.

log_running(→ Any)

Log a task_instance as running.

log_ti_report_by(→ Any)

Log a task_instance as being responsive with a new report_by_date.

log_ti_report_by_batch(→ Any)

Log task_instances as being responsive with a new report_by_date.

log_done(→ Any)

Log a task_instance as done.

log_error_worker_node(→ Any)

Log an error for a task instance.

get_task_instance_error_log(→ Any)

Route to return all task_instance_error_log entries of the task_instance_id.

get_array_task_instance_id(→ Any)

Given an array ID and an index, select a single task instance ID.

log_no_distributor_id(→ Any)

Log a task_instance_id that did not get a distributor_id upon submission.

log_distributor_id(→ Any)

Log a task_instance's distributor id.

log_known_error(→ Any)

Log a task_instance as errored.

log_unknown_error(→ Any)

Log a task_instance as errored.

instantiate_task_instances(→ Any)

Sync status of given task instance IDs.

Module Contents

server.web.routes.v3.fsm.task_instance.logger
server.web.routes.v3.fsm.task_instance.get_new_task_status(task_instance: jobmon.server.web.models.task_instance.TaskInstance, new_state: str) → tuple[str, str]

Get the valid statuses to transition to, as a (TaskInstanceStatus, TaskStatus) tuple.

server.web.routes.v3.fsm.task_instance.validate_transition(task_instance: jobmon.server.web.models.task_instance.TaskInstance, new_ti_status: str) → dict | None

Validate and return the status transition info for a TaskInstance.

Checks whether a TaskInstance can transition to the new status by validating that:
  1. The transition is timely (not a race condition).

  2. The TaskInstance transition is in valid_transitions.

  3. The corresponding Task transition is in valid_transitions (unless the Task is already terminal, in which case the TaskInstance can go to ERROR_FATAL).

Parameters:
  • task_instance – The TaskInstance to validate transition for

  • new_ti_status – The desired new TaskInstance status

Returns:

A dict with transition info if valid, None if transition is not allowed
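The checks above can be sketched in plain Python. This is only an illustration under stated assumptions: the status names, the transition tables, the TaskInstance-to-Task mapping, and the shape of the returned dict are all hypothetical stand-ins, since the real values are not shown in this reference. The timeliness (race condition) check is omitted, and the terminal-Task rule is implemented according to one reading of the note above.

```python
from __future__ import annotations

# Hypothetical transition tables; the real ones are not shown in this reference.
VALID_TI_TRANSITIONS = {
    ("LAUNCHED", "RUNNING"),
    ("RUNNING", "DONE"),
    ("RUNNING", "ERROR"),
    ("LAUNCHED", "ERROR_FATAL"),
    ("RUNNING", "ERROR_FATAL"),
}
VALID_TASK_TRANSITIONS = {
    ("LAUNCHED", "RUNNING"),
    ("RUNNING", "DONE"),
    ("RUNNING", "ERROR_RECOVERABLE"),
    ("RUNNING", "ERROR_FATAL"),
}
TERMINAL_TASK_STATUSES = {"DONE", "ERROR_FATAL"}
# Hypothetical mapping from a new TaskInstance status to the Task status it implies.
TI_TO_TASK = {
    "RUNNING": "RUNNING",
    "DONE": "DONE",
    "ERROR": "ERROR_RECOVERABLE",
    "ERROR_FATAL": "ERROR_FATAL",
}


def validate_transition_sketch(
    ti_status: str, task_status: str, new_ti_status: str
) -> dict | None:
    """Return transition info if allowed, otherwise None (illustrative only)."""
    # Check 2: the TaskInstance transition must be listed as valid.
    if (ti_status, new_ti_status) not in VALID_TI_TRANSITIONS:
        return None
    # Exception: a terminal Task is left alone; only ERROR_FATAL is allowed for the TI.
    if task_status in TERMINAL_TASK_STATUSES:
        if new_ti_status != "ERROR_FATAL":
            return None
        return {"task_instance": (ti_status, new_ti_status), "task": None}
    # Check 3: the corresponding Task transition must be listed as valid.
    new_task_status = TI_TO_TASK.get(new_ti_status)
    if (task_status, new_task_status) not in VALID_TASK_TRANSITIONS:
        return None
    return {
        "task_instance": (ti_status, new_ti_status),
        "task": (task_status, new_task_status),
    }


allowed = validate_transition_sketch("LAUNCHED", "LAUNCHED", "RUNNING")
rejected = validate_transition_sketch("RUNNING", "DONE", "DONE")
```

In this sketch `allowed` is a dict describing both transitions, while `rejected` is None because the Task is already terminal and the requested TaskInstance status is not ERROR_FATAL.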

server.web.routes.v3.fsm.task_instance.transit_ti_and_t(task_instance: jobmon.server.web.models.task_instance.TaskInstance, status_dict: dict, db: sqlalchemy.orm.Session, report_by_date: float | None = None, log_message: str | None = None, dialect: str | None = None) → None

Transit the task_instance and task to the new status.

Update task_instance and task in a single transaction to avoid an inconsistent state. If locks cannot be obtained for both, neither is updated.

Parameters:
  • task_instance – The TaskInstance to update

  • status_dict – Dictionary with status transition info

  • db – Database session

  • report_by_date – Optional seconds to add for report_by_date

  • log_message – Optional message to log on success

  • dialect – Database dialect (mysql, sqlite) - required if report_by_date is set
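The all-or-nothing update and the dialect-dependent report_by_date can be illustrated with the standard-library sqlite3 module. This is a minimal sketch, not the module's implementation: the table layout, the helper names `report_by_expr` and `transit_sketch`, and the exact SQL expressions are assumptions made for illustration. It shows why a dialect is needed when report_by_date is set: adding seconds to the current time is written differently in SQLite and MySQL.

```python
import sqlite3


def report_by_expr(dialect: str) -> str:
    """Return SQL that adds :seconds to the current time (hypothetical helper)."""
    if dialect == "sqlite":
        return "datetime(CURRENT_TIMESTAMP, '+' || :seconds || ' seconds')"
    if dialect == "mysql":
        return "ADDTIME(CURRENT_TIMESTAMP(), SEC_TO_TIME(:seconds))"
    raise ValueError(f"unsupported dialect: {dialect}")


def transit_sketch(conn, ti_id, task_id, ti_status, task_status, seconds=None):
    """Update the task_instance and task rows together, or neither of them."""
    params = {
        "ti_id": ti_id,
        "task_id": task_id,
        "ti_status": ti_status,
        "task_status": task_status,
        "seconds": seconds,
    }
    set_clause = "status = :ti_status"
    if seconds is not None:
        set_clause += f", report_by_date = {report_by_expr('sqlite')}"
    # The connection context manager commits on success and rolls back
    # if an exception escapes the block.
    with conn:
        conn.execute(
            f"UPDATE task_instance SET {set_clause} WHERE id = :ti_id", params
        )
        cur = conn.execute(
            "UPDATE task SET status = :task_status WHERE id = :task_id", params
        )
        if cur.rowcount != 1:
            raise LookupError("task row not found; rolling back both updates")


# Hypothetical two-table schema, just large enough for the demonstration.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE task (id INTEGER PRIMARY KEY, status TEXT);
    CREATE TABLE task_instance (
        id INTEGER PRIMARY KEY, task_id INTEGER, status TEXT, report_by_date TEXT
    );
    INSERT INTO task VALUES (1, 'LAUNCHED');
    INSERT INTO task_instance VALUES (10, 1, 'LAUNCHED', NULL);
    """
)
transit_sketch(conn, 10, 1, "RUNNING", "RUNNING", seconds=90)
```

After the call, both rows carry the new status and the task_instance has a report_by_date 90 seconds in the future. If the second UPDATE fails, the first one is rolled back as well, so the two rows never disagree.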

async server.web.routes.v3.fsm.task_instance.log_running(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Log a task_instance as running.

async server.web.routes.v3.fsm.task_instance.log_ti_report_by(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Log a task_instance as being responsive with a new report_by_date.

This is done at the worker node heartbeat_interval rate, so it may not happen at the same rate at which the reconciler updates batch-submitted report_by_dates. Having every worker log its report_by_date frequently would also generate far more traffic than running the reconciler frequently.

Parameters:
  • task_instance_id – id of the task_instance to log

  • request – fastapi request object

  • db – The database session.

async server.web.routes.v3.fsm.task_instance.log_ti_report_by_batch(request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Log task_instances as being responsive with a new report_by_date.

This is done at the worker node heartbeat_interval rate, so it may not happen at the same rate at which the reconciler updates batch-submitted report_by_dates. Having every worker log its report_by_date frequently would also generate far more traffic than running the reconciler frequently.

Parameters:
  • request – fastapi request object

  • db – The database session.

async server.web.routes.v3.fsm.task_instance.log_done(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Log a task_instance as done.

async server.web.routes.v3.fsm.task_instance.log_error_worker_node(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Log an error for a task instance.

async server.web.routes.v3.fsm.task_instance.get_task_instance_error_log(task_instance_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Route to return all task_instance_error_log entries of the task_instance_id.

Parameters:
  • task_instance_id (int) – ID of the task instance

  • db – The database session.

Returns:

jsonified task_instance_error_log result set

server.web.routes.v3.fsm.task_instance.get_array_task_instance_id(array_id: int, batch_num: int, step_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Given an array ID and an index, select a single task instance ID.

Task instance IDs associated with the array are ordered and then selected by index. This route is called once per array task instance worker node, so it must be scalable.
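Selecting one ID by position from an ordered set can be sketched with a single LIMIT/OFFSET query. The column names (`array_id`, `batch_num`), the helper `select_array_ti_id`, and the assumption that step_id is a zero-based index are all hypothetical; the actual query used by the route is not shown in this reference and may differ.

```python
import sqlite3


def select_array_ti_id(conn, array_id, batch_num, step_id):
    """Return the task instance ID at position step_id, or None if out of range."""
    row = conn.execute(
        "SELECT id FROM task_instance "
        "WHERE array_id = ? AND batch_num = ? "
        "ORDER BY id LIMIT 1 OFFSET ?",
        (array_id, batch_num, step_id),
    ).fetchone()
    return row[0] if row else None


# Hypothetical schema and data for the demonstration.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE task_instance (
        id INTEGER PRIMARY KEY, array_id INTEGER, batch_num INTEGER
    );
    INSERT INTO task_instance VALUES (101, 7, 1), (102, 7, 1), (103, 7, 1), (104, 7, 2);
    """
)
first_in_batch = select_array_ti_id(conn, 7, 1, 0)
third_in_batch = select_array_ti_id(conn, 7, 1, 2)
```

Each worker issues one small indexed lookup rather than fetching the whole array, which is the property the scalability note asks for. With the rows above, `first_in_batch` is 101 and `third_in_batch` is 103.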

async server.web.routes.v3.fsm.task_instance.log_no_distributor_id(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Log a task_instance_id that did not get a distributor_id upon submission.

async server.web.routes.v3.fsm.task_instance.log_distributor_id(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Log a task_instance’s distributor id.

async server.web.routes.v3.fsm.task_instance.log_known_error(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Log a task_instance as errored.

Parameters:
  • task_instance_id (int) – id for task instance.

  • request (Request) – fastapi request object.

  • db – The database session.

async server.web.routes.v3.fsm.task_instance.log_unknown_error(task_instance_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Log a task_instance as errored.

Parameters:
  • task_instance_id (int) – id for task instance

  • request (Request) – fastapi request object

  • db – The database session.

async server.web.routes.v3.fsm.task_instance.instantiate_task_instances(request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) → Any

Sync status of given task instance IDs.