client.swarm.services.heartbeat

HeartbeatService: Background heartbeat management for workflow runs.

This service manages periodic heartbeat logging to the Jobmon server, ensuring the workflow run is kept alive and receiving any status updates from the server (e.g., pause/resume signals).

Attributes

logger

Classes

HeartbeatService

Background heartbeat management for workflow runs.

Module Contents

client.swarm.services.heartbeat.logger
class client.swarm.services.heartbeat.HeartbeatService(gateway: jobmon.client.swarm.gateway.ServerGateway, interval: float, report_by_buffer: float, initial_status: str)

Background heartbeat management for workflow runs.

The HeartbeatService is responsible for:

  • Logging periodic heartbeats to the server

  • Tracking the current workflow run status

  • Detecting status changes from heartbeat responses (e.g., pause signals)

Example:

heartbeat = HeartbeatService(
    gateway=gateway,
    interval=30.0,
    report_by_buffer=1.5,
    initial_status="R",
)

# Manual tick (for testing or explicit heartbeat)
update = await heartbeat.tick()
if update.workflow_run_status:
    state.apply_update(update)

# Background loop (typical usage)
stop_event = asyncio.Event()
task = asyncio.create_task(heartbeat.run_background(stop_event))
# ... do work ...
stop_event.set()
await task

Initialize the heartbeat service.

Parameters:
  • gateway – ServerGateway for server communication.

  • interval – Seconds between heartbeats.

  • report_by_buffer – Multiplier for next_report_increment sent to server. Server expects heartbeat within (interval * report_by_buffer).

  • initial_status – Starting workflow run status to report.

property interval: float

Heartbeat interval in seconds.

property current_status: str

Current status being reported in heartbeats.

property last_heartbeat_time: float

Timestamp of the last successful heartbeat.

property next_report_increment: float

Time window the server expects the next heartbeat within.

set_status(status: str) None

Update the status that will be reported in heartbeats.

Parameters:

status – New workflow run status to report.

time_since_last_heartbeat() float

Seconds since the last heartbeat was logged (or since initialization).

is_heartbeat_due() bool

Check if a heartbeat is due based on the interval.

async tick() jobmon.client.swarm.state.StateUpdate

Log a heartbeat and return any status change.

Returns:

StateUpdate with workflow_run_status if status changed, otherwise an empty StateUpdate.

Raises:

Exception – If the heartbeat request fails.

tick_sync() jobmon.client.swarm.state.StateUpdate

Synchronous version of tick for non-async contexts.

Returns:

StateUpdate with workflow_run_status if status changed, otherwise an empty StateUpdate.

Raises:

Exception – If the heartbeat request fails.

async run_background(stop_event: asyncio.Event) None

Background task that logs heartbeats periodically.

This method runs in a loop, checking if a heartbeat is due and logging it if necessary. It continues until the stop_event is set.

Parameters:

stop_event – Event to signal when to stop the loop.

Note

This method catches and logs exceptions but re-raises them to allow proper error handling by the caller.

reset_timer() None

Reset the heartbeat timer to the current time.

Useful when a heartbeat is logged externally (e.g., during sync).