server.web.routes.v3.fsm.workflow
Routes for Workflows.
Attributes
logger |
Functions
bind_workflow | Bind a workflow to the database.
get_matching_workflows_by_workflow_args | Return any dag hashes that are assigned to workflows with identical workflow args.
update_workflow_attribute | Update the attributes for a given workflow.
set_resume | Set resume on a workflow.
workflow_is_resumable | Check if a workflow is in a resumable state.
force_cleanup_kill_self | Force cleanup of stuck KILL_SELF task instances and finalize workflow run.
get_max_concurrently_running | Return the maximum concurrency of this workflow.
update_max_running | Update the number of tasks that can be running concurrently for a given workflow.
task_status_updates | Returns all tasks in the database that have the specified status.
fetch_workflow_metadata | Get metadata associated with specified Workflow ID.
get_tasks_from_workflow | Return tasks associated with specified Workflow ID.
get_available_workflow_statuses | Return all available workflow statuses.
increase_resources_for_resource_error_tasks | Increase resources for tasks in E or F whose latest TaskInstance is Z.
update_array_max_running | Update the number of tasks that can be running concurrently for a given Array.
task_template_dag | Compute the shape of a Workflow's DAG by TaskTemplate.
Module Contents
- server.web.routes.v3.fsm.workflow.logger
- async server.web.routes.v3.fsm.workflow.bind_workflow(request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db), dialect: str = Depends(get_dialect)) -> Any
Bind a workflow to the database.
- async server.web.routes.v3.fsm.workflow.get_matching_workflows_by_workflow_args(workflow_args_hash: str, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Return any dag hashes that are assigned to workflows with identical workflow args.
- async server.web.routes.v3.fsm.workflow.update_workflow_attribute(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db), dialect: str = Depends(get_dialect)) -> Any
Update the attributes for a given workflow.
- async server.web.routes.v3.fsm.workflow.set_resume(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Set resume on a workflow.
- server.web.routes.v3.fsm.workflow.workflow_is_resumable(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Check if a workflow is in a resumable state.
- Returns:
workflow_is_resumable: True if the workflow can be resumed.
pending_kill_self: Number of KILL_SELF task instances waiting for cleanup.
- server.web.routes.v3.fsm.workflow.force_cleanup_kill_self(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Force cleanup of stuck KILL_SELF task instances and finalize workflow run.
Use this when jobs have been externally terminated (e.g., scancel, node failure) and the workflow is stuck waiting for cleanup that will never happen.
This transitions all KILL_SELF task instances to ERROR_FATAL and then immediately finalizes any workflow runs in COLD_RESUME/HOT_RESUME state to TERMINATED (instead of waiting for the reaper).
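The two transitions described above can be illustrated with a minimal in-memory sketch. It is not the route's implementation: the `WorkflowRunState` class, the `force_cleanup` helper, and the `"RUNNING"` placeholder status are hypothetical names introduced for illustration, and the real route operates on database rows.

```python
from dataclasses import dataclass, field
from typing import List

# Run states that the description says are finalized immediately.
RESUME_STATES = {"COLD_RESUME", "HOT_RESUME"}


@dataclass
class WorkflowRunState:
    """Hypothetical stand-in for a workflow run and its task instances."""

    status: str
    task_instance_statuses: List[str] = field(default_factory=list)


def force_cleanup(run: WorkflowRunState) -> int:
    """Apply both transitions in place; return the number of cleaned instances."""
    cleaned = 0
    for i, status in enumerate(run.task_instance_statuses):
        if status == "KILL_SELF":
            # Stuck instances will never report back, so mark them fatal.
            run.task_instance_statuses[i] = "ERROR_FATAL"
            cleaned += 1
    if run.status in RESUME_STATES:
        # Finalize now instead of waiting for the reaper.
        run.status = "TERMINATED"
    return cleaned


run = WorkflowRunState("COLD_RESUME", ["KILL_SELF", "RUNNING", "KILL_SELF"])
cleaned_count = force_cleanup(run)
```

Runs that are not in a resume state keep their status; only their KILL_SELF task instances change.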
- async server.web.routes.v3.fsm.workflow.get_max_concurrently_running(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Return the maximum concurrency of this workflow.
- async server.web.routes.v3.fsm.workflow.update_max_running(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Update the number of tasks that can be running concurrently for a given workflow.
- async server.web.routes.v3.fsm.workflow.task_status_updates(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Returns all tasks in the database that have the specified status.
- Parameters:
workflow_id (int) – the ID of the workflow.
request (Request) – the request object.
db (Session) – the database session.
- server.web.routes.v3.fsm.workflow.fetch_workflow_metadata(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Get metadata associated with specified Workflow ID.
- server.web.routes.v3.fsm.workflow.get_tasks_from_workflow(workflow_id: int, max_task_id: int, chunk_size: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Return tasks associated with specified Workflow ID.
- server.web.routes.v3.fsm.workflow.get_available_workflow_statuses(db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Return all available workflow statuses.
- async server.web.routes.v3.fsm.workflow.increase_resources_for_resource_error_tasks(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Increase resources for tasks in E or F whose latest TaskInstance is Z.
Steps per task:
- Update Task.status -> ERROR_RECOVERABLE (E)
- Load TaskResources.requested_resources JSON
- Load Task.resource_scales (stringified dict)
- Apply scaling:
    numeric value => ceil(val * (1 + scale))
    list value => absolute value chosen by attempt index
- Update TaskResources.requested_resources JSON
- Set TaskResources.task_resources_type_id -> 'A' (Adjusted)
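The scaling rule in the steps above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the route's code: `scale_resource` and `adjust_resources` are hypothetical helper names, the attempt index is assumed to be zero-based, and clamping to the last list entry when the index runs past the end of the list is a guess, since the docstring does not say what happens in that case.

```python
import math
from typing import Dict, List, Union

Number = Union[int, float]
Scale = Union[Number, List[Number]]


def scale_resource(value: Number, scale: Scale, attempt: int) -> Number:
    """Return the adjusted value for one resource."""
    if isinstance(scale, list):
        if not scale:
            return value  # assumption: an empty list leaves the value unchanged
        # List scale: pick an absolute value by attempt index
        # (assumption: zero-based, clamped to the last entry).
        return scale[min(attempt, len(scale) - 1)]
    # Numeric scale: grow the current value by the given fraction, rounded up.
    return math.ceil(value * (1 + scale))


def adjust_resources(
    requested: Dict[str, Number], scales: Dict[str, Scale], attempt: int
) -> Dict[str, Number]:
    """Scale every requested resource that has an entry in `scales`."""
    return {
        name: scale_resource(value, scales[name], attempt) if name in scales else value
        for name, value in requested.items()
    }


adjusted = adjust_resources(
    {"memory": 10, "runtime": 100, "cores": 2},
    {"memory": 0.5, "runtime": [200, 400]},
    attempt=1,
)
```

Resources without a scale entry (here `cores`) pass through unchanged.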
- async server.web.routes.v3.fsm.workflow.update_array_max_running(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Update the number of tasks that can be running concurrently for a given Array.
- async server.web.routes.v3.fsm.workflow.task_template_dag(workflow_id: str, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any
Compute the shape of a Workflow’s DAG by TaskTemplate.
Templates whose nodes span multiple pipeline stages are split into numbered phases so the resulting graph stays acyclic and faithfully represents the execution order.
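One way to obtain such an acyclic template-level graph is sketched below. It is not necessarily the algorithm this route uses. The sketch assumes a "stage" is a node's longest-path depth in the node-level DAG, and it invents the `template_dag` function name and the `"Template (n)"` label format for illustration. Because every edge goes from a shallower node to a deeper one, grouping nodes by (template, depth) cannot introduce a cycle.

```python
from collections import defaultdict
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Dict, Hashable, List, Tuple


def template_dag(
    node_template: Dict[Hashable, str],
    edges: List[Tuple[Hashable, Hashable]],
) -> List[Tuple[str, str]]:
    """Collapse a node-level DAG into edges between template phases."""
    preds = defaultdict(set)
    for up, down in edges:
        preds[down].add(up)

    # Longest-path depth: predecessors are always visited before successors.
    order = TopologicalSorter({n: preds[n] for n in node_template}).static_order()
    depth: Dict[Hashable, int] = {}
    for node in order:
        depth[node] = 1 + max((depth[p] for p in preds[node]), default=-1)

    # Collect the distinct depths at which each template appears.
    stages = defaultdict(set)
    for node, template in node_template.items():
        stages[template].add(depth[node])

    def label(node: Hashable) -> str:
        template = node_template[node]
        ordered = sorted(stages[template])
        if len(ordered) == 1:
            return template  # single stage: no phase number needed
        return f"{template} ({ordered.index(depth[node]) + 1})"

    return sorted({(label(up), label(down)) for up, down in edges})


# Template "A" appears both before and after "B", so it is split in two phases.
shape = template_dag({1: "A", 2: "B", 3: "A"}, [(1, 2), (2, 3)])
```

Collapsing the same graph by template name alone would produce the cycle A -> B -> A, which is what the phase split avoids.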