from http import HTTPStatus
from typing import Any
from flask import jsonify, request
from sqlalchemy import select
from jobmon.server.web.models.array import Array
from jobmon.server.web.models.task import Task
from jobmon.server.web.models.task_instance import TaskInstance
from jobmon.server.web.routes.v1 import api_v1_blueprint
from jobmon.server.web.routes.v2 import api_v2_blueprint
from jobmon.server.web.routes.v2 import SessionLocal
@api_v1_blueprint.route("/array/<workflow_id>/get_array_tasks")
@api_v2_blueprint.route("/array/<workflow_id>/get_array_tasks")
[docs]
def get_array_task_instances(workflow_id: int) -> Any:
"""Return error/output filepaths for task instances filtered by array name.
The user can also optionally filter by job name as well.
To avoid overly-large returned results, the user must also pass in a workflow ID.
"""
data = request.args
array_name = data.get("array_name")
job_name = data.get("job_name")
limit = data.get("limit", 5)
query_filters = [
Task.workflow_id == workflow_id,
TaskInstance.task_id == Task.id,
Task.array_id == Array.id,
]
if array_name:
query_filters.append(Array.name == array_name)
if job_name:
query_filters.append(Task.name == job_name)
session = SessionLocal()
with SessionLocal.begin():
select_stmt = (
select(
Task.id,
Task.name,
Array.name,
TaskInstance.id,
TaskInstance.stdout,
TaskInstance.stderr,
)
.where(*query_filters)
.limit(limit)
)
result = session.execute(select_stmt).all()
column_names = (
"TASK_ID",
"TASK_NAME",
"ARRAY_NAME",
"TASK_INSTANCE_ID",
"OUTPUT_PATH",
"ERROR_PATH",
)
resp = jsonify(array_tasks=[dict(zip(column_names, ti)) for ti in result])
resp.status_code = HTTPStatus.OK
return resp