server.web.services.transition_service

Unified transition service with TI-centric architecture and audit logging.

Attributes

logger

Classes

TransitionService

Unified transition service with TI-centric model.

Module Contents

server.web.services.transition_service.logger
class server.web.services.transition_service.TransitionService

Unified transition service with TI-centric model.

  • TaskInstance transitions drive Task transitions

  • Task FSM acts as the gate

  • All transitions audited to task_status_audit

Handles both single-task and bulk transitions with:

  • Centralized FSM validation via TaskFSM

  • Built-in retry logic with exponential backoff

  • Row locking with NOWAIT (single) or SKIP LOCKED (bulk)

  • Automatic audit logging to the task_status_audit table

IMPORTANT: Transaction management

  • The service retries internally and calls session.rollback() on lock errors.

  • Callers should NOT have uncommitted work they want to keep before calling it, because that rollback discards it.

  • Call the service at the START of a transaction, or in its own transaction.

  • The service does NOT call commit(); the caller must commit on success.
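The rollback rule matters because a rollback discards everything uncommitted on the connection, not only the service's own statements. The sketch below uses the standard-library sqlite3 module as a stand-in for the SQLAlchemy session (an assumption made only so the example runs anywhere) and a hypothetical notes table to show the caller's earlier work vanishing:

```python
import sqlite3

# Illustration only: sqlite3 stands in for the real database session.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE notes (id INTEGER PRIMARY KEY, body TEXT)")
conn.commit()

# The caller does some work first but does not commit it...
conn.execute("INSERT INTO notes (body) VALUES ('unrelated caller work')")

# ...then the service hits a lock error and rolls back before retrying.
conn.rollback()

remaining = conn.execute("SELECT COUNT(*) FROM notes").fetchone()[0]
print(remaining)  # 0: the caller's uncommitted insert was discarded
```

Calling the service first (or in its own transaction) and committing afterwards avoids this loss.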

DEFAULT_MAX_RETRIES = 5
DEFAULT_BASE_DELAY_MS = 50
TI_VALID_TRANSITIONS: Set[Tuple[str, str]]
TI_UNTIMELY_TRANSITIONS: Set[Tuple[str, str]]
TI_ERROR_STATES: Set[str]
TASK_TERMINAL_STATES: Set[str]
LOCK_ERROR_MESSAGES = ['could not obtain lock', 'nowait', 'lock wait timeout', 'deadlock', 'database is locked']
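The constants above suggest how the retry loop is driven: an exception counts as lock contention if its message contains one of LOCK_ERROR_MESSAGES, and retries back off from DEFAULT_BASE_DELAY_MS. The sketch below is an illustration under stated assumptions, not the service's code: is_lock_error, backoff_delay_ms, and run_with_lock_retries are hypothetical helpers, matching is assumed to be a case-insensitive substring check, the delay is assumed to double per attempt with no jitter, and max_retries is assumed to count retries after the first attempt.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

DEFAULT_MAX_RETRIES = 5
DEFAULT_BASE_DELAY_MS = 50
LOCK_ERROR_MESSAGES = ['could not obtain lock', 'nowait', 'lock wait timeout',
                       'deadlock', 'database is locked']


def is_lock_error(exc: BaseException) -> bool:
    """Hypothetical: True if the exception text contains a known lock message."""
    message = str(exc).lower()
    return any(fragment in message for fragment in LOCK_ERROR_MESSAGES)


def backoff_delay_ms(attempt: int, base_delay_ms: int = DEFAULT_BASE_DELAY_MS) -> int:
    """Hypothetical: delay before retry `attempt` (0-based): base, 2*base, 4*base, ..."""
    return base_delay_ms * (2 ** attempt)


def run_with_lock_retries(
    operation: Callable[[], T],
    rollback: Callable[[], None],
    max_retries: int = DEFAULT_MAX_RETRIES,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical: run `operation`, rolling back and retrying on lock errors."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except Exception as exc:
            # Non-lock errors, and the final failed attempt, propagate unchanged.
            if not is_lock_error(exc) or attempt == max_retries:
                raise
            rollback()  # discards ALL uncommitted work on the session
            sleep(backoff_delay_ms(attempt) / 1000.0)
    raise RuntimeError("max_retries must be non-negative")
```

Injecting rollback and sleep as callables keeps the loop testable without a database or real waiting.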
classmethod create_audit_record_with_immediate_exit(session: sqlalchemy.orm.Session, task_id: int, workflow_id: int, previous_status: str, new_status: str) → None

Create audit record with both entered_at and exited_at set (for transient states).

Used for the ERROR_RECOVERABLE status, which is transient: the task enters and exits it within the same transition.

Parameters:
  • session – Database session

  • task_id – The task ID being transitioned

  • workflow_id – The workflow ID the task belongs to

  • previous_status – The status the task is transitioning from

  • new_status – The transient status (e.g., ERROR_RECOVERABLE)
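A transient-state record differs from a normal one only in being written already closed. The sketch below uses sqlite3 and a simplified, hypothetical task_status_audit schema (column names beyond entered_at and exited_at are assumed from the parameter names); CURRENT_TIMESTAMP stands in for func.now(), and the previous status "RUNNING" is a hypothetical value:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified schema for illustration only.
conn.execute(
    """CREATE TABLE task_status_audit (
           id INTEGER PRIMARY KEY,
           task_id INTEGER NOT NULL,
           workflow_id INTEGER NOT NULL,
           previous_status TEXT,
           new_status TEXT NOT NULL,
           entered_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
           exited_at TEXT
       )"""
)


def create_audit_record_with_immediate_exit(conn, task_id, workflow_id,
                                            previous_status, new_status):
    # Both timestamps are set in the INSERT itself, so the row is closed
    # the moment it is written and never appears as an open record.
    conn.execute(
        """INSERT INTO task_status_audit
               (task_id, workflow_id, previous_status, new_status,
                entered_at, exited_at)
           VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""",
        (task_id, workflow_id, previous_status, new_status),
    )


create_audit_record_with_immediate_exit(conn, 7, 1, "RUNNING", "ERROR_RECOVERABLE")
row = conn.execute("SELECT entered_at, exited_at FROM task_status_audit").fetchone()
```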

classmethod create_audit_record(session: sqlalchemy.orm.Session, task_id: int, workflow_id: int, previous_status: str, new_status: str) → None

Create an audit record, first closing the task's currently open record:

  1. Closes any open audit record for this task (sets exited_at = func.now())

  2. Creates new audit record (entered_at uses model default = func.now())

Uses database func.now() for timestamps to ensure server-time consistency.

Parameters:
  • session – Database session

  • task_id – The task ID being transitioned

  • workflow_id – The workflow ID the task belongs to

  • previous_status – The status the task is transitioning from

  • new_status – The status the task is transitioning to
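The close-then-insert pair in steps 1 and 2 can be sketched as follows. It uses sqlite3 with a simplified, hypothetical schema, CURRENT_TIMESTAMP in place of func.now(), and hypothetical status values; the invariant it demonstrates is that each task has at most one open row (exited_at IS NULL) after every call:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified schema; exited_at IS NULL marks the open record.
conn.execute(
    "CREATE TABLE task_status_audit (id INTEGER PRIMARY KEY, task_id INTEGER, "
    "workflow_id INTEGER, previous_status TEXT, new_status TEXT, "
    "entered_at TEXT DEFAULT CURRENT_TIMESTAMP, exited_at TEXT)"
)


def create_audit_record(conn, task_id, workflow_id, previous_status, new_status):
    # Step 1: close whatever record is still open for this task.
    conn.execute(
        "UPDATE task_status_audit SET exited_at = CURRENT_TIMESTAMP "
        "WHERE task_id = ? AND exited_at IS NULL",
        (task_id,),
    )
    # Step 2: open a new record; entered_at comes from the column default.
    conn.execute(
        "INSERT INTO task_status_audit "
        "(task_id, workflow_id, previous_status, new_status) VALUES (?, ?, ?, ?)",
        (task_id, workflow_id, previous_status, new_status),
    )


create_audit_record(conn, 7, 1, "REGISTERING", "QUEUED")
create_audit_record(conn, 7, 1, "QUEUED", "RUNNING")
open_rows = conn.execute(
    "SELECT new_status FROM task_status_audit "
    "WHERE task_id = 7 AND exited_at IS NULL"
).fetchall()
```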

classmethod create_audit_records_bulk(session: sqlalchemy.orm.Session, records: List[Dict]) → None

Bulk-create audit records, closing each task's currently open record first.

Closes any open audit rows for the supplied task_ids and inserts a new open row per record. The work runs as a three-phase sequence. The original single UPDATE ... WHERE exited_at IS NULL took next-key (gap) locks on the (task_id, exited_at) secondary index; when two transactions with interleaved task_ids ran the close-then-INSERT pair concurrently, those locks formed a cycle and InnoDB reported a deadlock (errno 1213).

The exited_at column stays nullable. NULL still means “no observed exit yet”: the row is either currently open or orphaned because its workflow died. Downstream consumers keep working unchanged, including MAX(exited_at) (which ignores open rows), COALESCE(exited_at, NOW()) duration math, and the GUI’s active = !exitedAt check.

Parameters:
  • session – Database session

  • records – List of dicts with task_id, workflow_id, previous_status, new_status
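The three phases are not spelled out above, so the following is only one plausible decomposition, not necessarily the service's: (1) a plain read of the open rows' primary keys, (2) closing those rows by primary key so the UPDATE never range-scans the (task_id, exited_at) index, and (3) a bulk INSERT of the new open rows. It uses sqlite3 with a simplified, hypothetical schema, so it shows the data effect but cannot reproduce InnoDB locking, and it does not address races between phases 1 and 2.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified schema for illustration only.
conn.execute(
    "CREATE TABLE task_status_audit (id INTEGER PRIMARY KEY, task_id INTEGER, "
    "workflow_id INTEGER, previous_status TEXT, new_status TEXT, "
    "entered_at TEXT DEFAULT CURRENT_TIMESTAMP, exited_at TEXT)"
)
# Seed one open row per task.
conn.executemany(
    "INSERT INTO task_status_audit (task_id, workflow_id, previous_status, new_status) "
    "VALUES (?, ?, ?, ?)",
    [(1, 9, None, "REGISTERING"), (2, 9, None, "REGISTERING")],
)


def create_audit_records_bulk(conn, records):
    if not records:
        return
    task_ids = sorted({r["task_id"] for r in records})
    placeholders = ", ".join("?" for _ in task_ids)
    # Phase 1 (assumed): read primary keys of the open rows in a stable order.
    open_ids = [row[0] for row in conn.execute(
        f"SELECT id FROM task_status_audit WHERE exited_at IS NULL "
        f"AND task_id IN ({placeholders}) ORDER BY id",
        task_ids,
    )]
    # Phase 2 (assumed): close those rows by primary key only.
    conn.executemany(
        "UPDATE task_status_audit SET exited_at = CURRENT_TIMESTAMP WHERE id = ?",
        [(row_id,) for row_id in open_ids],
    )
    # Phase 3: insert one new open row per record (named placeholders read dict keys).
    conn.executemany(
        "INSERT INTO task_status_audit (task_id, workflow_id, previous_status, new_status) "
        "VALUES (:task_id, :workflow_id, :previous_status, :new_status)",
        records,
    )
```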

classmethod transition_task_instance(session: sqlalchemy.orm.Session, task_instance_id: int, task_id: int, current_ti_status: str, new_ti_status: str, task_num_attempts: int, task_max_attempts: int, report_by_date: Any | None = None, max_retries: int = DEFAULT_MAX_RETRIES) → Dict[str, Any]

Worker-triggered: TI transition cascades to Task.

  1. Validate TI transition (untimely, valid_transitions)

  2. Lock TI (NOWAIT) -> Lock Task (NOWAIT)

  3. Update TI status

  4. Compute implied Task status (via TaskFSM.get_task_status_for_ti)

  5. Handle orphaned TI (Task terminal → TI goes to ERROR_FATAL)

  6. Validate Task FSM gate

  7. Update Task -> Audit Task

Preserves TI-first locking order for atomicity. Has internal retries with rollback.

Parameters:
  • session – Database session (caller must commit on success)

  • task_instance_id – TaskInstance ID to transition

  • task_id – Task ID associated with the TaskInstance

  • current_ti_status – Current TaskInstance status (for validation)

  • new_ti_status – New TaskInstance status

  • task_num_attempts – Current task num_attempts

  • task_max_attempts – Task max_attempts

  • report_by_date – Optional report_by_date value (already computed)

  • max_retries – Number of retry attempts on lock contention

Returns:

{
    "ti_updated": bool,
    "task_transitioned": bool,
    "task_status": str or None,
    "error": str or None,  # "untimely_transition", "invalid_ti_transition", or None
    "orphaned": bool,  # True if the Task was terminal
}
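Steps 1 and 5 reduce to set lookups against TI_UNTIMELY_TRANSITIONS, TI_VALID_TRANSITIONS, and TASK_TERMINAL_STATES. The sketch below assumes the untimely check runs before the validity check (the order listed in step 1) and passes the sets in as arguments because their contents are not documented here; check_ti_transition, resolve_ti_target, and empty_result are hypothetical helpers, not the service's API.

```python
from typing import Any, Dict, Optional, Set, Tuple

Transition = Tuple[str, str]


def check_ti_transition(current: str, new: str,
                        valid: Set[Transition],
                        untimely: Set[Transition]) -> Optional[str]:
    """Hypothetical step 1: return an error code, or None if the TI may move."""
    if (current, new) in untimely:
        return "untimely_transition"
    if (current, new) not in valid:
        return "invalid_ti_transition"
    return None


def resolve_ti_target(new_ti_status: str, task_status: str,
                      terminal_states: Set[str]) -> Tuple[str, bool]:
    """Hypothetical step 5: a TI whose Task is already terminal goes to ERROR_FATAL."""
    if task_status in terminal_states:
        return "ERROR_FATAL", True  # (TI target status, orphaned flag)
    return new_ti_status, False


def empty_result(error: Optional[str] = None) -> Dict[str, Any]:
    """Result with the documented keys, for an early exit when nothing changed."""
    return {"ti_updated": False, "task_transitioned": False,
            "task_status": None, "error": error, "orphaned": False}
```

In use, a worker-side call would return empty_result(code) as soon as check_ti_transition yields a code, and only then proceed to the locking steps.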

classmethod gate_tasks_for_queueing(session: sqlalchemy.orm.Session, task_ids: List[int], max_retries: int = DEFAULT_MAX_RETRIES) → Dict[str, List[int]]

Distributor-triggered: Check Task gate for TI creation.

  1. Lock Tasks with SKIP LOCKED

  2. Validate FSM gate (REGISTERING/ADJUSTING_RESOURCES -> QUEUED)

  3. Update Tasks, increment num_attempts

  4. Audit Task status changes

Returns a dict of task-ID lists keyed by “gated”, “invalid”, “locked”, and “not_found”. The caller creates TaskInstances for the “gated” tasks.

Parameters:
  • session – Database session (caller must commit on success)

  • task_ids – Task IDs to gate for queueing

  • max_retries – Number of retry attempts on lock contention

Returns:

{
    "gated": [ids that passed the gate and transitioned to QUEUED],
    "invalid": [ids that failed FSM validation],
    "locked": [ids that were locked by another transaction],
    "not_found": [ids that don't exist],
}
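Steps 2 and 3 can be sketched in memory for the rows the SKIP LOCKED query returned. This is an illustration only: the real service delegates validation to TaskFSM, whereas the sketch hard-codes the two source states named in step 2; TaskRow and apply_queue_gate are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List

# Source states that step 2 allows into QUEUED.
GATE_FROM_STATES = {"REGISTERING", "ADJUSTING_RESOURCES"}


@dataclass
class TaskRow:
    """Hypothetical stand-in for a locked Task row."""
    status: str
    num_attempts: int


def apply_queue_gate(locked_rows: Dict[int, TaskRow]) -> Dict[str, List[int]]:
    """Gate the rows SKIP LOCKED returned; mutate passing rows in place."""
    gated: List[int] = []
    invalid: List[int] = []
    for task_id in sorted(locked_rows):
        row = locked_rows[task_id]
        if row.status in GATE_FROM_STATES:
            row.status = "QUEUED"
            row.num_attempts += 1  # step 3: each queueing counts as an attempt
            gated.append(task_id)
        else:
            invalid.append(task_id)
    return {"gated": gated, "invalid": invalid}
```

IDs missing from locked_rows would be reported as "locked" or "not_found" by the surrounding code, which this sketch leaves out.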

classmethod transition_tasks(session: sqlalchemy.orm.Session, task_ids: List[int], to_status: str, increment_attempts: bool = False, max_retries: int = DEFAULT_MAX_RETRIES, use_skip_locked: bool = False) → Dict[str, List[int]]

Unified transition function for single or bulk operations.

Parameters:
  • session – Database session (caller must commit on success)

  • task_ids – Task IDs to transition (1 or many)

  • to_status – Target status (from TaskStatus constants)

  • increment_attempts – If True, increment num_attempts (for QUEUED)

  • max_retries – Number of retry attempts on lock contention

  • use_skip_locked – If True, skip locked rows instead of failing. Use for bulk operations; requires MySQL 8.0+.

Returns:

{
    "transitioned": [ids that were updated],
    "invalid": [ids that failed FSM validation],
    "locked": [ids that were locked by another transaction],
    "not_found": [ids that don't exist],
}

Note

  • Has internal retries with rollback on lock errors

  • Does NOT commit - caller must commit on success

  • Call at START of transaction (rollback will clear any prior work)
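With SKIP LOCKED, a locked row is simply absent from the locking query's result, so telling "locked" apart from "not_found" needs a second, non-locking existence check. The sketch below shows how the four result lists could be filled under that assumption; lock_clause and partition_task_ids are hypothetical helpers, existing_ids is assumed to come from a plain SELECT, and is_valid stands in for the TaskFSM check.

```python
from typing import Callable, Dict, Iterable, List, Mapping, Set


def lock_clause(use_skip_locked: bool) -> str:
    # NOWAIT makes the statement fail on a locked row (the caller retries);
    # SKIP LOCKED omits locked rows from the result instead.
    return "FOR UPDATE SKIP LOCKED" if use_skip_locked else "FOR UPDATE NOWAIT"


def partition_task_ids(
    task_ids: Iterable[int],
    existing_ids: Set[int],
    locked_statuses: Mapping[int, str],
    to_status: str,
    is_valid: Callable[[str, str], bool],
) -> Dict[str, List[int]]:
    """Hypothetical: sort requested IDs into the documented result lists."""
    result: Dict[str, List[int]] = {
        "transitioned": [], "invalid": [], "locked": [], "not_found": []}
    for task_id in task_ids:
        if task_id not in existing_ids:
            result["not_found"].append(task_id)
        elif task_id not in locked_statuses:
            # The row exists but the locking read skipped it: someone else holds it.
            result["locked"].append(task_id)
        elif is_valid(locked_statuses[task_id], to_status):
            result["transitioned"].append(task_id)
        else:
            result["invalid"].append(task_id)
    return result
```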