client.swarm.gateway

ServerGateway: Centralized HTTP communication with the Jobmon server.

This module consolidates all HTTP requests for workflow run operations into a single class, providing:
  • Type-safe response objects

  • Consistent session management

  • Clear API boundaries for testing

Attributes

logger

Classes

HeartbeatResponse

Response from heartbeat logging.

StatusUpdateResponse

Response from workflow run status update.

QueueResponse

Response from task queueing operations.

TaskStatusUpdatesResponse

Response from task status synchronization.

WorkflowMetadata

Workflow metadata from the server.

DownstreamTasksResponse

Response from downstream tasks query.

ServerGateway

Centralized HTTP communication with the Jobmon server.

Module Contents

client.swarm.gateway.logger
class client.swarm.gateway.HeartbeatResponse

Response from heartbeat logging.

status: str
class client.swarm.gateway.StatusUpdateResponse

Response from workflow run status update.

status: str
class client.swarm.gateway.QueueResponse

Response from task queueing operations.

tasks_by_status: dict[str, list[int]]
class client.swarm.gateway.TaskStatusUpdatesResponse

Response from task status synchronization.

time: datetime.datetime
tasks_by_status: dict[str, list[int]]
class client.swarm.gateway.WorkflowMetadata

Workflow metadata from the server.

workflow_id: int
dag_id: int
max_concurrently_running: int
class client.swarm.gateway.DownstreamTasksResponse

Response from downstream tasks query.

downstream_tasks: dict[int, tuple[int, list[int]]]
class client.swarm.gateway.ServerGateway(requester: jobmon.core.requester.Requester, workflow_id: int, workflow_run_id: int)

Centralized HTTP communication with the Jobmon server.

This class consolidates all server communication for workflow run operations, providing a clean interface with typed responses.

Usage:

    async with ServerGateway(requester, workflow_id, workflow_run_id) as gateway:
        response = await gateway.log_heartbeat(status="R", next_report_increment=30.0)

Or without a context manager (session managed externally):

    gateway = ServerGateway(requester, workflow_id, workflow_run_id)
    gateway.set_session(existing_session)
    response = await gateway.log_heartbeat(...)

Initialize the gateway.

Parameters:
  • requester – The Requester instance for HTTP communication.

  • workflow_id – The workflow ID for workflow-level operations.

  • workflow_run_id – The workflow run ID for run-level operations.

requester
workflow_id
workflow_run_id
set_session(session: aiohttp.ClientSession) -> None

Set an external session (gateway will not close it).

async close() -> None

Close the session if we own it.
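The ownership rule behind set_session and close can be illustrated with a stand-in class. This is a minimal sketch, not the real ServerGateway: FakeSession, OwnedSessionSketch, and the _owns_session flag are hypothetical names, and the assumption is that the gateway closes only a session it created itself.

```python
from __future__ import annotations

import asyncio


class FakeSession:
    """Hypothetical stand-in for aiohttp.ClientSession."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class OwnedSessionSketch:
    """Illustrates 'close only what you created'; not the real ServerGateway."""

    def __init__(self) -> None:
        self._session: FakeSession | None = None
        self._owns_session = False  # assumed bookkeeping flag

    def set_session(self, session: FakeSession) -> None:
        # An externally supplied session stays the caller's responsibility.
        self._session = session
        self._owns_session = False

    async def __aenter__(self) -> OwnedSessionSketch:
        # Create a session only if none was supplied, and remember that we own it.
        if self._session is None:
            self._session = FakeSession()
            self._owns_session = True
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self.close()

    async def close(self) -> None:
        # Close the session only when this object created it.
        if self._session is not None and self._owns_session:
            await self._session.close()
        self._session = None
        self._owns_session = False


async def demo() -> tuple[bool, bool]:
    external = FakeSession()
    gateway = OwnedSessionSketch()
    gateway.set_session(external)
    await gateway.close()  # must leave the external session open

    async with OwnedSessionSketch() as owning_gateway:
        owned = owning_gateway._session
    return external.closed, owned.closed
```

Calling asyncio.run(demo()) returns whether each session ended up closed: the external one stays open, while the one created inside the context manager is closed on exit.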

async log_heartbeat(status: str, next_report_increment: float) -> HeartbeatResponse

Log a heartbeat to the server.

Parameters:
  • status – Current workflow run status.

  • next_report_increment – Seconds until next expected heartbeat.

Returns:

HeartbeatResponse with potentially updated status.
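Because the response may carry a status that differs from the one sent, a caller would typically compare the two after each heartbeat. The sketch below shows one way to do that against a stub. StubGateway and heartbeat_until_changed are hypothetical, HeartbeatResponse is redefined locally so the example runs on its own, and the status code "T" is a placeholder rather than a documented Jobmon status.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class HeartbeatResponse:
    """Mirrors the documented field; redefined here so the sketch runs alone."""

    status: str


class StubGateway:
    """Hypothetical stand-in that replays a scripted list of server statuses."""

    def __init__(self, scripted: list[str]) -> None:
        self._scripted = list(scripted)
        self.calls: list[tuple[str, float]] = []

    async def log_heartbeat(
        self, status: str, next_report_increment: float
    ) -> HeartbeatResponse:
        self.calls.append((status, next_report_increment))
        return HeartbeatResponse(status=self._scripted.pop(0))


async def heartbeat_until_changed(
    gateway: StubGateway, status: str, interval: float, max_beats: int
) -> str:
    """Send heartbeats until the server reports a different status."""
    for _ in range(max_beats):
        response = await gateway.log_heartbeat(
            status=status, next_report_increment=interval
        )
        if response.status != status:
            # The server changed the run's status; hand it back to the caller.
            return response.status
        # A real loop would wait roughly `interval` seconds here.
        await asyncio.sleep(0)
    return status
```

With a stub scripted as ["R", "R", "T"], the loop sends three heartbeats and returns "T".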

async update_status(status: str) -> StatusUpdateResponse

Update the workflow run status.

Parameters:

status – The new status to set.

Returns:

StatusUpdateResponse with the actual new status.

async terminate_task_instances() -> None

Signal the server to terminate all task instances for this workflow run.

async request_triage() -> None

Request the server to triage overdue task instances.

async get_task_status_updates(since: datetime.datetime | None = None) -> TaskStatusUpdatesResponse

Fetch task status changes from the server.

Parameters:

since – If provided, only get updates since this time. If None, get full status snapshot.

Returns:

TaskStatusUpdatesResponse with time and status updates.
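The since parameter suggests an incremental pattern: take a full snapshot first, then ask only for changes. The sketch below assumes that the time field of one response is a suitable since value for the next call, which the documentation does not state explicitly. StubGateway and sync_twice are hypothetical, and the status codes "Q" and "D" are placeholders.

```python
from __future__ import annotations

import asyncio
import datetime
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskStatusUpdatesResponse:
    """Mirrors the documented fields; redefined here so the sketch runs alone."""

    time: datetime.datetime
    tasks_by_status: dict[str, list[int]]


class StubGateway:
    """Hypothetical stand-in that replays canned responses."""

    def __init__(self, responses: list[TaskStatusUpdatesResponse]) -> None:
        self._responses = list(responses)
        self.since_args: list[datetime.datetime | None] = []

    async def get_task_status_updates(
        self, since: datetime.datetime | None = None
    ) -> TaskStatusUpdatesResponse:
        self.since_args.append(since)
        return self._responses.pop(0)


async def sync_twice(gateway: StubGateway) -> dict[int, str]:
    """Take a full snapshot, then one incremental update, and merge both."""
    status_by_task: dict[int, str] = {}
    last_sync: datetime.datetime | None = None  # None requests a full snapshot
    for _ in range(2):
        response = await gateway.get_task_status_updates(since=last_sync)
        for status, task_ids in response.tasks_by_status.items():
            for task_id in task_ids:
                status_by_task[task_id] = status  # later updates overwrite earlier ones
        # Assumption: the reported time is the right cursor for the next call.
        last_sync = response.time
    return status_by_task
```

Inverting tasks_by_status into a task-to-status mapping is this sketch's choice; it makes it easy for a later update to overwrite an earlier status.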

async get_workflow_concurrency() -> int

Get the workflow-level max_concurrently_running limit.

Returns:

The maximum number of tasks that can run concurrently.

async get_array_concurrency(array_id: int) -> int

Get the array-level max_concurrently_running limit.

Parameters:

array_id – The array to query.

Returns:

The maximum number of tasks that can run concurrently in this array.

async queue_task_batch(array_id: int, task_ids: list[int], task_resources_id: int, cluster_id: int) -> QueueResponse

Queue a batch of tasks for execution.

Parameters:
  • array_id – The array containing the tasks.

  • task_ids – List of task IDs to queue.

  • task_resources_id – The bound task resources ID.

  • cluster_id – The cluster to run on.

Returns:

QueueResponse with task status updates.
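A caller with many task IDs might split them into several calls and merge the returned tasks_by_status mappings. The sketch below shows that merge against a stub. Splitting by a fixed batch_size is this example's choice and not something the documentation prescribes; StubGateway and queue_in_batches are hypothetical, and the status code "Q" is a placeholder.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueResponse:
    """Mirrors the documented field; redefined here so the sketch runs alone."""

    tasks_by_status: dict[str, list[int]]


class StubGateway:
    """Hypothetical stand-in that reports every submitted task under one status."""

    def __init__(self) -> None:
        self.batches: list[list[int]] = []

    async def queue_task_batch(
        self, array_id: int, task_ids: list[int], task_resources_id: int, cluster_id: int
    ) -> QueueResponse:
        self.batches.append(list(task_ids))
        return QueueResponse(tasks_by_status={"Q": list(task_ids)})


async def queue_in_batches(
    gateway: StubGateway,
    array_id: int,
    task_ids: list[int],
    task_resources_id: int,
    cluster_id: int,
    batch_size: int,
) -> dict[str, list[int]]:
    """Queue task IDs in fixed-size batches and merge the per-batch results."""
    merged: dict[str, list[int]] = {}
    for start in range(0, len(task_ids), batch_size):
        batch = task_ids[start : start + batch_size]
        response = await gateway.queue_task_batch(
            array_id=array_id,
            task_ids=batch,
            task_resources_id=task_resources_id,
            cluster_id=cluster_id,
        )
        for status, ids in response.tasks_by_status.items():
            merged.setdefault(status, []).extend(ids)
    return merged
```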

async get_workflow_metadata() -> WorkflowMetadata

Fetch workflow metadata from the server.

Returns:

WorkflowMetadata with workflow_id, dag_id, and max_concurrently_running.

Raises:

ValueError – If no workflow is found for the given ID.

async get_tasks(max_task_id: int = 0, chunk_size: int = 500) -> dict[int, Any]

Fetch tasks that need to be run from the database.

Parameters:
  • max_task_id – Starting point for pagination (exclusive).

  • chunk_size – Number of tasks to fetch per request.

Returns:

Dictionary mapping task_id to task metadata.
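The max_task_id and chunk_size parameters describe keyset pagination: each request returns tasks whose ID is greater than the cursor. The sketch below pages through a stub until nothing is left. It assumes that an empty dictionary signals the end of the data, which the documentation does not confirm; StubGateway and fetch_all_tasks are hypothetical.

```python
from __future__ import annotations

import asyncio
from typing import Any


class StubGateway:
    """Hypothetical stand-in that serves tasks from an in-memory dictionary."""

    def __init__(self, tasks: dict[int, Any]) -> None:
        self._tasks = tasks
        self.requested_after: list[int] = []

    async def get_tasks(
        self, max_task_id: int = 0, chunk_size: int = 500
    ) -> dict[int, Any]:
        self.requested_after.append(max_task_id)
        # Return up to chunk_size tasks with an ID strictly above the cursor.
        ids = sorted(t for t in self._tasks if t > max_task_id)[:chunk_size]
        return {t: self._tasks[t] for t in ids}


async def fetch_all_tasks(gateway: StubGateway, chunk_size: int = 500) -> dict[int, Any]:
    """Page through get_tasks, advancing the exclusive cursor after each chunk."""
    all_tasks: dict[int, Any] = {}
    max_task_id = 0
    while True:
        chunk = await gateway.get_tasks(max_task_id=max_task_id, chunk_size=chunk_size)
        if not chunk:
            break  # assumed end-of-data signal
        all_tasks.update(chunk)
        max_task_id = max(chunk)  # highest ID seen becomes the next cursor
    return all_tasks
```

Because the cursor is exclusive, setting it to the highest ID already received avoids fetching any task twice.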

async get_downstream_tasks(task_ids: list[int], dag_id: int) -> DownstreamTasksResponse

Fetch downstream edges for a batch of tasks.

Parameters:
  • task_ids – Task IDs to query edges for.

  • dag_id – The DAG ID for this workflow.

Returns:

DownstreamTasksResponse with edge information.

async get_server_time() -> datetime.datetime

Get the current time from the server.

Returns:

The server’s current datetime.

log_heartbeat_sync(status: str, next_report_increment: float) -> HeartbeatResponse

Synchronous heartbeat logging (uses sync Requester).

Parameters:
  • status – Current workflow run status.

  • next_report_increment – Seconds until next expected heartbeat.

Returns:

HeartbeatResponse with potentially updated status.

update_status_sync(status: str) -> StatusUpdateResponse

Synchronous status update (uses sync Requester).

Parameters:

status – The new status to set.

Returns:

StatusUpdateResponse with the actual new status.

get_server_time_sync() -> datetime.datetime

Get the current time from the server (synchronous).

Returns:

The server’s current datetime.