Source code for server.web.routes.v2.cli.array

from http import HTTPStatus
from typing import Any, Optional

from fastapi import Query
from sqlalchemy import select
from starlette.responses import JSONResponse

from jobmon.server.web.db import get_sessionmaker
from jobmon.server.web.models.array import Array
from jobmon.server.web.models.task import Task
from jobmon.server.web.models.task_instance import TaskInstance
from jobmon.server.web.routes.v2.cli import cli_router as api_v2_router

[docs] SessionMaker = get_sessionmaker()
@api_v2_router.get("/array/{workflow_id}/get_array_tasks")
[docs] def get_array_task_instances( workflow_id: int, array_name: str, job_name: Optional[str] = Query(None), limit: Optional[int] = Query(None), ) -> Any: """Return error/output filepaths for task instances filtered by array name. The user can also optionally filter by job name as well. To avoid overly-large returned results, the user must also pass in a workflow ID. """ query_filters = [ Task.workflow_id == workflow_id, TaskInstance.task_id == Task.id, Task.array_id == Array.id, ] if array_name: query_filters.append(Array.name == array_name) if job_name: query_filters.append(Task.name == job_name) # set default limit to 5 if limit is None: limit = 5 with SessionMaker() as session: with session.begin(): select_stmt = ( select( Task.id, Task.name, Array.name, TaskInstance.id, TaskInstance.stdout, TaskInstance.stderr, ) .where(*query_filters) .limit(limit) ) result = session.execute(select_stmt).all() column_names = ( "TASK_ID", "TASK_NAME", "ARRAY_NAME", "TASK_INSTANCE_ID", "OUTPUT_PATH", "ERROR_PATH", ) array_tasks = [dict(zip(column_names, ti)) for ti in result] resp = JSONResponse( content={"array_tasks": array_tasks}, status_code=HTTPStatus.OK ) return resp