client.swarm.services.heartbeat =============================== .. py:module:: client.swarm.services.heartbeat .. autoapi-nested-parse:: HeartbeatService: Background heartbeat management for workflow runs. This service manages periodic heartbeat logging to the Jobmon server, ensuring the workflow run is kept alive and receiving any status updates from the server (e.g., pause/resume signals). Attributes ---------- .. autoapisummary:: client.swarm.services.heartbeat.logger Classes ------- .. autoapisummary:: client.swarm.services.heartbeat.HeartbeatService Module Contents --------------- .. py:data:: logger .. py:class:: HeartbeatService(gateway: jobmon.client.swarm.gateway.ServerGateway, interval: float, report_by_buffer: float, initial_status: str) Background heartbeat management for workflow runs. The HeartbeatService is responsible for: - Logging periodic heartbeats to the server - Tracking the current workflow run status - Detecting status changes from heartbeat responses (e.g., pause signals) Example:: heartbeat = HeartbeatService( gateway=gateway, interval=30.0, report_by_buffer=1.5, initial_status="R", ) # Manual tick (for testing or explicit heartbeat) update = await heartbeat.tick() if update.workflow_run_status: state.apply_update(update) # Background loop (typical usage) stop_event = asyncio.Event() task = asyncio.create_task(heartbeat.run_background(stop_event)) # ... do work ... stop_event.set() await task Initialize the heartbeat service. :param gateway: ServerGateway for server communication. :param interval: Seconds between heartbeats. :param report_by_buffer: Multiplier for next_report_increment sent to server. Server expects heartbeat within (interval * report_by_buffer). :param initial_status: Starting workflow run status to report. .. py:property:: interval :type: float Heartbeat interval in seconds. .. py:property:: current_status :type: str Current status being reported in heartbeats. .. py:property:: last_heartbeat_time :type: float Timestamp of the last successful heartbeat. .. py:property:: next_report_increment :type: float Time window the server expects the next heartbeat within. .. py:method:: set_status(status: str) -> None Update the status that will be reported in heartbeats. :param status: New workflow run status to report. .. py:method:: time_since_last_heartbeat() -> float Seconds since the last heartbeat was logged (or since initialization). .. py:method:: is_heartbeat_due() -> bool Check if a heartbeat is due based on the interval. .. py:method:: tick() -> jobmon.client.swarm.state.StateUpdate :async: Log a heartbeat and return any status change. :returns: StateUpdate with workflow_run_status if status changed, otherwise an empty StateUpdate. :raises Exception: If the heartbeat request fails. .. py:method:: tick_sync() -> jobmon.client.swarm.state.StateUpdate Synchronous version of tick for non-async contexts. :returns: StateUpdate with workflow_run_status if status changed, otherwise an empty StateUpdate. :raises Exception: If the heartbeat request fails. .. py:method:: run_background(stop_event: asyncio.Event) -> None :async: Background task that logs heartbeats periodically. This method runs in a loop, checking if a heartbeat is due and logging it if necessary. It continues until the stop_event is set. :param stop_event: Event to signal when to stop the loop. .. note:: This method catches and logs exceptions but re-raises them to allow proper error handling by the caller. .. py:method:: reset_timer() -> None Reset the heartbeat timer to the current time. Useful when a heartbeat is logged externally (e.g., during sync).