client.swarm.services.synchronizer
Synchronizer: State synchronization between local state and server.
This service manages synchronization of workflow run state with the Jobmon server, including task status updates and concurrency limit refreshes.
Attributes
logger
Classes
Synchronizer | Keeps local workflow run state aligned with the server.
Module Contents
- client.swarm.services.synchronizer.logger
- class client.swarm.services.synchronizer.Synchronizer(gateway: jobmon.client.swarm.gateway.ServerGateway, task_ids: set[int], array_ids: set[int])
Keeps local workflow run state aligned with the server.
The Synchronizer is responsible for:
- Requesting that the server triage overdue task instances
- Fetching task status updates (full or incremental)
- Synchronizing the workflow-level concurrency limit
- Synchronizing per-array concurrency limits
All operations are performed in parallel for better throughput.
Example:
    sync = Synchronizer(
        gateway=gateway,
        task_ids=set(tasks.keys()),
        array_ids=set(arrays.keys()),
    )

    # Single sync tick
    update = await sync.tick(full_sync=False, last_sync=last_sync_time)
    state.apply_update(update)

    # Full sync (ignores last_sync time)
    update = await sync.tick(full_sync=True, last_sync=None)
    state.apply_update(update)
Initialize the synchronizer.
- Parameters:
gateway – ServerGateway for server communication.
task_ids – Set of known task IDs to filter status updates.
array_ids – Set of array IDs to sync concurrency limits for.
- update_task_ids(task_ids: set[int]) -> None
Update the set of known task IDs.
- Parameters:
task_ids – New set of task IDs.
- update_array_ids(array_ids: set[int]) -> None
Update the set of array IDs.
- Parameters:
array_ids – New set of array IDs.
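The constructor describes task_ids as the set used to filter status updates. The snippet below is only a sketch of what such filtering could look like. The helper `filter_known`, the status strings, and the dictionary shape of the server response are all assumptions made for illustration; this page does not show how the Synchronizer filters internally.

```python
def filter_known(statuses: dict[int, str], known_task_ids: set[int]) -> dict[int, str]:
    """Keep only the status entries whose task ID is in the known set."""
    return {tid: status for tid, status in statuses.items() if tid in known_task_ids}


# Hypothetical server response: task 99 is not (yet) known locally.
known = {1, 2}
server_response = {1: "DONE", 2: "RUNNING", 99: "DONE"}
filtered = filter_known(server_response, known)

# Once the caller learns about task 99, it supplies the enlarged set,
# analogous to calling update_task_ids(new_set) on the real class.
known = known | {99}
refiltered = filter_known(server_response, known)
```

Under this assumption, replacing the set is all that is needed for later updates to include the newly added tasks.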
- async tick(full_sync: bool = False, last_sync: datetime.datetime | None = None) -> jobmon.client.swarm.state.StateUpdate
Perform a synchronization tick with the server.
Runs all sync operations in parallel:
1. Request triage for overdue task instances
2. Fetch task status updates
3. Fetch workflow concurrency limit
4. Fetch array concurrency limits
- Parameters:
full_sync – If True, fetch all task statuses regardless of last_sync.
last_sync – Timestamp of last sync (for incremental updates). Ignored if full_sync is True.
- Returns:
StateUpdate containing all state changes to apply.
Note
Individual operation failures are logged but don’t stop other operations. The returned StateUpdate will contain whatever data was successfully fetched.
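The failure behaviour described in the note can be illustrated with `asyncio.gather(..., return_exceptions=True)`, which returns exceptions as results instead of raising them. This is a sketch of the general pattern, not the Synchronizer's actual implementation: the three coroutine functions, the status strings, and the plain dictionary standing in for StateUpdate are hypothetical.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def request_triage() -> None:
    """Hypothetical stand-in for the triage request; fails on purpose."""
    raise ConnectionError("server unreachable")


async def fetch_task_statuses() -> dict[int, str]:
    """Hypothetical stand-in returning task_id -> status."""
    return {1: "D", 2: "R"}


async def fetch_workflow_limit() -> int:
    """Hypothetical stand-in returning a workflow concurrency limit."""
    return 500


async def tick_sketch() -> dict[str, object]:
    """Run all operations concurrently and keep whatever succeeded."""
    names = ["triage", "task_statuses", "workflow_limit"]
    results = await asyncio.gather(
        request_triage(),
        fetch_task_statuses(),
        fetch_workflow_limit(),
        return_exceptions=True,  # a failure is returned as a value, not raised
    )
    update: dict[str, object] = {}
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            # Log the failure and carry on with the remaining results.
            logger.warning("sync operation %s failed: %s", name, result)
            continue
        if result is not None:
            update[name] = result
    return update


partial = asyncio.run(tick_sketch())
```

Here the triage request fails, yet `partial` still carries the task statuses and the workflow limit, which mirrors the "whatever data was successfully fetched" wording above.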
- async request_triage_only() -> None
Convenience method that requests triage only, without running the other sync operations.
This is useful when you only need to trigger the server’s triage logic without fetching state updates.
- async get_task_updates_only(full_sync: bool = False, last_sync: datetime.datetime | None = None) -> jobmon.client.swarm.state.StateUpdate
Convenience method to only fetch task status updates.
- Parameters:
full_sync – If True, fetch all statuses.
last_sync – Timestamp for incremental sync.
- Returns:
StateUpdate with task status changes.
- async get_concurrency_limits_only() -> jobmon.client.swarm.state.StateUpdate
Convenience method to only fetch concurrency limits.
- Returns:
StateUpdate with workflow and array concurrency limits.
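One possible use of the three convenience methods is to run the individual operations at different cadences, for example fetching task statuses on every tick but refreshing concurrency limits less often. The sketch below assumes this usage. `StubSynchronizer` is a hypothetical stand-in that only copies the documented method names, and it returns empty dictionaries where the real methods return StateUpdate objects.

```python
import asyncio


class StubSynchronizer:
    """Hypothetical stand-in exposing the method names documented above."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def request_triage_only(self) -> None:
        self.calls.append("triage")

    async def get_task_updates_only(self, full_sync: bool = False, last_sync=None) -> dict:
        self.calls.append("tasks")
        return {}  # the real method returns a StateUpdate

    async def get_concurrency_limits_only(self) -> dict:
        self.calls.append("limits")
        return {}  # the real method returns a StateUpdate


async def run_ticks(sync: StubSynchronizer, n_ticks: int, limits_every: int) -> None:
    """Fetch task updates every tick and concurrency limits every few ticks."""
    for i in range(n_ticks):
        await sync.get_task_updates_only()
        if i % limits_every == 0:
            await sync.get_concurrency_limits_only()


stub = StubSynchronizer()
asyncio.run(run_ticks(stub, n_ticks=4, limits_every=2))
```

With a real Synchronizer, each returned StateUpdate would be applied to local state in the same way as in the class-level example.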