distributor.distributor_service

Attributes

logger

Classes

DistributorService

Module Contents

distributor.distributor_service.logger
class distributor.distributor_service.DistributorService(cluster_interface: jobmon.core.cluster_protocol.ClusterDistributor, requester: jobmon.core.requester.Requester | None = None, workflow_run_heartbeat_interval: int | None = None, task_instance_heartbeat_interval: int | None = None, heartbeat_report_by_buffer: float | None = None, distributor_poll_interval: int | None = None, raise_on_error: bool = False)
raise_on_error = False
_task_instances: Dict[int, jobmon.distributor.distributor_task_instance.DistributorTaskInstance]
_task_instance_batches: Dict[Tuple[int, int], jobmon.distributor.task_instance_batch.TaskInstanceBatch]
_distributor_commands: Iterator[jobmon.distributor.distributor_command.DistributorCommand]
_task_instance_status_map: Dict[str, Set[jobmon.distributor.distributor_task_instance.DistributorTaskInstance]]
_command_generator_map
_last_heartbeat_time
cluster_interface
property _next_report_increment: float
set_workflow_run(workflow_run_id: int) → None

Set the workflow run for this distributor service.

run() → None

Main distributor run loop.

process_status(status: str, timeout: int | float = -1) → None

Processes commands until all work is done or timeout is reached.

Parameters:
  • status – which status to process work for.

  • timeout – time to keep processing before stopping. -1 means process until no work remains.
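
The timeout semantics described above can be illustrated with a minimal sketch. It is not the jobmon implementation: `process_until_done` is a hypothetical helper, commands are modelled as plain callables, and the timeout is treated as seconds, which the source does not state.

```python
import time
from typing import Callable, Iterator


def process_until_done(
    commands: Iterator[Callable[[], None]], timeout: float = -1
) -> int:
    """Run commands until the iterator is exhausted or the timeout elapses.

    A timeout of -1 disables the time limit (assumed to mirror the
    documented behaviour). Returns the number of commands that were run.
    """
    start = time.monotonic()
    processed = 0
    # Check the clock before pulling each command so that no command is
    # fetched from the iterator and then dropped.
    while timeout == -1 or time.monotonic() - start < timeout:
        try:
            command = next(commands)
        except StopIteration:
            break  # no more work
        command()
        processed += 1
    return processed
```

Checking the clock before calling `next` means a command is only consumed if it will actually be executed, so leftover work stays in the iterator for a later call.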

instantiate_task_instances(task_instances: List[jobmon.distributor.distributor_task_instance.DistributorTaskInstance]) → None
launch_task_instance_batch(task_instance_batch: jobmon.distributor.task_instance_batch.TaskInstanceBatch) → None
launch_task_instance(task_instance: jobmon.distributor.distributor_task_instance.DistributorTaskInstance) → None

Submits a task instance on a given distributor.

Adds the new task instance to self.submitted_or_running_task_instances.

triage_error(task_instance: jobmon.distributor.distributor_task_instance.DistributorTaskInstance) → None

Triage a running task instance that has missed a heartbeat.

Allowed transitions are (R, U, Z, F)

kill_self_batch(task_instance_batch: jobmon.distributor.task_instance_batch.TaskInstanceBatch) → None

Terminate all TIs in this batch.

Parameters:

task_instance_batch – The batch of task instances to terminate.

no_heartbeat_error(task_instance: jobmon.distributor.distributor_task_instance.DistributorTaskInstance) → None

Move a task instance in NO_HEARTBEAT state to a recoverable error state.

This signal is sent from the swarm in the event a task instance in LAUNCHED state fails to log a heartbeat, either due to the distributor failing to log a heartbeat batch or due to the worker node failing to start up properly.

ERROR state allows for a retry, so that a new task instance can attempt to run.

log_task_instance_report_by_date() → None

Log the heartbeat to show that the task instance is still alive.

async _log_heartbeats(task_instance_batches: List[List[int]]) → None

Create a task for each batch of task instances to send a heartbeat.

async _log_heartbeat_by_batch(session: aiohttp.ClientSession, task_instance_ids_to_heartbeat: List[int]) → None

Send a heartbeat for a batch of task instances, retrying on failure.
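
The one-task-per-batch pattern with retries can be sketched as follows. This is an illustration under assumptions, not the jobmon code: `send_with_retry`, `log_heartbeats`, and the `send` callable are hypothetical names, the real method takes an `aiohttp.ClientSession`, and the exponential backoff shown here is only one plausible retry policy.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical transport: any coroutine that sends one batch of IDs.
SendFn = Callable[[List[int]], Awaitable[None]]


async def send_with_retry(
    send: SendFn, batch: List[int], attempts: int = 3, base_delay: float = 0.01
) -> bool:
    """Try to send one batch, backing off exponentially between attempts."""
    for attempt in range(attempts):
        try:
            await send(batch)
            return True
        except ConnectionError:
            # Sleep only if another attempt remains.
            if attempt < attempts - 1:
                await asyncio.sleep(base_delay * 2**attempt)
    return False


async def log_heartbeats(send: SendFn, batches: List[List[int]]) -> List[bool]:
    """Create one task per batch and wait for all of them to finish."""
    tasks = [asyncio.create_task(send_with_retry(send, b)) for b in batches]
    return await asyncio.gather(*tasks)
```

Because each batch runs in its own task, a slow or failing batch does not delay the heartbeats of the others.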

_initialize_signal_handlers() → None
refresh_status_from_db(status: str) → None

Go to the DB to check the status of the listed TIs.

_check_queued_for_work() → Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]
_check_instantiated_for_work() → Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]
_check_triaging_for_work() → Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]

Handle TIs in TRIAGING state.

For TaskInstances with TRIAGING status, determine why the heartbeat was missed and change their statuses accordingly.

_check_kill_self_for_work() → Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]

Handle TIs in KILL_SELF state, grouped by their TaskInstanceBatch.
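
Grouping task instances by batch, so that one terminate command can be issued per batch, might look like the sketch below. `FakeTaskInstance`, `batch_key`, and `group_by_batch` are hypothetical names used for illustration; the two-integer key is only an assumption based on the `Tuple[int, int]` keys of `_task_instance_batches`.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, Iterable, List, Tuple


@dataclass(frozen=True)
class FakeTaskInstance:
    """Hypothetical stand-in for a task instance; not the jobmon class."""

    task_instance_id: int
    batch_key: Tuple[int, int]  # assumed two-integer batch identifier


def group_by_batch(
    task_instances: Iterable[FakeTaskInstance],
) -> Dict[Tuple[int, int], List[int]]:
    """Collect task instance IDs under the batch they belong to."""
    groups: Dict[Tuple[int, int], List[int]] = defaultdict(list)
    for ti in task_instances:
        groups[ti.batch_key].append(ti.task_instance_id)
    return dict(groups)
```

A generator could then yield one command per key in the returned dictionary rather than one per task instance.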

_check_no_heartbeat_for_work() → Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]

Handle TIs in NO_HEARTBEAT state.

For TaskInstances with NO_HEARTBEAT status, move them to a recoverable error state.
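
The naming of `_command_generator_map` and the `_check_*_for_work` generators suggests a status-to-generator dispatch. The following is a minimal sketch of that pattern, inferred from the names alone; `MiniDistributor`, its status keys, and its log messages are hypothetical and do not reflect the real class.

```python
from typing import Callable, Dict, Iterator, List

Command = Callable[[], None]  # hypothetical stand-in for DistributorCommand


class MiniDistributor:
    """Toy model: each status maps to a generator that yields commands."""

    def __init__(self) -> None:
        self.log: List[str] = []
        # Hypothetical pending work, keyed by status name.
        self.pending: Dict[str, List[int]] = {
            "QUEUED": [1, 2],
            "NO_HEARTBEAT": [3],
        }
        self._command_generator_map: Dict[str, Callable[[], Iterator[Command]]] = {
            "QUEUED": self._check_queued_for_work,
            "NO_HEARTBEAT": self._check_no_heartbeat_for_work,
        }

    def _check_queued_for_work(self) -> Iterator[Command]:
        for ti_id in self.pending["QUEUED"]:
            # Bind ti_id as a default so each command keeps its own value.
            yield lambda ti_id=ti_id: self.log.append(f"instantiate {ti_id}")

    def _check_no_heartbeat_for_work(self) -> Iterator[Command]:
        for ti_id in self.pending["NO_HEARTBEAT"]:
            yield lambda ti_id=ti_id: self.log.append(f"error {ti_id}")

    def process_status(self, status: str) -> None:
        # Look up the generator for this status and run each command lazily.
        for command in self._command_generator_map[status]():
            command()
```

Generators keep command creation lazy: work for a status is only materialized as `process_status` consumes it.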