Source code for server.web.schemas.task_template

from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from pydantic import BaseModel, ConfigDict, Field, computed_field


class TaskTemplateResourceUsageRequest(BaseModel):
    """Request body for a task template resource usage query.

    Only ``task_template_version_id`` is required; every other field has a
    default and may be omitted by the caller.
    """

    # Required integer id of the task template version being queried.
    task_template_version_id: int

    # Optional list of integer workflow ids.
    # NOTE(review): presumably restricts the query to these workflows --
    # confirm against the route handler.
    workflows: Optional[List[int]] = None

    # Optional mapping of a string key to a list of string values.
    # NOTE(review): presumably node-arg name -> accepted values -- confirm.
    node_args: Optional[Dict[str, List[str]]] = None

    # Optional string.
    # NOTE(review): looks like a confidence-interval setting (the response
    # model exposes ci_mem / ci_runtime) -- confirm the expected format.
    ci: Optional[str] = None

    # Off by default; the response model notes that visualization data is
    # only present when viz=True.
    viz: bool = False
class RequestedResourcesModel(BaseModel):
    """Optional helper model for parsing a ``requested_resources`` JSON string.

    Every field is optional and defaults to ``None``.
    """

    # NOTE(review): units for memory / runtime are not stated in this file --
    # confirm before relying on them.
    memory: Optional[float] = None
    runtime: Optional[float] = None
    cores: Optional[float] = None
    queue: Optional[str] = None
    # Add other fields that might be in the requested_resources JSON
class TaskResourceDetailItem(BaseModel):
    """Per-task resource detail record.

    ``r`` and ``m`` are populated from the aliases ``wallclock`` and
    ``maxrss``. Because ``populate_by_name`` is enabled, they may also be set
    through the field names ``r`` and ``m`` directly.
    """

    # Accept both the alias and the attribute name when constructing the model.
    model_config = ConfigDict(populate_by_name=True)

    # Aliased measurements; both default to None when absent.
    r: Optional[float] = Field(None, alias="wallclock")
    m: Optional[int] = Field(None, alias="maxrss")

    # Required identifiers.
    node_id: int
    task_id: int

    task_name: Optional[str] = None
    # Raw JSON string from DB (kept unparsed here).
    requested_resources: Optional[str] = None
    # Added field.
    attempt_number_of_instance: Optional[int] = None
    # Added field: will hold 'D', 'F', etc.
    status: Optional[str] = None
    task_status_date: Optional[datetime] = None
    task_command: Optional[str] = None
    task_num_attempts: Optional[int] = None
    task_max_attempts: Optional[int] = None
class TaskResourceVizItem(BaseModel):
    """Per-task record used in the ``result_viz`` list of the usage response.

    Carries the same field names as ``TaskResourceDetailItem`` but declares
    ``r`` and ``m`` as plain fields without aliases.
    """

    # NOTE(review): r / m presumably mirror the wallclock / maxrss values of
    # TaskResourceDetailItem -- confirm with the code that builds these items.
    r: Optional[float] = None
    m: Optional[int] = None

    # Required identifiers.
    node_id: int
    task_id: int

    # Everything below is optional and defaults to None.
    task_name: Optional[str] = None
    requested_resources: Optional[str] = None
    attempt_number_of_instance: Optional[int] = None
    status: Optional[str] = None
    task_status_date: Optional[datetime] = None
    task_command: Optional[str] = None
    task_num_attempts: Optional[int] = None
    task_max_attempts: Optional[int] = None
class TaskTemplateResourceUsageResponse(BaseModel):
    """Unified response model for task template resource usage.

    Holds summary statistics, optional confidence intervals, optional
    visualization rows, and a computed ``formatted_stats`` view of the
    statistics. Every declared field is optional and defaults to ``None``.
    """

    model_config = ConfigDict(populate_by_name=True)

    # Core statistics (what both GUI and client need)
    num_tasks: Optional[int] = None
    min_mem: Optional[int] = None  # bytes
    max_mem: Optional[int] = None  # bytes
    mean_mem: Optional[float] = None  # bytes
    min_runtime: Optional[int] = None  # seconds
    max_runtime: Optional[int] = None  # seconds
    mean_runtime: Optional[float] = None  # seconds
    median_mem: Optional[float] = None  # bytes
    median_runtime: Optional[float] = None  # seconds

    # Confidence intervals (can be null for small datasets); individual
    # entries of each list may themselves be null.
    ci_mem: Optional[List[Optional[float]]] = None
    ci_runtime: Optional[List[Optional[float]]] = None

    # Visualization data (optional, only when viz=True)
    result_viz: Optional[List[TaskResourceVizItem]] = None

    # Computed properties for client convenience
    @computed_field
    def formatted_stats(self) -> Dict[str, Any]:
        """Provide formatted statistics similar to legacy client format.

        Returns:
            A dict with the same keys as the statistic fields. Memory values
            are rendered as strings with a trailing ``"B"`` (or ``None`` when
            the underlying value is ``None``); runtime values and confidence
            intervals are passed through unchanged.
        """

        def with_byte_suffix(value: Any) -> Optional[str]:
            # Missing statistics stay None instead of becoming "NoneB".
            return None if value is None else f"{value}B"

        return {
            "num_tasks": self.num_tasks,
            "min_mem": with_byte_suffix(self.min_mem),
            "max_mem": with_byte_suffix(self.max_mem),
            "mean_mem": with_byte_suffix(self.mean_mem),
            "min_runtime": self.min_runtime,
            "max_runtime": self.max_runtime,
            "mean_runtime": self.mean_runtime,
            "median_mem": with_byte_suffix(self.median_mem),
            "median_runtime": self.median_runtime,
            "ci_mem": self.ci_mem,
            "ci_runtime": self.ci_runtime,
        }
