server.web.repositories.task_template_repository

Attributes

logger

Classes

ResourceUsageStatistics

Clean data class for resource usage statistics.

TaskTemplateRepository

Module Contents

server.web.repositories.task_template_repository.logger
class server.web.repositories.task_template_repository.ResourceUsageStatistics

Clean data class for resource usage statistics.

num_tasks: int | None = None
min_mem: int | None = None
max_mem: int | None = None
mean_mem: float | None = None
min_runtime: int | None = None
max_runtime: int | None = None
mean_runtime: float | None = None
median_mem: float | None = None
median_runtime: float | None = None
ci_mem: List[float | None] | None = None
ci_runtime: List[float | None] | None = None
viz_data: List[jobmon.server.web.schemas.task_template.TaskResourceVizItem] | None = None
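The attribute listing above can be read as a plain data class in which every field is optional and defaults to None. The following is a minimal sketch under that reading, not the module's actual definition: the class name `ResourceUsageStatisticsSketch` is hypothetical, and `viz_data` is typed as `List[Any]` so the example does not depend on jobmon being installed.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class ResourceUsageStatisticsSketch:
    """Illustrative stand-in: same field names as the listing, all defaulting to None."""

    num_tasks: Optional[int] = None
    min_mem: Optional[int] = None
    max_mem: Optional[int] = None
    mean_mem: Optional[float] = None
    min_runtime: Optional[int] = None
    max_runtime: Optional[int] = None
    mean_runtime: Optional[float] = None
    median_mem: Optional[float] = None
    median_runtime: Optional[float] = None
    ci_mem: Optional[List[Optional[float]]] = None
    ci_runtime: Optional[List[Optional[float]]] = None
    # Assumption: the real type is List[TaskResourceVizItem]; Any keeps this self-contained.
    viz_data: Optional[List[Any]] = None


# An instance with no data, and one with only the memory fields filled in.
empty = ResourceUsageStatisticsSketch()
partial = ResourceUsageStatisticsSketch(num_tasks=3, min_mem=100, max_mem=300, mean_mem=200.0)
```

Because every field has a default, a caller can populate only the statistics that could be computed and leave the rest as None.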
class server.web.repositories.task_template_repository.TaskTemplateRepository(session: sqlalchemy.orm.Session)
session
_convert_to_task_resource_detail_item(row_data: Dict[str, Any]) -> jobmon.server.web.schemas.task_template.TaskResourceDetailItem | None

Convert raw database row to TaskResourceDetailItem, with error handling.

get_task_resource_details(task_template_version_id: int, workflows: List[int] | None, node_args: Dict[str, List[Any]] | None) -> List[jobmon.server.web.schemas.task_template.TaskResourceDetailItem]

Fetch and filter task resource details with optimized single-query approach.

_get_task_resource_details_with_node_args(task_template_version_id: int, workflows: List[int] | None, node_args: Dict[str, List[Any]], base_filters: List[sqlalchemy.sql.elements.ColumnElement], attempt_number_col: sqlalchemy.sql.elements.Label) -> List[jobmon.server.web.schemas.task_template.TaskResourceDetailItem]

Optimized node_args filtering using database-level joins and filtering.

calculate_resource_statistics(task_details: List[jobmon.server.web.schemas.task_template.TaskResourceDetailItem], confidence_interval: str | None = None, task_template_version_id: int | None = None) -> ResourceUsageStatistics

Calculate statistics from task details using scipy.stats.
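To make the kind of summary concrete, here is a minimal sketch that computes min, max, mean, median, and an optional confidence interval for the mean. It is not the method's implementation: the source states that scipy.stats is used, whereas this sketch swaps in the standard-library `statistics` module with a normal approximation so that it runs without third-party packages. The function name `summarize`, its dictionary keys, and the float `confidence` argument are assumptions (the real parameter is a string).

```python
import math
import statistics
from typing import Dict, List, Optional


def summarize(values: List[float], confidence: Optional[float] = None) -> Dict[str, object]:
    """Summarize a list of measurements (e.g. memory or runtime per task)."""
    if not values:
        # Nothing to summarize: mirror the "all None" defaults of the data class.
        return {"num": 0, "min": None, "max": None, "mean": None, "median": None, "ci": None}

    mean = statistics.fmean(values)
    ci = None
    if confidence is not None and len(values) > 1:
        # Normal approximation: mean +/- z * s / sqrt(n).
        z = statistics.NormalDist().inv_cdf(0.5 + confidence / 2)
        half_width = z * statistics.stdev(values) / math.sqrt(len(values))
        ci = [mean - half_width, mean + half_width]

    return {
        "num": len(values),
        "min": min(values),
        "max": max(values),
        "mean": mean,
        "median": statistics.median(values),
        "ci": ci,
    }


# Hypothetical memory readings for three tasks.
mem_summary = summarize([100.0, 200.0, 300.0], confidence=0.95)
```

For small samples a t-distribution interval would be wider than this normal approximation, so the two approaches should not be expected to agree numerically.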

get_task_template_resource_usage(req: jobmon.server.web.schemas.task_template.TaskTemplateResourceUsageRequest) -> List[jobmon.server.web.schemas.task_template.TaskResourceVizItem] | None
_find_ttvid(workflow_id: int, task_template_id: int) -> int | None

Find the task template version id using workflow and task template ids.

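This could be slow for task templates with a very large number of nodes. However, because a given workflow uses only one version of a task template, we can search the versions backwards and stop at the first version that is non-zero (that is, the first one the workflow actually uses).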

Parameters:
  • workflow_id – ID of the workflow

  • task_template_id – ID of the task template

  • db – Database session

Returns:

Task template version id
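The backwards search described above can be sketched without a database. This is an illustration only: `find_version_backwards` and `count_tasks` are hypothetical names, the sketch assumes that a higher version ID means a newer version, and it assumes "non-zero" refers to the number of the workflow's tasks that use a version.

```python
from typing import Callable, List, Optional


def find_version_backwards(
    version_ids: List[int], count_tasks: Callable[[int], int]
) -> Optional[int]:
    """Return the newest version ID with a non-zero task count, or None."""
    for version_id in sorted(version_ids, reverse=True):
        # Stop at the first hit: a workflow uses only one version of a template.
        if count_tasks(version_id) > 0:
            return version_id
    return None


# Hypothetical data: version 12 is the only one this workflow uses.
task_counts = {10: 0, 11: 0, 12: 40, 13: 0}
found = find_version_backwards(list(task_counts), task_counts.__getitem__)
```

Stopping at the first match is what keeps the search cheap when the workflow uses a recent version; in the worst case every version is still checked.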

get_task_template_details(workflow_id: int, task_template_id: int) -> jobmon.server.web.schemas.task_template.TaskTemplateDetailsResponse | None

Get task template details.

get_task_template_versions(task_id: int | None = None, workflow_id: int | None = None) -> jobmon.server.web.schemas.task_template.TaskTemplateVersionResponse | None

Get task template version IDs and names for a task or workflow.

Parameters:
  • task_id – Optional task ID to get task template version for

  • workflow_id – Optional workflow ID to get all task template versions for

Returns:

TaskTemplateVersionResponse with list of task template versions, or None if no data found.

Note

If both task_id and workflow_id are provided, task_id takes precedence.
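The precedence rule in the note can be expressed as a small dispatch function. This is a sketch of the rule only, not of the repository method: `choose_lookup` and its tuple return value are hypothetical, and returning None when neither ID is given is an assumption based on the "None if no data found" wording.

```python
from typing import Optional, Tuple


def choose_lookup(
    task_id: Optional[int] = None, workflow_id: Optional[int] = None
) -> Optional[Tuple[str, int]]:
    """Decide which ID drives the lookup; task_id wins when both are given."""
    if task_id is not None:
        return ("task", task_id)
    if workflow_id is not None:
        return ("workflow", workflow_id)
    return None  # Assumption: nothing to look up without either ID.


both = choose_lookup(task_id=5, workflow_id=9)
only_workflow = choose_lookup(workflow_id=9)
```

Comparing against None rather than relying on truthiness means an ID of 0 would still be treated as provided.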

get_requested_cores(task_template_version_ids: List[int]) -> jobmon.server.web.schemas.task_template.RequestedCoresResponse

Get the min, max, and avg of requested cores for task template versions.

Parameters:

task_template_version_ids – List of task template version IDs

Returns:

RequestedCoresResponse with core information for each task template version
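As an illustration of the per-version min, max, and average, the sketch below groups hypothetical `(task_template_version_id, requested_cores)` pairs in plain Python. The real method presumably reads these values from the database; the function name `summarize_cores` and the dictionary layout are assumptions and do not reflect the actual `RequestedCoresResponse` schema.

```python
from collections import defaultdict
from statistics import fmean
from typing import Dict, Iterable, List, Tuple


def summarize_cores(rows: Iterable[Tuple[int, int]]) -> Dict[int, Dict[str, float]]:
    """Group requested-core counts by version ID and summarize each group."""
    grouped: Dict[int, List[int]] = defaultdict(list)
    for ttv_id, cores in rows:
        grouped[ttv_id].append(cores)
    return {
        ttv_id: {"min": min(cores), "max": max(cores), "avg": fmean(cores)}
        for ttv_id, cores in grouped.items()
    }


# Hypothetical rows: version 1 has two tasks, version 2 has one.
core_summary = summarize_cores([(1, 1), (1, 4), (2, 2)])
```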

Get the most popular queue for task template versions.

Parameters:

task_template_version_ids – List of task template version IDs

Returns:

MostPopularQueueResponse with queue information for each task template version
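One way to picture "most popular queue" is a frequency count per task template version. The sketch below works on hypothetical `(task_template_version_id, queue_name)` pairs; the function name, the queue names, and the flat dictionary result are assumptions and do not reflect the actual `MostPopularQueueResponse` schema or query.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, Tuple


def most_popular_queue(rows: Iterable[Tuple[int, str]]) -> Dict[int, str]:
    """Return the most frequently used queue name for each version ID."""
    counts: Dict[int, Counter] = defaultdict(Counter)
    for ttv_id, queue in rows:
        counts[ttv_id][queue] += 1
    # most_common(1) yields a single (queue, count) pair for each version.
    return {ttv_id: c.most_common(1)[0][0] for ttv_id, c in counts.items()}


# Hypothetical usage records.
popular = most_popular_queue(
    [(1, "all.q"), (1, "long.q"), (1, "all.q"), (2, "long.q")]
)
```

When two queues are tied, `Counter.most_common` returns the one encountered first, so a real implementation would need an explicit tie-breaking rule if that matters.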

get_workflow_tt_status_viz(workflow_id: int, dialect: str = 'sqlite') -> Dict[int, jobmon.server.web.schemas.task_template.WorkflowTaskTemplateStatusItem]

Get the status of workflow task templates for GUI visualization.

Optimized version using single query with SQL aggregation instead of two separate queries and Python-level aggregation.

Parameters:
  • workflow_id – ID of the workflow

  • dialect – Database dialect for optimization hints

Returns:

Dictionary mapping task template ID to WorkflowTaskTemplateStatusItem
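The idea of replacing two queries plus Python-level aggregation with one aggregating query can be sketched with the standard-library `sqlite3` module. The table layout, column names, and status codes below are hypothetical and much simpler than the real schema; only the technique (conditional sums inside a single GROUP BY query) is the point.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE task (
        id INTEGER PRIMARY KEY,
        workflow_id INTEGER,
        task_template_id INTEGER,
        status TEXT
    );
    INSERT INTO task (workflow_id, task_template_id, status) VALUES
        (1, 7, 'D'), (1, 7, 'D'), (1, 7, 'F'), (1, 8, 'R'), (2, 7, 'D');
    """
)

# One round trip: the database counts tasks per template and per status.
rows = conn.execute(
    """
    SELECT task_template_id,
           COUNT(*) AS total,
           SUM(CASE WHEN status = 'D' THEN 1 ELSE 0 END) AS done,
           SUM(CASE WHEN status = 'F' THEN 1 ELSE 0 END) AS fatal
    FROM task
    WHERE workflow_id = ?
    GROUP BY task_template_id
    ORDER BY task_template_id
    """,
    (1,),
).fetchall()

# Map task template ID to its aggregated counts, similar in shape to the return value.
status_by_template = {
    tt_id: {"total": total, "done": done, "fatal": fatal}
    for tt_id, total, done, fatal in rows
}
conn.close()
```

Pushing the aggregation into SQL means only one row per task template crosses the connection, rather than one row per task.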

get_tt_error_log_viz(workflow_id: int, task_template_id: int, task_instance_id: int | None = None, page: int = 1, page_size: int = 10, recent_errors_only: bool = False, cluster_errors: bool = False) -> jobmon.server.web.schemas.task_template.ErrorLogResponse

Get error logs for a task template ID for GUI visualization.

Parameters:
  • workflow_id – ID of the workflow

  • task_template_id – ID of the task template

  • task_instance_id – Optional specific task instance ID

  • page – Page number for pagination

  • page_size – Number of items per page

  • recent_errors_only – Whether to show only recent errors

  • cluster_errors – Whether to cluster similar errors

Returns:

ErrorLogResponse with paginated error log data
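The `page` and `page_size` parameters suggest offset-based pagination. The sketch below shows that arithmetic on an in-memory list; it assumes pages are 1-based (consistent with the default `page=1`), and `paginate` is a hypothetical helper, not part of the repository class, which presumably applies the equivalent limit and offset in its query.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def paginate(items: Sequence[T], page: int = 1, page_size: int = 10) -> List[T]:
    """Return the slice of items belonging to a 1-based page number."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be positive")
    start = (page - 1) * page_size  # offset of the first item on this page
    return list(items[start : start + page_size])


# Hypothetical error log of 25 entries, viewed 10 at a time.
error_ids = list(range(25))
last_page = paginate(error_ids, page=3, page_size=10)
past_end = paginate(error_ids, page=4, page_size=10)
```

Requesting a page past the end yields an empty list rather than an error, which is a common convention for paginated endpoints.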