multiprocess.multiproc_distributor
==================================

.. py:module:: multiprocess.multiproc_distributor

.. autoapi-nested-parse::

   Multiprocess executes tasks in parallel if multiple threads are available.


Attributes
----------

.. autoapisummary::

   multiprocess.multiproc_distributor.logger


Classes
-------

.. autoapisummary::

   multiprocess.multiproc_distributor.PickableTask
   multiprocess.multiproc_distributor.Consumer
   multiprocess.multiproc_distributor.MultiprocessDistributor
   multiprocess.multiproc_distributor.MultiprocessWorkerNode


Module Contents
---------------

.. py:data:: logger


.. py:class:: PickableTask(distributor_id: str, command: str, task_type: str = 'array')

   Object passed between processes.


   .. py:attribute:: distributor_id


   .. py:attribute:: command


   .. py:attribute:: task_type
      :value: 'array'



.. py:class:: Consumer(task_queue: multiprocessing.JoinableQueue, response_queue: multiprocessing.Queue)

   Bases: :py:obj:`multiprocessing.Process`

   Consumes the tasks to be run.


   .. py:attribute:: task_queue
      :type: multiprocessing.JoinableQueue[Optional[PickableTask]]


   .. py:attribute:: response_queue
      :type: multiprocessing.Queue[Tuple[str, Optional[int]]]


   .. py:method:: run() -> None

      Wait for work, then execute it.



.. py:class:: MultiprocessDistributor(cluster_name: str, parallelism: int = 3, *args: tuple, **kwargs: dict)

   Bases: :py:obj:`jobmon.core.cluster_protocol.ClusterDistributor`

   Executes tasks locally in parallel.

   It uses the multiprocessing Python library and queues to parallelize the execution of
   tasks.

   The subprocessing pattern looks like this::

       LocalExec
       --> consumer1 ----> subconsumer1
       --> consumer2 ----> subconsumer2
       ...
       --> consumerN ----> subconsumerN


   .. py:attribute:: temp_dir
      :type: Optional[str]
      :value: None


   .. py:attribute:: started
      :value: False


   .. py:attribute:: _cluster_name


   .. py:attribute:: _worker_node_entry_point


   .. py:attribute:: _parallelism
      :value: 3


   .. py:attribute:: _next_job_id
      :value: 1


   .. py:attribute:: _running_or_submitted
      :type: Dict[str, Optional[int]]


   .. py:attribute:: task_queue
      :type: multiprocessing.JoinableQueue[Optional[PickableTask]]


   .. py:attribute:: response_queue
      :type: multiprocessing.Queue[Tuple[str, Optional[int]]]


   .. py:attribute:: consumers
      :type: List[Consumer]
      :value: []


   .. py:property:: worker_node_entry_point
      :type: str

      Path to jobmon worker_node_entry_point.


   .. py:property:: cluster_name
      :type: str

      Return the name of the cluster type.


   .. py:method:: _get_subtask_id(distributor_id: int, array_step_id: int) -> str

      Get the subtask_id based on distributor_id and array_step_id.


   .. py:method:: start() -> None

      Start N task-consuming processes using multiprocessing.

      The number of consumers is controlled by ``parallelism``.


   .. py:method:: stop() -> None

      Terminate consumers and call sync one final time.


   .. py:method:: _update_internal_states() -> None


   .. py:method:: terminate_task_instances(distributor_ids: List[str]) -> None

      Terminate task instances.

      Only task instances that are running are terminated; jobs that are still in a
      waiting or transitioning state are not killed.

      :param distributor_ids: A list of distributor IDs.


   .. py:method:: get_submitted_or_running(distributor_ids: Optional[List[str]] = None) -> Set[str]

      Get tasks that are active.


   .. py:method:: submit_to_batch_distributor(command: str, name: str, requested_resources: Dict[str, Any]) -> str

      Submit the command on the cluster technology and return a distributor_id.

      The distributor_id can be used to identify the associated TaskInstance, terminate it,
      monitor for missingness, or collect usage statistics. If an exception is raised by
      this method, the task instance will move to "W" state and the exception will be
      logged in the database under the task_instance_error_log table.

      :param command: command to be run
      :param name: name of task
      :param requested_resources: resource requests sent to distributor API
      :returns: The distributor ID assigned to the submitted task.


   .. py:method:: submit_array_to_batch_distributor(command: str, name: str, requested_resources: Dict[str, Any], array_length: int) -> Dict[int, str]

      Submit an array task to the multiprocess cluster.

      :returns: A mapping of array_step_id to distributor_id.


   .. py:method:: get_queueing_errors(distributor_ids: List[str]) -> Dict[str, str]

      Get the task instances that have errored out.


   .. py:method:: get_remote_exit_info(distributor_id: str) -> Tuple[str, str]

      Get the exit info about the task instance once it is done running.



.. py:class:: MultiprocessWorkerNode

   Bases: :py:obj:`jobmon.core.cluster_protocol.ClusterWorkerNode`

   Task instance info for an instance run with the Multiprocessing distributor.


   .. py:attribute:: _distributor_id
      :type: Optional[str]
      :value: None


   .. py:attribute:: _array_step_id
      :type: Optional[int]
      :value: None


   .. py:attribute:: _subtask_id
      :type: Optional[str]
      :value: None


   .. py:attribute:: _logfile_template


   .. py:property:: distributor_id
      :type: Optional[str]

      The ID assigned by the distributor.


   .. py:method:: get_exit_info(exit_code: int, error_msg: str) -> Tuple[str, str]

      Exit code and message.


   .. py:method:: get_usage_stats() -> Dict

      Usage information specific to the distributor.


   .. py:method:: initialize_logfile(log_type: str, log_dir: str, name: str) -> str

      Error and exit code info from the executor.


   .. py:property:: array_step_id
      :type: Optional[int]

      Return array_step_id.