client.swarm.gateway
ServerGateway: Centralized HTTP communication with the Jobmon server.
This module consolidates all HTTP requests for workflow run operations into a single class, providing:
- Type-safe response objects
- Consistent session management
- Clear API boundaries for testing
Attributes
logger |
Classes
HeartbeatResponse | Response from heartbeat logging.
StatusUpdateResponse | Response from workflow run status update.
QueueResponse | Response from task queueing operations.
TaskStatusUpdatesResponse | Response from task status synchronization.
WorkflowMetadata | Workflow metadata from the server.
DownstreamTasksResponse | Response from downstream tasks query.
ServerGateway | Centralized HTTP communication with the Jobmon server.
Module Contents
- client.swarm.gateway.logger
- class client.swarm.gateway.HeartbeatResponse
Response from heartbeat logging.
- class client.swarm.gateway.StatusUpdateResponse
Response from workflow run status update.
- class client.swarm.gateway.QueueResponse
Response from task queueing operations.
- class client.swarm.gateway.TaskStatusUpdatesResponse
Response from task status synchronization.
- time: datetime.datetime
- class client.swarm.gateway.WorkflowMetadata
Workflow metadata from the server.
- class client.swarm.gateway.DownstreamTasksResponse
Response from downstream tasks query.
- class client.swarm.gateway.ServerGateway(requester: jobmon.core.requester.Requester, workflow_id: int, workflow_run_id: int)
Centralized HTTP communication with the Jobmon server.
This class consolidates all server communication for workflow run operations, providing a clean interface with typed responses.
- Usage:
    async with ServerGateway(requester, workflow_id, workflow_run_id) as gateway:
        response = await gateway.log_heartbeat(status="R", next_report_increment=30.0)
- Or without context manager (session managed externally):
    gateway = ServerGateway(requester, workflow_id, workflow_run_id)
    gateway.set_session(existing_session)
    response = await gateway.log_heartbeat(…)
Initialize the gateway.
- Parameters:
requester – The Requester instance for HTTP communication.
workflow_id – The workflow ID for workflow-level operations.
workflow_run_id – The workflow run ID for run-level operations.
- requester
- workflow_id
- workflow_run_id
- set_session(session: aiohttp.ClientSession) -> None
Set an external session (gateway will not close it).
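The difference between a session the gateway creates and one passed to set_session can be illustrated with a small ownership flag. The sketch below is not Jobmon's implementation: FakeSession stands in for aiohttp.ClientSession, and OwnershipSketch and its attributes are hypothetical names chosen only to show the "borrowed sessions are never closed" rule.

```python
import asyncio
from typing import Optional


class FakeSession:
    """Stand-in for aiohttp.ClientSession; it only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class OwnershipSketch:
    """Hypothetical illustration of session ownership, not the real ServerGateway."""

    def __init__(self) -> None:
        self.session: Optional[FakeSession] = None
        self._owns_session = False

    def set_session(self, session: FakeSession) -> None:
        # An externally supplied session is borrowed: the caller closes it.
        self.session = session
        self._owns_session = False

    async def __aenter__(self) -> "OwnershipSketch":
        # Only create (and therefore own) a session if none was supplied.
        if self.session is None:
            self.session = FakeSession()
            self._owns_session = True
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        # Close the session only when this object created it.
        if self._owns_session and self.session is not None:
            await self.session.close()


async def demo() -> tuple:
    # Case 1: the context manager creates its own session and closes it on exit.
    async with OwnershipSketch() as owned:
        created = owned.session

    # Case 2: an external session is borrowed and left open on exit.
    external = FakeSession()
    borrowed = OwnershipSketch()
    borrowed.set_session(external)
    async with borrowed:
        pass

    return created.closed, external.closed
```

Under this assumed design, code that shares one session across several components keeps full control of that session's lifetime.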
- async log_heartbeat(status: str, next_report_increment: float) -> HeartbeatResponse
Log a heartbeat to the server.
- Parameters:
status – Current workflow run status.
next_report_increment – Seconds until next expected heartbeat.
- Returns:
HeartbeatResponse with potentially updated status.
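Because the response may carry a status that differs from the one sent, a caller would typically compare the two after each heartbeat. The following is a minimal sketch of such a loop against a scripted fake. The `status` field name on the response, the FakeGateway class, the helper heartbeat_until_changed, and the status code "X" are all assumptions made for illustration; only "R" appears in the usage example above.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class FakeHeartbeatResponse:
    status: str  # assumed field name; the real HeartbeatResponse may differ


class FakeGateway:
    """Returns scripted statuses in place of real server replies."""

    def __init__(self, scripted: List[str]) -> None:
        self._scripted = list(scripted)
        self.calls: List[Tuple[str, float]] = []

    async def log_heartbeat(
        self, status: str, next_report_increment: float
    ) -> FakeHeartbeatResponse:
        self.calls.append((status, next_report_increment))
        return FakeHeartbeatResponse(status=self._scripted.pop(0))


async def heartbeat_until_changed(
    gateway: FakeGateway, status: str, interval: float, max_beats: int
) -> str:
    """Send heartbeats until the server reports a different status."""
    for _ in range(max_beats):
        response = await gateway.log_heartbeat(
            status=status, next_report_increment=interval
        )
        if response.status != status:
            # The server changed the run's status; stop and report it.
            return response.status
        # A real loop would wait here for less than `interval` seconds.
        await asyncio.sleep(0)
    return status
```

A real loop would sleep for somewhat less than `next_report_increment` between calls so that each heartbeat arrives before the server expects it.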
- async update_status(status: str) -> StatusUpdateResponse
Update the workflow run status.
- Parameters:
status – The new status to set.
- Returns:
StatusUpdateResponse with the actual new status.
- async terminate_task_instances() -> None
Signal the server to terminate all task instances for this workflow run.
- async get_task_status_updates(since: datetime.datetime | None = None) -> TaskStatusUpdatesResponse
Fetch task status changes from the server.
- Parameters:
since – If provided, only get updates since this time. If None, get full status snapshot.
- Returns:
TaskStatusUpdatesResponse with time and status updates.
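The `since` parameter suggests a snapshot-then-delta pattern: take a full snapshot once, then pass the returned `time` back on later calls. The sketch below shows that pattern against an in-memory fake. The `time` attribute is documented above; the `task_statuses` field, the FakeGateway class, its `record` helper, and the status codes "Q" and "D" are hypothetical placeholders.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple


@dataclass
class FakeUpdates:
    time: datetime
    task_statuses: Dict[int, str]  # assumed field name


class FakeGateway:
    """In-memory stand-in that timestamps each status change."""

    def __init__(self) -> None:
        self.now = datetime(2024, 1, 1, 12, 0, 0)
        self.changes: List[Tuple[datetime, int, str]] = []

    def record(self, task_id: int, status: str) -> None:
        # Advance the fake server clock, then log the change.
        self.now += timedelta(seconds=1)
        self.changes.append((self.now, task_id, status))

    async def get_task_status_updates(
        self, since: Optional[datetime] = None
    ) -> FakeUpdates:
        # since=None returns everything; otherwise only later changes.
        statuses = {
            task_id: status
            for changed_at, task_id, status in self.changes
            if since is None or changed_at > since
        }
        return FakeUpdates(time=self.now, task_statuses=statuses)


async def demo() -> tuple:
    gateway = FakeGateway()
    gateway.record(1, "Q")
    gateway.record(2, "Q")

    known: Dict[int, str] = {}
    snapshot = await gateway.get_task_status_updates()  # full snapshot
    known.update(snapshot.task_statuses)
    last_sync = snapshot.time

    gateway.record(1, "D")
    delta = await gateway.get_task_status_updates(since=last_sync)
    known.update(delta.task_statuses)
    return known, delta.task_statuses
```

Using the server-reported `time` as the next `since` value, rather than the client's clock, avoids missing updates when the two clocks disagree.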
- async get_workflow_concurrency() -> int
Get the workflow-level max_concurrently_running limit.
- Returns:
The maximum number of tasks that can run concurrently.
- async get_array_concurrency(array_id: int) -> int
Get the array-level max_concurrently_running limit.
- Parameters:
array_id – The array to query.
- Returns:
The maximum number of tasks that can run concurrently in this array.
- async queue_task_batch(array_id: int, task_ids: list[int], task_resources_id: int, cluster_id: int) -> QueueResponse
Queue a batch of tasks for execution.
- Parameters:
array_id – The array containing the tasks.
task_ids – List of task IDs to queue.
task_resources_id – The bound task resources ID.
cluster_id – The cluster to run on.
- Returns:
QueueResponse with task status updates.
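Since each call takes a single array_id, task_resources_id, and cluster_id, a caller holding a mixed set of ready tasks would need to group them before queueing. The sketch below shows one way to do that. ReadyTask, FakeGateway, and queue_ready_tasks are hypothetical names, the status "Q" is a placeholder, and a plain dict stands in for QueueResponse, whose fields are not documented here.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class ReadyTask:
    """Hypothetical record of a task that is ready to queue."""

    task_id: int
    array_id: int
    task_resources_id: int
    cluster_id: int


class FakeGateway:
    """Records each batch instead of contacting a server."""

    def __init__(self) -> None:
        self.calls: List[Tuple[int, List[int], int, int]] = []

    async def queue_task_batch(
        self, array_id: int, task_ids: List[int], task_resources_id: int, cluster_id: int
    ) -> Dict[int, str]:
        self.calls.append((array_id, list(task_ids), task_resources_id, cluster_id))
        # Stand-in for QueueResponse: task_id -> placeholder status.
        return {task_id: "Q" for task_id in task_ids}


async def queue_ready_tasks(gateway: FakeGateway, ready: List[ReadyTask]) -> Dict[int, str]:
    # Group task IDs so every batch shares array, resources, and cluster.
    groups: Dict[Tuple[int, int, int], List[int]] = defaultdict(list)
    for task in ready:
        key = (task.array_id, task.task_resources_id, task.cluster_id)
        groups[key].append(task.task_id)

    queued: Dict[int, str] = {}
    for (array_id, resources_id, cluster_id), task_ids in groups.items():
        response = await gateway.queue_task_batch(
            array_id=array_id,
            task_ids=task_ids,
            task_resources_id=resources_id,
            cluster_id=cluster_id,
        )
        queued.update(response)
    return queued
```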
- async get_workflow_metadata() -> WorkflowMetadata
Fetch workflow metadata from the server.
- Returns:
WorkflowMetadata with dag_id and max_concurrently_running.
- Raises:
ValueError – If no workflow found for the given ID.
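A caller that wants to treat a missing workflow as a recoverable condition can catch the documented ValueError. The following sketch uses a fake gateway. The dag_id and max_concurrently_running fields follow the description above, while FakeGateway, its constructor arguments, the error message, and load_metadata_or_none are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FakeWorkflowMetadata:
    dag_id: int
    max_concurrently_running: int


class FakeGateway:
    """Looks up metadata in a dict instead of querying a server."""

    def __init__(self, workflow_id: int, known: Dict[int, FakeWorkflowMetadata]) -> None:
        self.workflow_id = workflow_id
        self._known = known

    async def get_workflow_metadata(self) -> FakeWorkflowMetadata:
        if self.workflow_id not in self._known:
            # Mirrors the documented behaviour; the message text is invented.
            raise ValueError(f"No workflow found for workflow_id={self.workflow_id}")
        return self._known[self.workflow_id]


async def load_metadata_or_none(gateway: FakeGateway) -> Optional[FakeWorkflowMetadata]:
    """Return the metadata, or None when the workflow does not exist."""
    try:
        return await gateway.get_workflow_metadata()
    except ValueError:
        return None
```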
- async get_tasks(max_task_id: int = 0, chunk_size: int = 500) -> dict[int, Any]
Fetch from the database the tasks that need to be run.
- Parameters:
max_task_id – Starting point for pagination (exclusive).
chunk_size – Number of tasks to fetch per request.
- Returns:
Dictionary mapping task_id to task metadata.
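The exclusive max_task_id parameter points to keyset pagination: each request asks for tasks with IDs above the highest one already seen. The sketch below assumes get_tasks returns a single chunk per call; the real method may instead loop internally, in which case this outer loop would be unnecessary. FakeGateway, fetch_all_tasks, and the metadata shape are hypothetical.

```python
import asyncio
from typing import Any, Dict, Iterable


class FakeGateway:
    """Serves tasks from memory, one chunk per call."""

    def __init__(self, task_ids: Iterable[int]) -> None:
        self._tasks = {tid: {"name": f"task_{tid}"} for tid in sorted(task_ids)}
        self.requests = 0

    async def get_tasks(self, max_task_id: int = 0, chunk_size: int = 500) -> Dict[int, Any]:
        self.requests += 1
        # Exclusive lower bound: only IDs strictly above max_task_id.
        page = [tid for tid in self._tasks if tid > max_task_id][:chunk_size]
        return {tid: self._tasks[tid] for tid in page}


async def fetch_all_tasks(gateway: FakeGateway, chunk_size: int = 500) -> Dict[int, Any]:
    all_tasks: Dict[int, Any] = {}
    max_task_id = 0
    while True:
        chunk = await gateway.get_tasks(max_task_id=max_task_id, chunk_size=chunk_size)
        if not chunk:
            break  # an empty chunk means nothing is left
        all_tasks.update(chunk)
        # Advance the cursor to the largest ID seen so far.
        max_task_id = max(chunk)
    return all_tasks
```

Paging by ID rather than by offset keeps each request cheap and stable even if rows are added while the loop runs.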
- async get_downstream_tasks(task_ids: list[int], dag_id: int) -> DownstreamTasksResponse
Fetch downstream edges for a batch of tasks.
- Parameters:
task_ids – Task IDs to query edges for.
dag_id – The DAG ID for this workflow.
- Returns:
DownstreamTasksResponse with edge information.
- async get_server_time() -> datetime.datetime
Get the current time from the server.
- Returns:
The server’s current datetime.
- log_heartbeat_sync(status: str, next_report_increment: float) -> HeartbeatResponse
Synchronous heartbeat logging (uses sync Requester).
- Parameters:
status – Current workflow run status.
next_report_increment – Seconds until next expected heartbeat.
- Returns:
HeartbeatResponse with potentially updated status.
- update_status_sync(status: str) -> StatusUpdateResponse
Synchronous status update (uses sync Requester).
- Parameters:
status – The new status to set.
- Returns:
StatusUpdateResponse with the actual new status.
- get_server_time_sync() -> datetime.datetime
Get the current time from the server (synchronous).
- Returns:
The server’s current datetime.