client.swarm.services.synchronizer ================================== .. py:module:: client.swarm.services.synchronizer .. autoapi-nested-parse:: Synchronizer: State synchronization between local state and server. This service manages synchronization of workflow run state with the Jobmon server, including task status updates and concurrency limit refreshes. Attributes ---------- .. autoapisummary:: client.swarm.services.synchronizer.logger Classes ------- .. autoapisummary:: client.swarm.services.synchronizer.Synchronizer Module Contents --------------- .. py:data:: logger .. py:class:: Synchronizer(gateway: jobmon.client.swarm.gateway.ServerGateway, task_ids: set[int], array_ids: set[int]) Keeps local workflow run state aligned with the server. The Synchronizer is responsible for: - Requesting server to triage overdue task instances - Fetching task status updates (full or incremental) - Synchronizing workflow-level concurrency limits - Synchronizing per-array concurrency limits All operations are performed in parallel for better throughput. Example:: sync = Synchronizer( gateway=gateway, task_ids=set(tasks.keys()), array_ids=set(arrays.keys()), ) # Single sync tick update = await sync.tick(full_sync=False, last_sync=last_sync_time) state.apply_update(update) # Full sync (ignores last_sync time) update = await sync.tick(full_sync=True, last_sync=None) state.apply_update(update) Initialize the synchronizer. :param gateway: ServerGateway for server communication. :param task_ids: Set of known task IDs to filter status updates. :param array_ids: Set of array IDs to sync concurrency limits for. .. py:property:: task_ids :type: set[int] Set of task IDs this synchronizer knows about. .. py:property:: array_ids :type: set[int] Set of array IDs to sync concurrency limits for. .. py:method:: update_task_ids(task_ids: set[int]) -> None Update the set of known task IDs. :param task_ids: New set of task IDs. .. py:method:: update_array_ids(array_ids: set[int]) -> None Update the set of array IDs. :param array_ids: New set of array IDs. .. py:method:: tick(full_sync: bool = False, last_sync: Optional[datetime.datetime] = None) -> jobmon.client.swarm.state.StateUpdate :async: Perform a synchronization tick with the server. Runs all sync operations in parallel: 1. Request triage for overdue task instances 2. Fetch task status updates 3. Fetch workflow concurrency limit 4. Fetch array concurrency limits :param full_sync: If True, fetch all task statuses regardless of last_sync. :param last_sync: Timestamp of last sync (for incremental updates). Ignored if full_sync is True. :returns: StateUpdate containing all state changes to apply. .. note:: Individual operation failures are logged but don't stop other operations. The returned StateUpdate will contain whatever data was successfully fetched. .. py:method:: request_triage_only() -> None :async: Convenience method to only request triage without full sync. This is useful when you only need to trigger the server's triage logic without fetching state updates. .. py:method:: get_task_updates_only(full_sync: bool = False, last_sync: Optional[datetime.datetime] = None) -> jobmon.client.swarm.state.StateUpdate :async: Convenience method to only fetch task status updates. :param full_sync: If True, fetch all statuses. :param last_sync: Timestamp for incremental sync. :returns: StateUpdate with task status changes. .. py:method:: get_concurrency_limits_only() -> jobmon.client.swarm.state.StateUpdate :async: Convenience method to only fetch concurrency limits. :returns: StateUpdate with workflow and array concurrency limits.