from typing import List, Optional
from pydantic import BaseModel
class WorkflowValidationRequest(BaseModel):
    """Request model for workflow validation."""

    # NOTE(review): no fields are visible for this model in this listing, which
    # appears to have been extracted from rendered documentation. Confirm
    # against the original module whether field declarations were dropped.
class WorkflowValidationResponse(BaseModel):
    """Response model for workflow validation."""

    # Optional status string; defaults to None when the caller omits it.
    workflow_status: Optional[str] = None
class WorkflowTasksResponse(BaseModel):
    """Response model for workflow tasks."""

    # Required field. The payload is carried as one pre-serialized string
    # rather than as structured data.
    workflow_tasks: str  # JSON string from pandas DataFrame
class WorkflowUserValidationResponse(BaseModel):
    """Response model for workflow user validation."""

    # NOTE(review): no fields are visible for this model in this listing, which
    # appears to have been extracted from rendered documentation. Confirm
    # against the original module whether field declarations were dropped.
class WorkflowResetRequest(BaseModel):
    """Request model for workflow reset."""

    # Boolean flag that is off unless the caller sets it explicitly.
    # NOTE(review): what a "partial" reset covers is decided by the consumer of
    # this model, not here — confirm against the reset route.
    partial_reset: bool = False
class WorkflowRunForResetResponse(BaseModel):
    """Response model for workflow run reset validation."""

    # Nullable integer with no explicit default.
    # NOTE(review): pydantic v1 treats a bare Optional[...] annotation as
    # defaulting to None, while pydantic v2 makes it required-but-nullable.
    # Confirm which version this project targets before relying on omission.
    workflow_run_id: Optional[int]
class WorkflowStatusResponse(BaseModel):
    """Response model for workflow status."""

    # Required field. The payload is carried as one pre-serialized string
    # rather than as structured data.
    workflows: str  # JSON string from pandas DataFrame
class WorkflowStatusVizResponse(BaseModel):
    """Response model for workflow status visualization."""

    # Dictionary mapping workflow_id to status data.
    # Intentionally declares no fields: per the original author's note, the
    # payload will be a Dict[int, Any] and FastAPI handles this automatically.
    # The docstring alone is a valid class body, so no `pass` is required.
class WorkflowOverviewRequest(BaseModel):
    """Request model for workflow overview filters."""

    # Every filter is an optional string that defaults to None, so a request
    # may supply any subset of them.
    # NOTE(review): presumably an unset (None) filter is simply not applied —
    # confirm against the route handler that consumes this model.
    user: Optional[str] = None
    wf_name: Optional[str] = None
    wf_args: Optional[str] = None

    # Attribute filter, supplied as separate value and key fields.
    wf_attribute_value: Optional[str] = None
    wf_attribute_key: Optional[str] = None

    # Typed as a string here, not an integer.
    wf_id: Optional[str] = None

    # Submission-date filters, carried as strings.
    # NOTE(review): the expected date format is not visible here — confirm.
    date_submitted: Optional[str] = None
    date_submitted_end: Optional[str] = None

    status: Optional[str] = None
class WorkflowOverviewItem(BaseModel):
    """Individual workflow item in overview response."""

    # Task status counts
    # NOTE(review): the heading above is not followed by any field in this
    # listing, which appears to have been extracted from rendered
    # documentation. Confirm against the original module which field
    # declarations belong here.
class WorkflowOverviewResponse(BaseModel):
    """Response model for workflow overview."""

    # Required list of overview entries, one WorkflowOverviewItem per element.
    workflows: List[WorkflowOverviewItem]
class TaskTableItem(BaseModel):
    """Individual task item in task table response."""

    # NOTE(review): no fields are visible for this model in this listing, which
    # appears to have been extracted from rendered documentation. Confirm
    # against the original module whether field declarations were dropped.
class TaskTableResponse(BaseModel):
    """Response model for task table visualization."""

    # Required list of table rows, one TaskTableItem per element.
    tasks: List[TaskTableItem]
class WorkflowDetailsItem(BaseModel):
    """Workflow details item."""

    # NOTE(review): only three fields are visible in this listing, which
    # appears to have been extracted from rendered documentation. Confirm
    # against the original module whether other declarations were dropped.

    # Changed from int to str - database returns string status codes like 'D'
    wf_status: str

    # Nullable strings with no explicit default.
    # NOTE(review): pydantic v1 treats a bare Optional[...] annotation as
    # defaulting to None, while pydantic v2 makes it required-but-nullable.
    # Confirm which version this project targets.
    wfr_jobmon_version: Optional[str]
    wfr_heartbeat_date: Optional[str]
class WorkflowDetailsResponse(BaseModel):
    """Response model for workflow details."""

    # Intentionally declares no fields: per the original author's note, the
    # payload will be a List[WorkflowDetailsItem] and FastAPI handles this
    # automatically. The docstring alone is a valid class body, so no `pass`
    # is required.