Source code for server.web.schemas.task

from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel


[docs] class TaskStatusRequest(BaseModel): """Request model for task status queries."""
[docs] task_ids: Optional[Union[int, List[int]]] = None
[docs] status: Optional[Union[str, List[str]]] = None
[docs] class TaskStatusItem(BaseModel): """Individual task status item."""
[docs] TASK_ID: int
[docs] task_status: str
[docs] TASK_INSTANCE_ID: int
[docs] DISTRIBUTOR_ID: Optional[str]
[docs] STATUS: str
[docs] RESOURCE_USAGE: Optional[str]
[docs] STDOUT: Optional[str]
[docs] STDERR: Optional[str]
[docs] ERROR_TRACE: Optional[str]
[docs] class TaskStatusResponse(BaseModel): """Response model for task status."""
[docs] task_instance_status: str # JSON string from pandas DataFrame
[docs] class TaskSubdagRequest(BaseModel): """Request model for task subdag."""
[docs] task_ids: List[int]
[docs] task_status: Optional[List[str]] = []
[docs] class TaskSubdagResponse(BaseModel): """Response model for task subdag."""
[docs] workflow_id: Optional[int]
[docs] sub_task: Optional[Dict[int, Any]] # Changed from str to int - keys are task IDs
[docs] class TaskDependencyItem(BaseModel): """Individual task dependency item."""
[docs] id: int
[docs] status: str
[docs] name: str
[docs] class TaskDependenciesResponse(BaseModel): """Response model for task dependencies."""
[docs] up: List[List[TaskDependencyItem]]
[docs] down: List[List[TaskDependencyItem]]
[docs] class TasksRecursiveRequest(BaseModel): """Request model for recursive tasks."""
[docs] task_ids: List[int]
[docs] class TasksRecursiveResponse(BaseModel): """Response model for recursive tasks."""
[docs] task_ids: List[int]
[docs] class TaskResourceUsageResponse(BaseModel): """Response model for task resource usage."""
[docs] resource_usage: List[Any] # tuple from SerializeTaskResourceUsage.to_wire()
[docs] class DownstreamTasksRequest(BaseModel): """Request model for downstream tasks."""
[docs] task_ids: List[int]
[docs] dag_id: int
[docs] class DownstreamTasksResponse(BaseModel): """Response model for downstream tasks."""
[docs] downstream_tasks: Dict[int, List[Any]]
[docs] class TaskInstanceDetailItem(BaseModel): """Individual task instance detail item."""
[docs] ti_id: int
[docs] ti_status: str
[docs] ti_stdout: Optional[str]
[docs] ti_stderr: Optional[str]
[docs] ti_stdout_log: Optional[str]
[docs] ti_stderr_log: Optional[str]
[docs] ti_distributor_id: Optional[str]
[docs] ti_nodename: Optional[str]
[docs] ti_error_log_description: Optional[str]
[docs] ti_wallclock: Optional[float]
[docs] ti_maxrss: Optional[float]
[docs] ti_resources: Optional[str]
[docs] ti_submit_date: Optional[str]
[docs] ti_status_date: Optional[str]
[docs] ti_queue_name: Optional[str]
[docs] class TaskInstanceDetailsResponse(BaseModel): """Response model for task instance details."""
[docs] taskinstances: List[TaskInstanceDetailItem]
[docs] class TaskDetailItem(BaseModel): """Individual task detail item."""
[docs] task_status: str
[docs] workflow_id: int
[docs] task_name: str
[docs] task_command: str
[docs] task_status_date: str
[docs] task_template_id: int
[docs] class TaskDetailsResponse(BaseModel): """Response model for task details."""
[docs] task_details: List[TaskDetailItem]