client.swarm.gateway
====================

.. py:module:: client.swarm.gateway

.. autoapi-nested-parse::

   ServerGateway: Centralized HTTP communication with the Jobmon server.

   This module consolidates all HTTP requests for workflow run operations into a single class, providing:

   - Type-safe response objects
   - Consistent session management
   - Clear API boundaries for testing


Attributes
----------

.. autoapisummary::

   client.swarm.gateway.logger


Classes
-------

.. autoapisummary::

   client.swarm.gateway.HeartbeatResponse
   client.swarm.gateway.StatusUpdateResponse
   client.swarm.gateway.QueueResponse
   client.swarm.gateway.TaskStatusUpdatesResponse
   client.swarm.gateway.WorkflowMetadata
   client.swarm.gateway.DownstreamTasksResponse
   client.swarm.gateway.ServerGateway


Module Contents
---------------

.. py:data:: logger

.. py:class:: HeartbeatResponse

   Response from heartbeat logging.


   .. py:attribute:: status
      :type: str


.. py:class:: StatusUpdateResponse

   Response from workflow run status update.


   .. py:attribute:: status
      :type: str


.. py:class:: QueueResponse

   Response from task queueing operations.


   .. py:attribute:: tasks_by_status
      :type: dict[str, list[int]]


.. py:class:: TaskStatusUpdatesResponse

   Response from task status synchronization.


   .. py:attribute:: time
      :type: datetime.datetime


   .. py:attribute:: tasks_by_status
      :type: dict[str, list[int]]


.. py:class:: WorkflowMetadata

   Workflow metadata from the server.


   .. py:attribute:: workflow_id
      :type: int


   .. py:attribute:: dag_id
      :type: int


   .. py:attribute:: max_concurrently_running
      :type: int


.. py:class:: DownstreamTasksResponse

   Response from downstream tasks query.


   .. py:attribute:: downstream_tasks
      :type: dict[int, tuple[int, list[int]]]


.. py:class:: ServerGateway(requester: jobmon.core.requester.Requester, workflow_id: int, workflow_run_id: int)

   Centralized HTTP communication with the Jobmon server.

   This class consolidates all server communication for workflow run operations,
   providing a clean interface with typed responses.

   Usage::

      async with ServerGateway(requester, workflow_id, workflow_run_id) as gateway:
          response = await gateway.log_heartbeat(status="R", next_report_increment=30.0)

   Or without context manager (session managed externally)::

      gateway = ServerGateway(requester, workflow_id, workflow_run_id)
      gateway.set_session(existing_session)
      response = await gateway.log_heartbeat(...)

   Initialize the gateway.

   :param requester: The Requester instance for HTTP communication.
   :param workflow_id: The workflow ID for workflow-level operations.
   :param workflow_run_id: The workflow run ID for run-level operations.


   .. py:attribute:: requester


   .. py:attribute:: workflow_id


   .. py:attribute:: workflow_run_id


   .. py:method:: set_session(session: aiohttp.ClientSession) -> None

      Set an external session (gateway will not close it).


   .. py:method:: close() -> None
      :async:

      Close the session if we own it.


   .. py:method:: log_heartbeat(status: str, next_report_increment: float) -> HeartbeatResponse
      :async:

      Log a heartbeat to the server.

      :param status: Current workflow run status.
      :param next_report_increment: Seconds until next expected heartbeat.

      :returns: HeartbeatResponse with potentially updated status.


   .. py:method:: update_status(status: str) -> StatusUpdateResponse
      :async:

      Update the workflow run status.

      :param status: The new status to set.

      :returns: StatusUpdateResponse with the actual new status.


   .. py:method:: terminate_task_instances() -> None
      :async:

      Signal the server to terminate all task instances for this workflow run.


   .. py:method:: request_triage() -> None
      :async:

      Request server to triage overdue task instances.


   .. py:method:: get_task_status_updates(since: Optional[datetime.datetime] = None) -> TaskStatusUpdatesResponse
      :async:

      Fetch task status changes from the server.

      :param since: If provided, only get updates since this time.
                    If None, get full status snapshot.

      :returns: TaskStatusUpdatesResponse with time and status updates.


   .. py:method:: get_workflow_concurrency() -> int
      :async:

      Get the workflow-level max_concurrently_running limit.

      :returns: The maximum number of tasks that can run concurrently.


   .. py:method:: get_array_concurrency(array_id: int) -> int
      :async:

      Get the array-level max_concurrently_running limit.

      :param array_id: The array to query.

      :returns: The maximum number of tasks that can run concurrently in this array.


   .. py:method:: queue_task_batch(array_id: int, task_ids: list[int], task_resources_id: int, cluster_id: int) -> QueueResponse
      :async:

      Queue a batch of tasks for execution.

      :param array_id: The array containing the tasks.
      :param task_ids: List of task IDs to queue.
      :param task_resources_id: The bound task resources ID.
      :param cluster_id: The cluster to run on.

      :returns: QueueResponse with task status updates.


   .. py:method:: get_workflow_metadata() -> WorkflowMetadata
      :async:

      Fetch workflow metadata from the server.

      :returns: WorkflowMetadata with dag_id and max_concurrently_running.

      :raises ValueError: If no workflow found for the given ID.


   .. py:method:: get_tasks(max_task_id: int = 0, chunk_size: int = 500) -> dict[int, Any]
      :async:

      Fetch tasks that need to be run from the database.

      :param max_task_id: Starting point for pagination (exclusive).
      :param chunk_size: Number of tasks to fetch per request.

      :returns: Dictionary mapping task_id to task metadata.


   .. py:method:: get_downstream_tasks(task_ids: list[int], dag_id: int) -> DownstreamTasksResponse
      :async:

      Fetch downstream edges for a batch of tasks.

      :param task_ids: Task IDs to query edges for.
      :param dag_id: The DAG ID for this workflow.

      :returns: DownstreamTasksResponse with edge information.


   .. py:method:: get_server_time() -> datetime.datetime
      :async:

      Get the current time from the server.

      :returns: The server's current datetime.


   .. py:method:: log_heartbeat_sync(status: str, next_report_increment: float) -> HeartbeatResponse

      Synchronous heartbeat logging (uses sync Requester).

      :param status: Current workflow run status.
      :param next_report_increment: Seconds until next expected heartbeat.

      :returns: HeartbeatResponse with potentially updated status.


   .. py:method:: update_status_sync(status: str) -> StatusUpdateResponse

      Synchronous status update (uses sync Requester).

      :param status: The new status to set.

      :returns: StatusUpdateResponse with the actual new status.


   .. py:method:: get_server_time_sync() -> datetime.datetime

      Get the current time from the server (synchronous).

      :returns: The server's current datetime.
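
The ``max_task_id`` (exclusive starting point) and ``chunk_size`` parameters of ``get_tasks`` suggest cursor-style pagination over task IDs. The sketch below shows one way a caller might page through all tasks. It rests on assumptions the reference above does not confirm: that each call returns tasks in ascending ID order, and that an empty dictionary signals the end of the data. ``FakeGateway`` and ``fetch_all_tasks`` are hypothetical names introduced only for illustration; ``FakeGateway`` is an in-memory stand-in that mimics the documented ``get_tasks`` signature so the example runs without a Jobmon server.

```python
import asyncio
from typing import Any


class FakeGateway:
    """Hypothetical in-memory stand-in for ServerGateway (not part of Jobmon)."""

    def __init__(self, task_ids: list[int]) -> None:
        self._task_ids = sorted(task_ids)

    async def get_tasks(self, max_task_id: int = 0, chunk_size: int = 500) -> dict[int, Any]:
        # Assumed behavior: return up to chunk_size tasks whose ID is
        # strictly greater than max_task_id, in ascending ID order.
        page = [t for t in self._task_ids if t > max_task_id][:chunk_size]
        return {t: {"task_id": t} for t in page}


async def fetch_all_tasks(gateway: Any, chunk_size: int = 500) -> dict[int, Any]:
    """Page through get_tasks until an empty chunk comes back."""
    all_tasks: dict[int, Any] = {}
    max_task_id = 0
    while True:
        chunk = await gateway.get_tasks(max_task_id=max_task_id, chunk_size=chunk_size)
        if not chunk:
            # Assumption: an empty result means there is nothing left to fetch.
            break
        all_tasks.update(chunk)
        # The cursor is exclusive, so resume after the largest ID seen so far.
        max_task_id = max(chunk)
    return all_tasks


tasks = asyncio.run(fetch_all_tasks(FakeGateway(list(range(1, 1201))), chunk_size=500))
print(len(tasks))
```

With a real ``ServerGateway``, the same loop would run inside the ``async with`` block shown in the class usage, provided the server's pagination behaves as assumed here.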