from datetime import datetime
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, ConfigDict, Field, computed_field
[docs]
class TaskTemplateResourceUsageRequest(BaseModel):
[docs]
task_template_version_id: int
[docs]
workflows: Optional[List[int]] = None
[docs]
node_args: Optional[Dict[str, List[str]]] = None
[docs]
ci: Optional[str] = None
[docs]
class RequestedResourcesModel(BaseModel): # Optional: For parsing the JSON string
[docs]
memory: Optional[float] = None
[docs]
runtime: Optional[float] = None
[docs]
cores: Optional[float] = None
[docs]
queue: Optional[str] = None
# Add other fields that might be in the requested_resources JSON
[docs]
class TaskResourceDetailItem(BaseModel):
[docs]
r: Optional[float] = Field(default=None, alias="wallclock")
[docs]
m: Optional[int] = Field(default=None, alias="maxrss")
[docs]
task_name: Optional[str] = None
[docs]
requested_resources: Optional[str] = None # Raw JSON string from DB
[docs]
attempt_number_of_instance: Optional[int] = None # Added field
[docs]
status: Optional[str] = None # Added field: Will hold 'D', 'F', etc.
[docs]
task_status_date: Optional[datetime] = None
[docs]
task_command: Optional[str] = None
[docs]
task_num_attempts: Optional[int] = None
[docs]
task_max_attempts: Optional[int] = None
[docs]
model_config = ConfigDict(populate_by_name=True)
[docs]
class TaskResourceVizItem(BaseModel):
[docs]
r: Optional[float] = None
[docs]
m: Optional[int] = None
[docs]
task_name: Optional[str] = None
[docs]
requested_resources: Optional[str] = None
[docs]
attempt_number_of_instance: Optional[int] = None
[docs]
status: Optional[str] = None
[docs]
task_status_date: Optional[datetime] = None
[docs]
task_command: Optional[str] = None
[docs]
task_num_attempts: Optional[int] = None
[docs]
task_max_attempts: Optional[int] = None
[docs]
class TaskTemplateResourceUsageResponse(BaseModel):
"""Unified response model for task template resource usage."""
[docs]
model_config = ConfigDict(populate_by_name=True)
# Core statistics (what both GUI and client need)
[docs]
num_tasks: Optional[int] = None
[docs]
min_mem: Optional[int] = None # bytes
[docs]
max_mem: Optional[int] = None # bytes
[docs]
mean_mem: Optional[float] = None # bytes
[docs]
min_runtime: Optional[int] = None # seconds
[docs]
max_runtime: Optional[int] = None # seconds
[docs]
mean_runtime: Optional[float] = None # seconds
# Confidence intervals (can be null for small datasets)
[docs]
ci_mem: Optional[List[Union[float, None]]] = None
[docs]
ci_runtime: Optional[List[Union[float, None]]] = None
# Visualization data (optional, only when viz=True)
[docs]
result_viz: Optional[List[TaskResourceVizItem]] = None
# Computed properties for client convenience
@computed_field
[docs]
class TaskTemplateDetailsResponse(BaseModel):
[docs]
task_template_name: str
[docs]
task_template_version_id: int
[docs]
class TaskTemplateVersionItem(BaseModel):
"""Individual task template version item."""
[docs]
class TaskTemplateVersionResponse(BaseModel):
"""Response model for task template version queries."""
[docs]
task_template_version_ids: List[TaskTemplateVersionItem]
[docs]
class CoreInfoItem(BaseModel):
"""Individual core info item."""
[docs]
class RequestedCoresResponse(BaseModel):
"""Response model for requested cores queries."""
[docs]
core_info: List[CoreInfoItem]
[docs]
class QueueInfoItem(BaseModel):
"""Individual queue info item."""
[docs]
class MostPopularQueueResponse(BaseModel):
"""Response model for most popular queue queries."""
[docs]
queue_info: List[QueueInfoItem]
[docs]
class WorkflowTaskTemplateStatusItem(BaseModel):
"""Individual workflow task template status item."""
[docs]
MAXC: Union[int, str] # Can be int or "NA"
[docs]
num_attempts_min: Optional[float]
[docs]
num_attempts_max: Optional[float]
[docs]
num_attempts_avg: Optional[float]
[docs]
task_template_version_id: int
[docs]
class ErrorLogItem(BaseModel):
"""Error log item - can represent individual errors or clustered errors."""
# Individual error fields (required for non-clustered, optional for clustered)
[docs]
task_id: Optional[int] = None
[docs]
task_instance_id: Optional[int] = None
[docs]
task_instance_err_id: Optional[int] = None
[docs]
error_time: Optional[datetime] = None
[docs]
error: Optional[str] = None
[docs]
task_instance_stderr_log: Optional[str] = None
# Common fields (always present)
# Clustering fields (only present when clustering is enabled)
[docs]
error_score: Optional[float] = None
[docs]
group_instance_count: Optional[int] = None
[docs]
task_instance_ids: Optional[List[int]] = None
[docs]
task_ids: Optional[List[int]] = None
[docs]
sample_error: Optional[str] = None
[docs]
first_error_time: Optional[datetime] = None
[docs]
model_config = ConfigDict(
# This will serialize datetime objects to ISO strings in JSON responses
json_encoders={datetime: lambda v: v.isoformat()}
)
[docs]
class ErrorLogResponse(BaseModel):
"""Response model for error log queries."""
[docs]
error_logs: List[ErrorLogItem]