server.web.routes.v3.fsm.workflow ================================= .. py:module:: server.web.routes.v3.fsm.workflow .. autoapi-nested-parse:: Routes for Workflows. Attributes ---------- .. autoapisummary:: server.web.routes.v3.fsm.workflow.logger Functions --------- .. autoapisummary:: server.web.routes.v3.fsm.workflow.bind_workflow server.web.routes.v3.fsm.workflow.get_matching_workflows_by_workflow_args server.web.routes.v3.fsm.workflow.update_workflow_attribute server.web.routes.v3.fsm.workflow.set_resume server.web.routes.v3.fsm.workflow.workflow_is_resumable server.web.routes.v3.fsm.workflow.force_cleanup_kill_self server.web.routes.v3.fsm.workflow.get_max_concurrently_running server.web.routes.v3.fsm.workflow.update_max_running server.web.routes.v3.fsm.workflow.task_status_updates server.web.routes.v3.fsm.workflow.fetch_workflow_metadata server.web.routes.v3.fsm.workflow.get_tasks_from_workflow server.web.routes.v3.fsm.workflow.get_available_workflow_statuses server.web.routes.v3.fsm.workflow.increase_resources_for_resource_error_tasks server.web.routes.v3.fsm.workflow.update_array_max_running server.web.routes.v3.fsm.workflow.task_template_dag Module Contents --------------- .. py:data:: logger .. py:function:: bind_workflow(request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db), dialect: str = Depends(get_dialect)) -> Any :async: Bind a workflow to the database. .. py:function:: get_matching_workflows_by_workflow_args(workflow_args_hash: str, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any :async: Return any dag hashes that are assigned to workflows with identical workflow args. .. py:function:: update_workflow_attribute(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db), dialect: str = Depends(get_dialect)) -> Any :async: Update the attributes for a given workflow. .. py:function:: set_resume(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any :async: Set resume on a workflow. .. py:function:: workflow_is_resumable(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any Check if a workflow is in a resumable state. :returns: True if workflow can be resumed pending_kill_self: Number of KILL_SELF task instances waiting for cleanup :rtype: workflow_is_resumable .. py:function:: force_cleanup_kill_self(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any Force cleanup of stuck KILL_SELF task instances and finalize workflow run. Use this when jobs have been externally terminated (e.g., scancel, node failure) and the workflow is stuck waiting for cleanup that will never happen. This transitions all KILL_SELF task instances to ERROR_FATAL and then immediately finalizes any workflow runs in COLD_RESUME/HOT_RESUME state to TERMINATED (instead of waiting for the reaper). .. py:function:: get_max_concurrently_running(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any :async: Return the maximum concurrency of this workflow. .. py:function:: update_max_running(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any :async: Update the number of tasks that can be running concurrently for a given workflow. .. py:function:: task_status_updates(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any :async: Returns all tasks in the database that have the specified status. :param workflow_id: the ID of the workflow. :type workflow_id: int :param request: the request object. :type request: Request :param db: the database session. :type db: Session .. py:function:: fetch_workflow_metadata(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any Get metadata associated with specified Workflow ID. .. py:function:: get_tasks_from_workflow(workflow_id: int, max_task_id: int, chunk_size: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any Return tasks associated with specified Workflow ID. .. py:function:: get_available_workflow_statuses(db: sqlalchemy.orm.Session = Depends(get_db)) -> Any Return all available workflow statuses. .. py:function:: increase_resources_for_resource_error_tasks(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any :async: Increase resources for tasks in E or F whose latest TaskInstance is Z. Steps per task: - Update Task.status -> ERROR_RECOVERABLE (E) - Load TaskResources.requested_resources JSON - Load Task.resource_scales (stringified dict) - Apply scaling: * numeric value => ceil(val * (1 + scale)) * list value => absolute value chosen by attempt index - Update TaskResources.requested_resources JSON - Set TaskResources.task_resources_type_id -> 'A' (Adjusted) .. py:function:: update_array_max_running(workflow_id: int, request: fastapi.Request, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any :async: Update the number of tasks that can be running concurrently for a given Array. .. py:function:: task_template_dag(workflow_id: str, db: sqlalchemy.orm.Session = Depends(get_db)) -> Any :async: Compute the shape of a Workflow's DAG by TaskTemplate.