server.web.routes.v3.cli.task_concurrency

Routes for Task Concurrency Timeline.

Attributes

logger

PENDING_STATUSES

ERROR_STATUSES

DONE_STATUS

GROUP_BY_STATUS

TIMELINE_CATEGORIES

Functions

get_task_concurrency(, end_time, group_by, ...)

Get concurrent task counts over time, grouped by status.

get_task_status_audit(, limit, db)

Get task status audit records for a workflow.

get_workflow_task_templates(...)

Get list of task template names used in a workflow.

get_template_timeline(...)

Get template execution timeline for a workflow.

Module Contents

server.web.routes.v3.cli.task_concurrency.logger
server.web.routes.v3.cli.task_concurrency.PENDING_STATUSES
server.web.routes.v3.cli.task_concurrency.ERROR_STATUSES
server.web.routes.v3.cli.task_concurrency.DONE_STATUS = 'D'
server.web.routes.v3.cli.task_concurrency.GROUP_BY_STATUS = 'status'
async server.web.routes.v3.cli.task_concurrency.get_task_concurrency(workflow_id: int, start_time: datetime.datetime | None = Query(None, description='Start of time range'), end_time: datetime.datetime | None = Query(None, description='End of time range'), group_by: str = Query(GROUP_BY_STATUS, description='Grouping strategy for the series (only "status" supported)'), bucket_seconds: int = Query(None, ge=1, le=3600, description='Bucket size in seconds (overrides bucket_minutes)'), bucket_minutes: int = Query(1, ge=1, le=60, description='Bucket size in minutes'), task_template_name: str | None = Query(None, description='Filter by task template name'), db: sqlalchemy.orm.Session = Depends(get_db)) jobmon.server.web.schemas.task_concurrency.TaskConcurrencyResponse

Get concurrent task counts over time, grouped by status.

Returns time-bucketed counts for timeseries visualization. Uses task_status_audit table with ix_task_status_audit_workflow_time index.

Groups by status category (PENDING, LAUNCHED, RUNNING, ERROR, DONE). The only supported grouping strategy is “status”. Note: DONE and ERROR show tasks that entered those states within each bucket (not cumulative or interval overlap).

The response includes: - buckets: List of time bucket start times - series: Dict mapping status names to lists of concurrent counts - template_breakdown: Dict mapping status to list of {template: count} per bucket

Example response: {

“buckets”: [“2024-01-01T00:00:00”, “2024-01-01T00:01:00”, …], “series”: {

“PENDING”: [5, 8, 10, 7, …], “LAUNCHED”: [3, 5, 4, 2, …], “RUNNING”: [2, 3, 4, 3, …], “ERROR”: [0, 1, 0, 2, …], // error events within each bucket “DONE”: [0, 2, 5, 3, …] // tasks completed within each bucket

}, “template_breakdown”: {

“RUNNING”: [{“template_a”: 2, “template_b”: 1}, …], “ERROR”: [{“template_a”: 0, “template_b”: 1}, …], “DONE”: [{“template_a”: 0, “template_b”: 2}, …]

}

}

async server.web.routes.v3.cli.task_concurrency.get_task_status_audit(workflow_id: int, task_id: int | None = Query(None, description='Filter by specific task ID'), limit: int = Query(100, ge=1, le=1000, description='Maximum records to return'), db: sqlalchemy.orm.Session = Depends(get_db)) jobmon.server.web.schemas.task_concurrency.TaskStatusAuditResponse

Get task status audit records for a workflow.

Returns the audit trail showing all task status transitions. Useful for debugging and understanding task lifecycle.

Parameters:
  • workflow_id – The workflow ID to get audit records for

  • task_id – Optional filter for a specific task

  • limit – Maximum number of records to return

Returns:

List of audit records with task_id, previous_status, new_status, timestamps

async server.web.routes.v3.cli.task_concurrency.get_workflow_task_templates(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) jobmon.server.web.schemas.task_concurrency.WorkflowTaskTemplatesResponse

Get list of task template names used in a workflow.

Returns distinct task template names for tasks in this workflow. Used for filtering the concurrency view by template.

Parameters:

workflow_id – The workflow ID to get templates for

Returns:

List of distinct task template names, sorted alphabetically

server.web.routes.v3.cli.task_concurrency.TIMELINE_CATEGORIES = ['REGISTERED', 'PENDING', 'LAUNCHED', 'RUNNING', 'ERROR', 'DONE']
async server.web.routes.v3.cli.task_concurrency.get_template_timeline(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) jobmon.server.web.schemas.task_concurrency.TemplateTimelineResponse

Get template execution timeline for a workflow.

Returns per-template event-driven time series: at every status-transition timestamp the exact task count per status category is recorded. No bucketing — the frontend renders a continuous stacked area chart from the raw events.