distributor.distributor_service
===============================

.. py:module:: distributor.distributor_service


Attributes
----------

.. autoapisummary::

   distributor.distributor_service.logger


Classes
-------

.. autoapisummary::

   distributor.distributor_service.DistributorService


Module Contents
---------------

.. py:data:: logger

.. py:class:: DistributorService(cluster_interface: jobmon.core.cluster_protocol.ClusterDistributor, requester: Optional[jobmon.core.requester.Requester] = None, workflow_run_heartbeat_interval: Optional[int] = None, task_instance_heartbeat_interval: Optional[int] = None, heartbeat_report_by_buffer: Optional[float] = None, distributor_poll_interval: Optional[int] = None, raise_on_error: bool = False)

   .. py:attribute:: raise_on_error
      :value: False


   .. py:attribute:: _task_instances
      :type: Dict[int, jobmon.distributor.distributor_task_instance.DistributorTaskInstance]


   .. py:attribute:: _task_instance_batches
      :type: Dict[Tuple[int, int], jobmon.distributor.task_instance_batch.TaskInstanceBatch]


   .. py:attribute:: _distributor_commands
      :type: Iterator[jobmon.distributor.distributor_command.DistributorCommand]


   .. py:attribute:: _task_instance_status_map
      :type: Dict[str, Set[jobmon.distributor.distributor_task_instance.DistributorTaskInstance]]


   .. py:attribute:: _command_generator_map


   .. py:attribute:: _last_heartbeat_time


   .. py:attribute:: cluster_interface


   .. py:property:: _next_report_increment
      :type: float


   .. py:method:: set_workflow_run(workflow_run_id: int) -> None

      Set the workflow run for this distributor service.


   .. py:method:: run() -> None

      Main distributor run loop.


   .. py:method:: process_status(status: str, timeout: Union[int, float] = -1) -> None

      Process commands until all work is done or the timeout is reached.

      :param status: the status to process work for.
      :param timeout: time until processing stops. -1 means process until there is no more work.



   .. py:method:: instantiate_task_instances(task_instances: List[jobmon.distributor.distributor_task_instance.DistributorTaskInstance]) -> None


   .. py:method:: launch_task_instance_batch(task_instance_batch: jobmon.distributor.task_instance_batch.TaskInstanceBatch) -> None


   .. py:method:: launch_task_instance(task_instance: jobmon.distributor.distributor_task_instance.DistributorTaskInstance) -> None

      Submits a task instance on a given distributor.

      Adds the new task instance to self.submitted_or_running_task_instances.


   .. py:method:: triage_error(task_instance: jobmon.distributor.distributor_task_instance.DistributorTaskInstance) -> None

      Triage a running task instance that has missed a heartbeat.

      Allowed transitions are (R, U, Z, F).


   .. py:method:: kill_self_batch(task_instance_batch: jobmon.distributor.task_instance_batch.TaskInstanceBatch) -> None

      Terminate all TIs in this batch.

      :param task_instance_batch: The batch of task instances to terminate.



   .. py:method:: no_heartbeat_error(task_instance: jobmon.distributor.distributor_task_instance.DistributorTaskInstance) -> None

      Move a task instance in NO_HEARTBEAT state to a recoverable error state.

      This signal is sent from the swarm when a task instance in LAUNCHED state
      fails to log a heartbeat, either because the distributor failed to log a
      heartbeat batch or because the worker node failed to start up properly.

      ERROR state allows for a retry, so that a new task instance can attempt to run.


   .. py:method:: log_task_instance_report_by_date() -> None

      Log the heartbeat to show that the task instance is still alive.


   .. py:method:: _log_heartbeats(task_instance_batches: List[List[int]]) -> None
      :async:


      Create a task for each batch of task instances to send a heartbeat.


   .. py:method:: _log_heartbeat_by_batch(session: aiohttp.ClientSession, task_instance_ids_to_heartbeat: List[int]) -> None
      :async:


      Send a heartbeat for a batch of task instances using sophisticated retry logic.


   .. py:method:: _initialize_signal_handlers() -> None


   .. py:method:: refresh_status_from_db(status: str) -> None

      Go to the DB to check the status of the listed TIs.


   .. py:method:: _check_queued_for_work() -> Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]


   .. py:method:: _check_instantiated_for_work() -> Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]


   .. py:method:: _check_triaging_for_work() -> Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]

      Handle TIs in TRIAGING state.

      For TaskInstances with TRIAGING status, check the nature of the missing
      heartbeat and change the statuses accordingly.


   .. py:method:: _check_kill_self_for_work() -> Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]

      Handle TIs in KILL_SELF state, grouped by their TaskInstanceBatch.


   .. py:method:: _check_no_heartbeat_for_work() -> Generator[jobmon.distributor.distributor_command.DistributorCommand, None, None]

      Handle TIs in NO_HEARTBEAT state.

      For TaskInstances with NO_HEARTBEAT status, move to a recoverable error state.
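
   The signatures above suggest a pattern in which each ``_check_*_for_work``
   generator yields commands and ``process_status`` consumes them until the
   work runs out or ``timeout`` elapses. The following is a minimal sketch of
   that pattern only, not Jobmon's implementation: ``Command``,
   ``process_commands`` and ``check_queued_for_work`` are hypothetical names
   introduced for illustration, and the real ``DistributorCommand`` interface
   may differ.

   .. code-block:: python

      import time
      from typing import Callable, Iterator, List, Union


      class Command:
          """Hypothetical stand-in for a distributor command: a deferred call."""

          def __init__(self, func: Callable[..., None], *args: object) -> None:
              self._func = func
              self._args = args

          def __call__(self) -> None:
              self._func(*self._args)


      def process_commands(
          commands: Iterator[Command], timeout: Union[int, float] = -1
      ) -> int:
          """Run commands until none remain or the timeout elapses.

          A timeout of -1 means keep going until the iterator is exhausted.
          Returns the number of commands that were executed.
          """
          start = time.monotonic()
          executed = 0
          # The timeout is checked before each command, so timeout=0 runs nothing.
          while timeout == -1 or time.monotonic() - start < timeout:
              try:
                  command = next(commands)
              except StopIteration:
                  break
              command()
              executed += 1
          return executed


      def check_queued_for_work(
          queued: List[int], launched: List[int]
      ) -> Iterator[Command]:
          """Hypothetical generator: yield one launch command per queued id."""
          for task_instance_id in queued:
              yield Command(launched.append, task_instance_id)


      launched: List[int] = []
      n_all = process_commands(check_queued_for_work([1, 2, 3], launched))
      n_none = process_commands(check_queued_for_work([4], launched), timeout=0)

   Because the generator is lazy, work that is not reached before the timeout
   is simply never started, and a later call can pick it up again after the
   status is refreshed.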