Source code for server.web.schemas.workflow

from typing import List, Optional

from pydantic import BaseModel


class WorkflowValidationRequest(BaseModel):
    """Request schema for workflow validation.

    Carries one field, ``task_ids``, typed as a list of integers.
    """

    # No default is declared, so callers must always supply this field.
    task_ids: List[int]
class WorkflowValidationResponse(BaseModel):
    """Response schema for workflow validation.

    ``validation`` is a required boolean; ``workflow_status`` is an optional
    string that falls back to ``None`` when omitted.
    """

    validation: bool
    # Explicit None default: the field may be left out entirely.
    workflow_status: Optional[str] = None
class WorkflowTasksResponse(BaseModel):
    """Response schema for workflow tasks."""

    # JSON string from pandas DataFrame. It is typed as a plain ``str``, so
    # this model only checks that a string was provided.
    workflow_tasks: str
class WorkflowUserValidationResponse(BaseModel):
    """Response schema for workflow user validation.

    Holds a single required boolean, ``validation``.
    """

    validation: bool
class WorkflowResetRequest(BaseModel):
    """Request schema for workflow reset.

    ``partial_reset`` is a boolean flag that defaults to ``False`` when the
    caller does not provide it.
    """

    partial_reset: bool = False
class WorkflowRunForResetResponse(BaseModel):
    """Response schema for workflow run reset validation."""

    # NOTE(review): nullable but declared without a default. Pydantic v2
    # treats such a field as required (None must be passed explicitly), while
    # pydantic v1 implicitly defaults it to None -- confirm which is intended.
    workflow_run_id: Optional[int]
class WorkflowStatusResponse(BaseModel):
    """Response schema for workflow status."""

    # JSON string from pandas DataFrame. It is typed as a plain ``str``, so
    # this model only checks that a string was provided.
    workflows: str
class WorkflowStatusVizResponse(BaseModel):
    """Response schema for workflow status visualization.

    The model intentionally declares no fields.
    """

    # Dictionary mapping workflow_id to status data.
    # This will be a Dict[int, Any] but FastAPI handles this automatically.
class WorkflowOverviewRequest(BaseModel):
    """Request schema for workflow overview filters.

    Every field is an optional string with a ``None`` default, so any
    subset of them (including none at all) may be supplied.
    """

    user: Optional[str] = None
    tool: Optional[str] = None

    # Workflow-level filters.
    wf_name: Optional[str] = None
    wf_args: Optional[str] = None
    wf_attribute_value: Optional[str] = None
    wf_attribute_key: Optional[str] = None
    wf_id: Optional[str] = None

    # Submission-date filters; both are carried as strings.
    date_submitted: Optional[str] = None
    date_submitted_end: Optional[str] = None

    status: Optional[str] = None
class WorkflowOverviewItem(BaseModel):
    """Single workflow entry in the overview response.

    The ``wf_*`` / ``wfr_*`` fields carry no defaults. The upper-case status
    counters default to ``0``.
    """

    wf_id: int
    wf_name: str
    # Both dates are carried as strings.
    wf_submitted_date: str
    wf_status_date: str
    # NOTE(review): nullable but declared without a default. Pydantic v2
    # treats such a field as required, pydantic v1 defaults it to None --
    # confirm which is intended.
    wf_args: Optional[str]
    wfr_count: int
    wf_status: str
    wf_tool: str

    # Task status counts
    PENDING: int = 0
    SCHEDULED: int = 0
    RUNNING: int = 0
    DONE: int = 0
    FATAL: int = 0
class WorkflowOverviewResponse(BaseModel):
    """Response schema for workflow overview.

    Wraps a required list of ``WorkflowOverviewItem`` entries.
    """

    workflows: List[WorkflowOverviewItem]
class TaskTableItem(BaseModel):
    """Single task entry in the task table response.

    All fields are required; none declares a default.
    """

    task_id: int
    task_name: str
    task_status: str
    task_command: str
    task_num_attempts: int
    # Carried as a string rather than a date/datetime type.
    task_status_date: str
    task_max_attempts: int
class TaskTableResponse(BaseModel):
    """Response schema for task table visualization.

    Wraps a required list of ``TaskTableItem`` entries.
    """

    tasks: List[TaskTableItem]
class WorkflowDetailsItem(BaseModel):
    """Workflow details entry.

    No field declares a default. Three ``Optional[str]`` fields accept
    ``None`` (see the review note on them below).
    """

    wf_name: str
    # NOTE(review): the Optional[...] fields in this model have no default.
    # Pydantic v2 treats them as required-but-nullable, pydantic v1 defaults
    # them to None -- confirm which is intended.
    wf_args: Optional[str]
    # Both dates are carried as strings.
    wf_created_date: str
    wf_status_date: str
    tool_name: str
    # Changed from int to str - database returns string status codes like 'D'
    wf_status: str
    wf_status_desc: str
    wfr_jobmon_version: Optional[str]
    wfr_heartbeat_date: Optional[str]
    wfr_user: str
class WorkflowDetailsResponse(BaseModel):
    """Response schema for workflow details.

    The model intentionally declares no fields.
    """

    # This will be a List[WorkflowDetailsItem] but FastAPI handles this
    # automatically.