server.web.routes.v3.cli.task_concurrency
=========================================

.. py:module:: server.web.routes.v3.cli.task_concurrency

.. autoapi-nested-parse::

   Routes for Task Concurrency Timeline.


Attributes
----------

.. autoapisummary::

   server.web.routes.v3.cli.task_concurrency.logger
   server.web.routes.v3.cli.task_concurrency.PENDING_STATUSES
   server.web.routes.v3.cli.task_concurrency.ERROR_STATUSES
   server.web.routes.v3.cli.task_concurrency.DONE_STATUS
   server.web.routes.v3.cli.task_concurrency.GROUP_BY_STATUS
   server.web.routes.v3.cli.task_concurrency.TIMELINE_CATEGORIES


Functions
---------

.. autoapisummary::

   server.web.routes.v3.cli.task_concurrency.get_task_concurrency
   server.web.routes.v3.cli.task_concurrency.get_task_status_audit
   server.web.routes.v3.cli.task_concurrency.get_workflow_task_templates
   server.web.routes.v3.cli.task_concurrency.get_template_timeline


Module Contents
---------------

.. py:data:: logger

.. py:data:: PENDING_STATUSES

.. py:data:: ERROR_STATUSES

.. py:data:: DONE_STATUS
   :value: 'D'


.. py:data:: GROUP_BY_STATUS
   :value: 'status'


.. py:function:: get_task_concurrency(workflow_id: int, start_time: Optional[datetime.datetime] = Query(None, description='Start of time range'), end_time: Optional[datetime.datetime] = Query(None, description='End of time range'), group_by: str = Query(GROUP_BY_STATUS, description='Grouping strategy for the series (only "status" supported)'), bucket_seconds: int = Query(None, ge=1, le=3600, description='Bucket size in seconds (overrides bucket_minutes)'), bucket_minutes: int = Query(1, ge=1, le=60, description='Bucket size in minutes'), task_template_name: Optional[str] = Query(None, description='Filter by task template name'), db: sqlalchemy.orm.Session = Depends(get_db)) -> jobmon.server.web.schemas.task_concurrency.TaskConcurrencyResponse
   :async:


   Get concurrent task counts over time, grouped by status.

   Returns time-bucketed counts for timeseries visualization.
   Uses the task_status_audit table with the ix_task_status_audit_workflow_time index.

   Groups by status category (PENDING, LAUNCHED, RUNNING, ERROR, DONE).
   The only supported grouping strategy is "status".

   Note: DONE and ERROR show tasks that entered those states within each
   bucket (not cumulative or interval overlap).

   The response includes:

   - buckets: List of time bucket start times
   - series: Dict mapping status names to lists of concurrent counts
   - template_breakdown: Dict mapping status to list of {template: count} per bucket

   Example response::

       {
           "buckets": ["2024-01-01T00:00:00", "2024-01-01T00:01:00", ...],
           "series": {
               "PENDING": [5, 8, 10, 7, ...],
               "LAUNCHED": [3, 5, 4, 2, ...],
               "RUNNING": [2, 3, 4, 3, ...],
               "ERROR": [0, 1, 0, 2, ...],  // error events within each bucket
               "DONE": [0, 2, 5, 3, ...]    // tasks completed within each bucket
           },
           "template_breakdown": {
               "RUNNING": [{"template_a": 2, "template_b": 1}, ...],
               "ERROR": [{"template_a": 0, "template_b": 1}, ...],
               "DONE": [{"template_a": 0, "template_b": 2}, ...]
           }
       }


.. py:function:: get_task_status_audit(workflow_id: int, task_id: Optional[int] = Query(None, description='Filter by specific task ID'), limit: int = Query(100, ge=1, le=1000, description='Maximum records to return'), db: sqlalchemy.orm.Session = Depends(get_db)) -> jobmon.server.web.schemas.task_concurrency.TaskStatusAuditResponse
   :async:


   Get task status audit records for a workflow.

   Returns the audit trail showing all task status transitions.
   Useful for debugging and for understanding the task lifecycle.

   :param workflow_id: The workflow ID to get audit records for
   :param task_id: Optional filter for a specific task
   :param limit: Maximum number of records to return

   :returns: List of audit records with task_id, previous_status, new_status, timestamps


.. py:function:: get_workflow_task_templates(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> jobmon.server.web.schemas.task_concurrency.WorkflowTaskTemplatesResponse
   :async:


   Get list of task template names used in a workflow.

   Returns distinct task template names for tasks in this workflow.
   Used for filtering the concurrency view by template.

   :param workflow_id: The workflow ID to get templates for

   :returns: List of distinct task template names, sorted alphabetically


.. py:data:: TIMELINE_CATEGORIES
   :value: ['REGISTERED', 'PENDING', 'LAUNCHED', 'RUNNING', 'ERROR', 'DONE']


.. py:function:: get_template_timeline(workflow_id: int, db: sqlalchemy.orm.Session = Depends(get_db)) -> jobmon.server.web.schemas.task_concurrency.TemplateTimelineResponse
   :async:


   Get template execution timeline for a workflow.

   Returns a per-template, event-driven time series: at every
   status-transition timestamp, the exact task count per status category
   is recorded. No bucketing is applied; the frontend renders a continuous
   stacked area chart from the raw events.
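
The event-driven series described for ``get_template_timeline`` can be illustrated with a minimal sketch. It is not the route's actual implementation. It assumes each audit record has already been reduced to a tuple of (timestamp, template name, previous category, new category), with ``None`` as the previous category for a task's first recorded status. The function name ``build_timeline``, the ``Event`` tuple layout, and the shape of the returned snapshots are hypothetical; only the category names come from ``TIMELINE_CATEGORIES`` above.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List, Optional, Tuple

# Category names as listed for TIMELINE_CATEGORIES on this page.
TIMELINE_CATEGORIES = ["REGISTERED", "PENDING", "LAUNCHED", "RUNNING", "ERROR", "DONE"]

# Hypothetical event shape: (timestamp, template, previous_category, new_category).
Event = Tuple[datetime, str, Optional[str], str]


def build_timeline(events: Iterable[Event]) -> Dict[str, List[dict]]:
    """Return, per template, one snapshot of category counts per transition timestamp."""
    # Running task count per category, tracked separately for each template.
    counts: Dict[str, Dict[str, int]] = defaultdict(
        lambda: {cat: 0 for cat in TIMELINE_CATEGORIES}
    )
    timeline: Dict[str, List[dict]] = defaultdict(list)

    # Process transitions in chronological order (stable for equal timestamps).
    for ts, template, prev_cat, new_cat in sorted(events, key=lambda e: e[0]):
        current = counts[template]
        if prev_cat is not None:
            current[prev_cat] -= 1  # the task leaves its previous category
        current[new_cat] += 1  # the task enters its new category

        points = timeline[template]
        snapshot = {"timestamp": ts.isoformat(), **current}
        if points and points[-1]["timestamp"] == snapshot["timestamp"]:
            points[-1] = snapshot  # same instant: keep only the final counts
        else:
            points.append(snapshot)

    return dict(timeline)


# Usage with made-up events for a single task of a single template.
events = [
    (datetime(2024, 1, 1, 0, 0, 0), "template_a", None, "REGISTERED"),
    (datetime(2024, 1, 1, 0, 0, 5), "template_a", "REGISTERED", "RUNNING"),
    (datetime(2024, 1, 1, 0, 1, 0), "template_a", "RUNNING", "DONE"),
]
timeline = build_timeline(events)
```

Because a snapshot is emitted only when a transition occurs, the series has exactly as many points as there are distinct transition timestamps per template, which is what lets a chart be drawn from it without choosing a bucket size.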