server.web.services.transition_service
Unified transition service with TI-centric architecture and audit logging.
Attributes
- logger
Classes
- TransitionService: Unified transition service with TI-centric model.
Module Contents
- server.web.services.transition_service.logger
- class server.web.services.transition_service.TransitionService
Unified transition service with TI-centric model.
TaskInstance transitions drive Task transitions
Task FSM acts as the gate
All transitions audited to task_status_audit
Handles both single-task and bulk transitions with:
  - Centralized FSM validation via TaskFSM
  - Built-in retry logic with exponential backoff
  - Row locking with NOWAIT (single) or SKIP LOCKED (bulk)
  - Automatic audit logging to the task_status_audit table
IMPORTANT - Transaction Management:
  - This service has internal retries that call session.rollback() on lock errors
  - Callers should NOT have uncommitted work they want to keep before calling
  - Call this service at the START of a transaction, or in its own transaction
  - The service does NOT call commit(); the caller must commit on success
- DEFAULT_MAX_RETRIES = 5
- DEFAULT_BASE_DELAY_MS = 50
- LOCK_ERROR_MESSAGES = ['could not obtain lock', 'nowait', 'lock wait timeout', 'deadlock', 'database is locked']
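The retry behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the service's actual code: `is_lock_error` and `run_with_lock_retry` are hypothetical names, the doubling delay schedule is an assumption (the source only says "exponential backoff"), and `session` is any object with a `rollback()` method.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# Values copied from the class attributes documented above.
DEFAULT_MAX_RETRIES = 5
DEFAULT_BASE_DELAY_MS = 50
LOCK_ERROR_MESSAGES = [
    "could not obtain lock",
    "nowait",
    "lock wait timeout",
    "deadlock",
    "database is locked",
]


def is_lock_error(exc: Exception) -> bool:
    """Return True if the exception text contains a known lock-error fragment."""
    message = str(exc).lower()
    return any(fragment in message for fragment in LOCK_ERROR_MESSAGES)


def run_with_lock_retry(
    session,
    operation: Callable[[], T],
    max_retries: int = DEFAULT_MAX_RETRIES,
    base_delay_ms: int = DEFAULT_BASE_DELAY_MS,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Hypothetical helper: retry `operation` on lock errors, rolling back first."""
    for attempt in range(max_retries + 1):
        try:
            return operation()
        except Exception as exc:
            # Re-raise anything that is not a lock error, or the final failure.
            if not is_lock_error(exc) or attempt == max_retries:
                raise
            # Rollback discards ALL uncommitted work on the session.
            session.rollback()
            # Assumed schedule: 50 ms, 100 ms, 200 ms, ...
            sleep(base_delay_ms * (2 ** attempt) / 1000.0)
    raise RuntimeError("max_retries must be >= 0")
```

Because each retry begins with a rollback, anything the caller wrote to the session before the call would be lost on the first lock error, which is why the service is meant to run at the start of a transaction.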
- classmethod create_audit_record_with_immediate_exit(session: sqlalchemy.orm.Session, task_id: int, workflow_id: int, previous_status: str, new_status: str) None
Create audit record with both entered_at and exited_at set (for transient states).
Used for ERROR_RECOVERABLE status which is a transient state - the task enters and exits it in the same transition.
- Parameters:
session – Database session
task_id – The task ID being transitioned
workflow_id – The workflow ID the task belongs to
previous_status – The status the task is transitioning from
new_status – The transient status (e.g., ERROR_RECOVERABLE)
- classmethod create_audit_record(session: sqlalchemy.orm.Session, task_id: int, workflow_id: int, previous_status: str, new_status: str) None
Create audit record with proper exited_at handling.
Closes any open audit record for this task (sets exited_at = func.now())
Creates new audit record (entered_at uses model default = func.now())
Uses database func.now() for timestamps to ensure server-time consistency.
- Parameters:
session – Database session
task_id – The task ID being transitioned
workflow_id – The workflow ID the task belongs to
previous_status – The status the task is transitioning from
new_status – The status the task is transitioning to
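The close-then-open pattern can be illustrated with a small self-contained sketch. It swaps in the standard-library `sqlite3` module and raw SQL in place of the SQLAlchemy session the real method takes; the table schema is a simplified assumption, and the status strings are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical, simplified schema; the real task_status_audit table may differ.
conn.execute(
    """
    CREATE TABLE task_status_audit (
        id INTEGER PRIMARY KEY,
        task_id INTEGER NOT NULL,
        workflow_id INTEGER NOT NULL,
        previous_status TEXT,
        new_status TEXT NOT NULL,
        entered_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
        exited_at TEXT
    )
    """
)


def create_audit_record(conn, task_id, workflow_id, previous_status, new_status):
    # Step 1: close any open row for this task using the database clock.
    conn.execute(
        "UPDATE task_status_audit SET exited_at = CURRENT_TIMESTAMP "
        "WHERE task_id = ? AND exited_at IS NULL",
        (task_id,),
    )
    # Step 2: insert the new open row; entered_at comes from the column default.
    conn.execute(
        "INSERT INTO task_status_audit "
        "(task_id, workflow_id, previous_status, new_status) VALUES (?, ?, ?, ?)",
        (task_id, workflow_id, previous_status, new_status),
    )


# Two consecutive transitions for the same task (illustrative statuses).
create_audit_record(conn, 1, 10, "REGISTERING", "QUEUED")
create_audit_record(conn, 1, 10, "QUEUED", "ERROR_FATAL")

open_rows = conn.execute(
    "SELECT new_status FROM task_status_audit "
    "WHERE task_id = 1 AND exited_at IS NULL"
).fetchall()
```

After the second call only the most recent row is still open; the earlier row has had its `exited_at` filled in by the database rather than by the application clock.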
- classmethod create_audit_records_bulk(session: sqlalchemy.orm.Session, records: List[Dict]) None
Bulk create audit records with proper exited_at handling.
Closes any open audit rows for the supplied task_ids and inserts a new open row per record. Runs as a three-phase sequence to avoid the next-key gap-lock cycle that the original UPDATE … WHERE exited_at IS NULL would trigger on the (task_id, exited_at) secondary index when two transactions with interleaved task_ids ran the close-then-INSERT pair concurrently (InnoDB errno 1213).
The exited_at column stays nullable. NULL continues to mean “no observed exit yet” (currently open OR orphaned by workflow death), and downstream consumers (MAX(exited_at) ignoring opens, COALESCE(exited_at, NOW()) duration math, the GUI’s active = !exitedAt) keep working unchanged.
- Parameters:
session – Database session
records – List of dicts with task_id, workflow_id, previous_status, new_status
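To see why a nullable `exited_at` keeps downstream queries working, the sketch below runs the two SQL idioms mentioned above against a toy table. It uses the standard-library `sqlite3` module, integer timestamps, and a fixed `now` value in place of `NOW()`; the schema and rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_status_audit ("
    "task_id INTEGER, new_status TEXT, entered_at INTEGER, exited_at INTEGER)"
)
# Hypothetical rows with integer epoch seconds for easy arithmetic.
conn.executemany(
    "INSERT INTO task_status_audit VALUES (?, ?, ?, ?)",
    [
        (1, "REGISTERING", 100, 130),  # closed row
        (1, "QUEUED", 130, None),      # open row: exited_at is NULL
    ],
)
now = 200  # stand-in for NOW()

# MAX() skips NULLs, so open rows do not affect the latest observed exit.
(last_exit,) = conn.execute(
    "SELECT MAX(exited_at) FROM task_status_audit WHERE task_id = 1"
).fetchone()

# COALESCE substitutes the current time for open rows in duration math.
durations = conn.execute(
    "SELECT new_status, COALESCE(exited_at, ?) - entered_at "
    "FROM task_status_audit WHERE task_id = 1 ORDER BY entered_at",
    (now,),
).fetchall()
```

Here `last_exit` reflects only the closed row, while the open row's duration is measured up to `now`.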
- classmethod transition_task_instance(session: sqlalchemy.orm.Session, task_instance_id: int, task_id: int, current_ti_status: str, new_ti_status: str, task_num_attempts: int, task_max_attempts: int, report_by_date: Any | None = None, max_retries: int = DEFAULT_MAX_RETRIES) Dict[str, Any]
Worker-triggered: TI transition cascades to Task.
Validate TI transition (untimely, valid_transitions)
Lock TI (NOWAIT) -> Lock Task (NOWAIT)
Update TI status
Compute implied Task status (via TaskFSM.get_task_status_for_ti)
Handle orphaned TI (Task terminal → TI goes to ERROR_FATAL)
Validate Task FSM gate
Update Task -> Audit Task
Preserves TI-first locking order for atomicity. Has internal retries with rollback.
- Parameters:
session – Database session (caller must commit on success)
task_instance_id – TaskInstance ID to transition
task_id – Task ID associated with the TaskInstance
current_ti_status – Current TaskInstance status (for validation)
new_ti_status – New TaskInstance status
task_num_attempts – Current task num_attempts
task_max_attempts – Task max_attempts
report_by_date – Optional report_by_date value (already computed)
max_retries – Number of retry attempts on lock contention
- Returns:
- {
    "ti_updated": bool,
    "task_transitioned": bool,
    "task_status": str or None,
    "error": str or None,  # "untimely_transition", "invalid_ti_transition", or None
    "orphaned": bool,  # True if Task was terminal
}
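A caller might wrap the call along these lines. This is a sketch under stated assumptions: `apply_ti_transition` is a hypothetical helper, `transition` stands in for `TransitionService.transition_task_instance`, and the choice to roll back whenever `error` is set is a caller-side policy assumed here, not something the source prescribes.

```python
from typing import Any, Callable, Dict


def apply_ti_transition(
    session,
    transition: Callable[..., Dict[str, Any]],
    **kwargs: Any,
) -> Dict[str, Any]:
    """Hypothetical caller-side wrapper: commit on success, roll back on error."""
    # Call the service first, before any other work on this session,
    # because its internal retries may roll the session back.
    result = transition(session=session, **kwargs)
    if result["error"] is None and result["ti_updated"]:
        session.commit()  # the service never commits on its own
    else:
        session.rollback()  # assumed policy: discard on a rejected transition
    return result
```

Passing the service method in as a callable keeps the wrapper easy to exercise with a stub in tests.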
- classmethod gate_tasks_for_queueing(session: sqlalchemy.orm.Session, task_ids: List[int], max_retries: int = DEFAULT_MAX_RETRIES) Dict[str, List[int]]
Distributor-triggered: Check Task gate for TI creation.
Lock Tasks with SKIP LOCKED
Validate FSM gate (REGISTERING/ADJUSTING_RESOURCES -> QUEUED)
Update Tasks, increment num_attempts
Audit Task status changes
Returns a dict of ID lists keyed by "gated", "invalid", "locked", and "not_found". The caller creates TaskInstances for the "gated" tasks.
- Parameters:
session – Database session (caller must commit on success)
task_ids – Task IDs to gate for queueing
max_retries – Number of retry attempts on lock contention
- Returns:
- {
    "gated": [ids that passed gate and transitioned to QUEUED],
    "invalid": [ids that failed FSM validation],
    "locked": [ids that were locked by another transaction],
    "not_found": [ids that don't exist]
}
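The four result buckets can be modelled in memory as shown below. This is only an illustration of how IDs might be classified, not the service's implementation: `partition_for_queueing`, `locked_rows`, and `existing_ids` are hypothetical names, and the real method works against locked database rows.

```python
from typing import Dict, List, Set

# Source statuses allowed through the gate, as listed in the steps above.
GATEABLE_STATUSES = {"REGISTERING", "ADJUSTING_RESOURCES"}


def partition_for_queueing(
    requested_ids: List[int],
    locked_rows: Dict[int, str],
    existing_ids: Set[int],
) -> Dict[str, List[int]]:
    """Hypothetical in-memory model of the gate's four result buckets.

    locked_rows maps task_id -> current status for rows this transaction
    managed to lock; existing_ids is every task_id present in the table.
    """
    result: Dict[str, List[int]] = {
        "gated": [], "invalid": [], "locked": [], "not_found": []
    }
    for task_id in requested_ids:
        if task_id not in existing_ids:
            result["not_found"].append(task_id)
        elif task_id not in locked_rows:
            # Row exists but SKIP LOCKED passed over it.
            result["locked"].append(task_id)
        elif locked_rows[task_id] in GATEABLE_STATUSES:
            result["gated"].append(task_id)
        else:
            result["invalid"].append(task_id)
    return result


buckets = partition_for_queueing(
    requested_ids=[1, 2, 3, 4],
    locked_rows={1: "REGISTERING", 2: "QUEUED"},
    existing_ids={1, 2, 3},
)
```

In this example task 1 passes the gate, task 2 is already QUEUED and so fails validation, task 3 is held by another transaction, and task 4 does not exist.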
- classmethod transition_tasks(session: sqlalchemy.orm.Session, task_ids: List[int], to_status: str, increment_attempts: bool = False, max_retries: int = DEFAULT_MAX_RETRIES, use_skip_locked: bool = False) Dict[str, List[int]]
Unified transition function for single or bulk operations.
- Parameters:
session – Database session (caller must commit on success)
task_ids – Task IDs to transition (1 or many)
to_status – Target status (from TaskStatus constants)
increment_attempts – If True, increment num_attempts (for QUEUED)
max_retries – Number of retry attempts on lock contention
use_skip_locked – If True, skip locked rows instead of failing. Use for bulk ops; requires MySQL 8.0+.
- Returns:
- {
    "transitioned": [ids that were updated],
    "invalid": [ids that failed FSM validation],
    "locked": [ids that were locked by another transaction],
    "not_found": [ids that don't exist]
}
Note
Has internal retries with rollback on lock errors
Does NOT commit - caller must commit on success
Call at START of transaction (rollback will clear any prior work)
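For bulk use with `use_skip_locked=True`, a caller may want to re-submit the IDs reported as "locked". The sketch below shows one possible driver loop; `transition_until_unlocked` and `max_rounds` are hypothetical, `transition_tasks` stands in for `TransitionService.transition_tasks`, and committing after every round is an assumed policy chosen so that a rollback in a later round cannot undo earlier progress.

```python
from typing import Callable, Dict, List


def transition_until_unlocked(
    session,
    transition_tasks: Callable[..., Dict[str, List[int]]],
    task_ids: List[int],
    to_status: str,
    max_rounds: int = 3,
) -> Dict[str, List[int]]:
    """Hypothetical bulk driver: re-submit ids reported as locked."""
    totals: Dict[str, List[int]] = {
        "transitioned": [], "invalid": [], "locked": [], "not_found": []
    }
    pending = list(task_ids)
    for _ in range(max_rounds):
        if not pending:
            break
        result = transition_tasks(
            session=session,
            task_ids=pending,
            to_status=to_status,
            use_skip_locked=True,
        )
        # Commit each round on its own so a later rollback cannot undo it.
        session.commit()
        for key in ("transitioned", "invalid", "not_found"):
            totals[key].extend(result[key])
        # Only ids skipped because of locks are worth retrying.
        pending = result["locked"]
    # Whatever is still pending after the last round stays reported as locked.
    totals["locked"] = pending
    return totals
```

Each round starts a fresh transaction with the service call as its first operation, which matches the guidance in the note above.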