class TaskTemplateDetailsResponse(BaseModel):
    """Response model carrying the identifying details of a task template.

    All three fields are required.
    """

    task_template_id: int
    task_template_name: str
    task_template_version_id: int
class TaskTemplateVersionItem(BaseModel):
    """Individual task template version item.

    Both fields are required.
    """

    # Field names are part of the wire format, so ``id`` is kept even though
    # it shadows the builtin inside the class body.
    id: int
    name: str
class TaskTemplateVersionResponse(BaseModel):
    """Response model for task template version queries."""

    # Required list of TaskTemplateVersionItem entries (id/name pairs). Despite
    # the ``_ids`` suffix the elements are full item objects, not bare ints.
    task_template_version_ids: List[TaskTemplateVersionItem]
class CoreInfoItem(BaseModel):
    """Individual core info item.

    All four fields are required integers.
    """

    # Field names are part of the wire format; ``id``, ``min`` and ``max``
    # intentionally keep their names even though they shadow builtins here.
    id: int
    # NOTE(review): presumably the minimum / maximum / average number of
    # requested cores for the entity identified by ``id`` -- confirm.
    min: int
    max: int
    avg: int
class RequestedCoresResponse(BaseModel):
    """Response model for requested cores queries."""

    # Required list of CoreInfoItem entries.
    core_info: List[CoreInfoItem]
class QueueInfoItem(BaseModel):
    """Individual queue info item.

    All three fields are required.
    """

    # Field name is part of the wire format; kept although it shadows the
    # builtin ``id`` inside the class body.
    id: int
    # Queue as a string, alongside its integer id.
    queue: str
    queue_id: int
class MostPopularQueueResponse(BaseModel):
    """Response model for most popular queue queries."""

    # Required list of QueueInfoItem entries.
    queue_info: List[QueueInfoItem]
class WorkflowTaskTemplateStatusItem(BaseModel):
    """Individual workflow task template status item.

    Every field is required. The ``num_attempts_*`` fields accept ``None``
    but have no default, so they must still be supplied explicitly.
    """

    id: int
    name: str
    tasks: int

    # Upper-case names are part of the wire format.
    # NOTE(review): presumably per-status task counts -- confirm.
    PENDING: int
    SCHEDULED: int
    RUNNING: int
    DONE: int
    FATAL: int
    # Can be int or "NA"
    MAXC: Union[int, str]

    # Nullable, but required (no default value).
    num_attempts_min: Optional[float]
    num_attempts_max: Optional[float]
    num_attempts_avg: Optional[float]

    task_template_version_id: int
class ErrorLogItem(BaseModel):
    """Error log item - can represent individual errors or clustered errors.

    Only ``workflow_run_id`` and ``workflow_id`` are required by the schema;
    every other field defaults to ``None``.
    """

    model_config = ConfigDict(
        # This will serialize datetime objects to ISO strings in JSON responses
        json_encoders={datetime: lambda value: value.isoformat()}
    )

    # Individual error fields (required for non-clustered, optional for
    # clustered)
    task_id: Optional[int] = None
    task_instance_id: Optional[int] = None
    task_instance_err_id: Optional[int] = None
    error_time: Optional[datetime] = None
    error: Optional[str] = None
    task_instance_stderr_log: Optional[str] = None

    # Common fields (always present)
    workflow_run_id: int
    workflow_id: int

    # Clustering fields (only present when clustering is enabled)
    error_score: Optional[float] = None
    group_instance_count: Optional[int] = None
    task_instance_ids: Optional[List[int]] = None
    task_ids: Optional[List[int]] = None
    sample_error: Optional[str] = None
    first_error_time: Optional[datetime] = None
class ErrorLogResponse(BaseModel):
    """Response model for error log queries.

    All four fields are required.
    """

    # ErrorLogItem entries carried by this response.
    error_logs: List[ErrorLogItem]

    # Integer count and paging values.
    # NOTE(review): total_count is presumably the number of matching rows
    # across all pages, not len(error_logs) -- confirm with the query code.
    total_count: int
    page: int
    page_size: int