server.web.services.transition_service
======================================

.. py:module:: server.web.services.transition_service

.. autoapi-nested-parse::

   Unified transition service with TI-centric architecture and audit logging.


Attributes
----------

.. autoapisummary::

   server.web.services.transition_service.logger


Classes
-------

.. autoapisummary::

   server.web.services.transition_service.TransitionService


Module Contents
---------------

.. py:data:: logger

.. py:class:: TransitionService

   Unified transition service with a TI-centric model.

   - TaskInstance transitions drive Task transitions.
   - The Task FSM acts as the gate.
   - All transitions are audited to ``task_status_audit``.

   Handles both single-task and bulk transitions with:

   - Centralized FSM validation via TaskFSM
   - Built-in retry logic with exponential backoff
   - Row locking with NOWAIT (single) or SKIP LOCKED (bulk)
   - Automatic audit logging to the ``task_status_audit`` table

   IMPORTANT - Transaction Management:

   - This service retries internally and calls ``session.rollback()`` on lock errors.
   - Callers should NOT hold uncommitted work they want to keep when calling it.
   - Call this service at the START of a transaction, or in its own transaction.
   - The service does NOT call ``commit()``; the caller must commit on success.


   .. py:attribute:: DEFAULT_MAX_RETRIES
      :value: 5


   .. py:attribute:: DEFAULT_BASE_DELAY_MS
      :value: 50


   .. py:attribute:: TI_VALID_TRANSITIONS
      :type: Set[Tuple[str, str]]


   .. py:attribute:: TI_UNTIMELY_TRANSITIONS
      :type: Set[Tuple[str, str]]


   .. py:attribute:: TI_ERROR_STATES
      :type: Set[str]


   .. py:attribute:: TASK_TERMINAL_STATES
      :type: Set[str]


   .. py:attribute:: LOCK_ERROR_MESSAGES
      :value: ['could not obtain lock', 'nowait', 'lock wait timeout', 'deadlock', 'database is locked']


   .. py:method:: create_audit_record_with_immediate_exit(session: sqlalchemy.orm.Session, task_id: int, workflow_id: int, previous_status: str, new_status: str) -> None
      :classmethod:

      Create an audit record with both ``entered_at`` and ``exited_at`` set (for transient states).

      Used for the ERROR_RECOVERABLE status, a transient state that the task enters and exits within the same transition.

      :param session: Database session
      :param task_id: The task ID being transitioned
      :param workflow_id: The workflow ID the task belongs to
      :param previous_status: The status the task is transitioning from
      :param new_status: The transient status (e.g., ERROR_RECOVERABLE)


   .. py:method:: create_audit_record(session: sqlalchemy.orm.Session, task_id: int, workflow_id: int, previous_status: str, new_status: str) -> None
      :classmethod:

      Create an audit record with proper ``exited_at`` handling.

      1. Closes any open audit record for this task (sets ``exited_at = func.now()``).
      2. Creates a new audit record (``entered_at`` uses the model default, ``func.now()``).

      Uses the database's ``func.now()`` for timestamps to ensure server-time consistency.

      :param session: Database session
      :param task_id: The task ID being transitioned
      :param workflow_id: The workflow ID the task belongs to
      :param previous_status: The status the task is transitioning from
      :param new_status: The status the task is transitioning to


   .. py:method:: create_audit_records_bulk(session: sqlalchemy.orm.Session, records: List[Dict]) -> None
      :classmethod:

      Bulk-create audit records with proper ``exited_at`` handling.

      Closes any open audit rows for the supplied task_ids and inserts a new open row per record. Runs as a three-phase sequence to avoid the next-key gap-lock cycle that the original ``UPDATE … WHERE exited_at IS NULL`` would trigger on the ``(task_id, exited_at)`` secondary index when two transactions with interleaved task_ids ran the close-then-INSERT pair concurrently (InnoDB errno 1213).

      The ``exited_at`` column stays nullable.

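
      A minimal sketch of the close-then-insert pair described above, using an in-memory SQLite table as a hypothetical, simplified stand-in for ``task_status_audit`` (the column set and the ``close_then_insert`` and ``open_rows`` helpers are assumptions, not this module's code). It mirrors the original two-statement pair rather than the three-phase sequence, whose phases are not spelled out here, and SQLite has no InnoDB gap locks, so it illustrates row semantics only: a row whose ``exited_at`` is NULL is open.

      .. code-block:: python

         import sqlite3
         from typing import List

         # Hypothetical, simplified stand-in for task_status_audit; the real model may differ.
         SCHEMA = """
         CREATE TABLE task_status_audit (
             id INTEGER PRIMARY KEY AUTOINCREMENT,
             task_id INTEGER NOT NULL,
             workflow_id INTEGER NOT NULL,
             previous_status TEXT,
             new_status TEXT NOT NULL,
             entered_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
             exited_at TEXT  -- NULL means "no observed exit yet"
         )
         """


         def close_then_insert(conn: sqlite3.Connection, records: List[dict]) -> None:
             """Close each task's open row, then insert one new open row per record."""
             if not records:
                 return
             task_ids = [r["task_id"] for r in records]
             placeholders = ",".join("?" for _ in task_ids)
             # Close: stamp exited_at on rows that are still open (exited_at IS NULL).
             conn.execute(
                 "UPDATE task_status_audit SET exited_at = CURRENT_TIMESTAMP "
                 f"WHERE exited_at IS NULL AND task_id IN ({placeholders})",
                 task_ids,
             )
             # Open: entered_at comes from the column default; exited_at stays NULL.
             conn.executemany(
                 "INSERT INTO task_status_audit (task_id, workflow_id, previous_status, new_status) "
                 "VALUES (:task_id, :workflow_id, :previous_status, :new_status)",
                 records,
             )


         def open_rows(conn: sqlite3.Connection) -> list:
             """Open rows are the ones with no observed exit (exited_at IS NULL)."""
             return conn.execute(
                 "SELECT task_id, new_status FROM task_status_audit "
                 "WHERE exited_at IS NULL ORDER BY task_id"
             ).fetchall()


         conn = sqlite3.connect(":memory:")
         conn.execute(SCHEMA)
         close_then_insert(conn, [
             {"task_id": 1, "workflow_id": 10, "previous_status": None, "new_status": "REGISTERING"},
             {"task_id": 2, "workflow_id": 10, "previous_status": None, "new_status": "REGISTERING"},
         ])
         close_then_insert(conn, [
             {"task_id": 1, "workflow_id": 10, "previous_status": "REGISTERING", "new_status": "QUEUED"},
         ])
         print(open_rows(conn))  # [(1, 'QUEUED'), (2, 'REGISTERING')]

      After the second call, task 1 has one closed ``REGISTERING`` row and one open ``QUEUED`` row, while task 2 keeps its single open row.
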
      NULL continues to mean "no observed exit yet" (the row is either currently open or orphaned by workflow death), and downstream consumers (``MAX(exited_at)`` ignoring open rows, ``COALESCE(exited_at, NOW())`` duration math, the GUI's ``active = !exitedAt``) keep working unchanged.

      :param session: Database session
      :param records: List of dicts with ``task_id``, ``workflow_id``, ``previous_status``, and ``new_status``


   .. py:method:: transition_task_instance(session: sqlalchemy.orm.Session, task_instance_id: int, task_id: int, current_ti_status: str, new_ti_status: str, task_num_attempts: int, task_max_attempts: int, report_by_date: Optional[Any] = None, max_retries: int = DEFAULT_MAX_RETRIES) -> Dict[str, Any]
      :classmethod:

      Worker-triggered: a TI transition cascades to its Task.

      1. Validate the TI transition (untimely, valid_transitions).
      2. Lock the TI (NOWAIT), then lock the Task (NOWAIT).
      3. Update the TI status.
      4. Compute the implied Task status (via ``TaskFSM.get_task_status_for_ti``).
      5. Handle an orphaned TI (Task terminal → TI goes to ERROR_FATAL).
      6. Validate the Task FSM gate.
      7. Update the Task, then audit the Task.

      Preserves the TI-first locking order for atomicity. Has internal retries with rollback.

      :param session: Database session (caller must commit on success)
      :param task_instance_id: TaskInstance ID to transition
      :param task_id: Task ID associated with the TaskInstance
      :param current_ti_status: Current TaskInstance status (for validation)
      :param new_ti_status: New TaskInstance status
      :param task_num_attempts: Current task num_attempts
      :param task_max_attempts: Task max_attempts
      :param report_by_date: Optional report_by_date value (already computed)
      :param max_retries: Number of retry attempts on lock contention
      :returns:
         A dict of the form::

            {
                "ti_updated": bool,
                "task_transitioned": bool,
                "task_status": str or None,
                "error": str or None,  # "untimely_transition", "invalid_ti_transition", or None
                "orphaned": bool,      # True if the Task was terminal
            }


   .. py:method:: gate_tasks_for_queueing(session: sqlalchemy.orm.Session, task_ids: List[int], max_retries: int = DEFAULT_MAX_RETRIES) -> Dict[str, List[int]]
      :classmethod:

      Distributor-triggered: check the Task gate before TI creation.

      1. Lock Tasks with SKIP LOCKED.
      2. Validate the FSM gate (REGISTERING/ADJUSTING_RESOURCES -> QUEUED).
      3. Update Tasks and increment num_attempts.
      4. Audit the Task status changes.

      Returns the ``"gated"``, ``"invalid"``, ``"locked"``, and ``"not_found"`` ID lists described below. The caller creates TaskInstances for the ``"gated"`` tasks.

      :param session: Database session (caller must commit on success)
      :param task_ids: Task IDs to gate for queueing
      :param max_retries: Number of retry attempts on lock contention
      :returns:
         A dict of the form::

            {
                "gated": [ids that passed the gate and transitioned to QUEUED],
                "invalid": [ids that failed FSM validation],
                "locked": [ids that were locked by another transaction],
                "not_found": [ids that don't exist],
            }


   .. py:method:: transition_tasks(session: sqlalchemy.orm.Session, task_ids: List[int], to_status: str, increment_attempts: bool = False, max_retries: int = DEFAULT_MAX_RETRIES, use_skip_locked: bool = False) -> Dict[str, List[int]]
      :classmethod:

      Unified transition function for single or bulk operations.

      :param session: Database session (caller must commit on success)
      :param task_ids: Task IDs to transition (one or many)
      :param to_status: Target status (from TaskStatus constants)
      :param increment_attempts: If True, increment num_attempts (for QUEUED)
      :param max_retries: Number of retry attempts on lock contention
      :param use_skip_locked: If True, skip locked rows instead of failing. Use for bulk operations; requires MySQL 8.0+.
      :returns:
         A dict of the form::

            {
                "transitioned": [ids that were updated],
                "invalid": [ids that failed FSM validation],
                "locked": [ids that were locked by another transaction],
                "not_found": [ids that don't exist],
            }

      .. note::

         - Has internal retries with rollback on lock errors.
         - Does NOT commit; the caller must commit on success.
         - Call at the START of a transaction (rollback will clear any prior work).
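
      The internal retry-and-rollback behavior noted above can be sketched as a small wrapper. This is a hypothetical illustration, not the service's code: ``is_lock_error`` and ``run_with_lock_retries`` are invented names, the session is any object with a ``rollback()`` method, and the sketch assumes that ``max_retries`` counts total attempts and that the delay doubles from ``DEFAULT_BASE_DELAY_MS`` with no jitter.

      .. code-block:: python

         import time
         from typing import Any, Callable, List

         # Values copied from the documented class attributes.
         LOCK_ERROR_MESSAGES: List[str] = [
             "could not obtain lock", "nowait", "lock wait timeout", "deadlock", "database is locked",
         ]
         DEFAULT_MAX_RETRIES = 5
         DEFAULT_BASE_DELAY_MS = 50


         def is_lock_error(exc: BaseException) -> bool:
             """Hypothetical helper: match the error text against known lock-error fragments."""
             message = str(exc).lower()
             return any(fragment in message for fragment in LOCK_ERROR_MESSAGES)


         def run_with_lock_retries(
             session: Any,
             operation: Callable[[Any], Any],
             max_retries: int = DEFAULT_MAX_RETRIES,
             base_delay_ms: int = DEFAULT_BASE_DELAY_MS,
             sleep: Callable[[float], None] = time.sleep,
         ) -> Any:
             """Hypothetical wrapper: retry on lock errors with exponential backoff.

             Assumes max_retries is the total number of attempts.
             """
             for attempt in range(max_retries):
                 try:
                     return operation(session)
                 except Exception as exc:
                     # Non-lock errors, and lock errors on the final attempt, propagate.
                     if not is_lock_error(exc) or attempt == max_retries - 1:
                         raise
                     # Discard the failed attempt's work; this is why callers should
                     # not hold uncommitted changes they want to keep.
                     session.rollback()
                     # 50 ms, 100 ms, 200 ms, ... with the default base delay.
                     sleep(base_delay_ms * (2 ** attempt) / 1000.0)
             raise ValueError("max_retries must be at least 1")


         class FakeSession:
             def __init__(self) -> None:
                 self.rollbacks = 0

             def rollback(self) -> None:
                 self.rollbacks += 1


         attempts = {"count": 0}


         def flaky_transition(session: Any) -> dict:
             # Fails twice with a deadlock-style message, then succeeds.
             attempts["count"] += 1
             if attempts["count"] < 3:
                 raise RuntimeError("Deadlock found when trying to get lock")
             return {"transitioned": [42], "invalid": [], "locked": [], "not_found": []}


         delays: List[float] = []
         session = FakeSession()
         result = run_with_lock_retries(session, flaky_transition, sleep=delays.append)
         print(result["transitioned"], session.rollbacks, delays)  # [42] 2 [0.05, 0.1]

      Injecting ``sleep`` keeps the sketch testable; a real caller would leave the ``time.sleep`` default in place.
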
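
      The four ID buckets returned by ``gate_tasks_for_queueing`` can be illustrated with a pure-Python partition. This sketch is hypothetical: ``partition_for_queueing`` is an invented helper, it assumes the caller already knows which rows it managed to lock (with SKIP LOCKED, rows held by other transactions are simply absent from the locking query's result) and which IDs exist at all (for example from a separate non-locking lookup), and it uses readable status strings that may not match the real TaskStatus constants.

      .. code-block:: python

         from typing import Dict, Iterable, List, Mapping, Set

         # Source states the documented gate accepts for the move to QUEUED.
         QUEUE_GATE_SOURCES: Set[str] = {"REGISTERING", "ADJUSTING_RESOURCES"}


         def partition_for_queueing(
             requested_ids: Iterable[int],
             locked_statuses: Mapping[int, str],
             existing_ids: Set[int],
         ) -> Dict[str, List[int]]:
             """Hypothetical helper: sort requested IDs into the documented result buckets.

             locked_statuses: {task_id: status} for rows this transaction locked.
             existing_ids: every requested ID that exists, locked or not.
             """
             result: Dict[str, List[int]] = {"gated": [], "invalid": [], "locked": [], "not_found": []}
             for task_id in requested_ids:
                 if task_id in locked_statuses:
                     # This transaction holds the row lock, so the FSM gate decides.
                     ok = locked_statuses[task_id] in QUEUE_GATE_SOURCES
                     bucket = "gated" if ok else "invalid"
                 elif task_id in existing_ids:
                     # The row exists but was skipped: another transaction holds it.
                     bucket = "locked"
                 else:
                     bucket = "not_found"
                 result[bucket].append(task_id)
             return result


         buckets = partition_for_queueing(
             requested_ids=[1, 2, 3, 4],
             locked_statuses={1: "REGISTERING", 2: "QUEUED"},
             existing_ids={1, 2, 3},
         )
         print(buckets)  # {'gated': [1], 'invalid': [2], 'locked': [3], 'not_found': [4]}

      In the documented method, the ``"gated"`` tasks would then be moved to QUEUED with num_attempts incremented and audited (steps 3 and 4), and the caller would create TaskInstances for them.
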