multiprocess.multiproc_distributor

The multiprocess distributor executes tasks in parallel on the local machine when multiple CPU cores are available.

Attributes

logger

Classes

PickableTask

Object passed between processes.

Consumer

Consumes the tasks to be run.

MultiprocessDistributor

Executes tasks locally in parallel.

MultiprocessWorkerNode

Task instance info for an instance run with the Multiprocessing distributor.

Module Contents

multiprocess.multiproc_distributor.logger
class multiprocess.multiproc_distributor.PickableTask(distributor_id: str, command: str, task_type: str = 'array')

Object passed between processes.

Initialization of PickableTask.

array_step_id: only meaningful (and an int) when the task is part of an array; otherwise None.

distributor_id
command
task_type = 'array'
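Multiprocessing queues pickle every object they carry, so a task object only needs plain, picklable attributes. The sketch below uses a hypothetical PickableTaskSketch class that mirrors the documented attributes. It assumes array_step_id is assigned after construction, since it is not in the constructor signature. The sketch round-trips the object through pickle, the same thing a queue does on put() and get().

```python
import pickle
from typing import Optional


class PickableTaskSketch:
    """Hypothetical stand-in mirroring the documented PickableTask attributes."""

    def __init__(self, distributor_id: str, command: str, task_type: str = "array") -> None:
        self.distributor_id = distributor_id
        self.command = command
        self.task_type = task_type
        # Only meaningful (an int) for array tasks; stays None otherwise.
        self.array_step_id: Optional[int] = None


task = PickableTaskSketch("42", "echo hello")
task.array_step_id = 3

# A multiprocessing queue performs this serialization implicitly.
clone = pickle.loads(pickle.dumps(task))
print(clone.distributor_id, clone.command, clone.array_step_id)  # 42 echo hello 3
```

Attributes such as open file handles or lambdas would make the object unpicklable. That is why a task like this carries only strings and ints.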
class multiprocess.multiproc_distributor.Consumer(task_queue: multiprocessing.JoinableQueue, response_queue: multiprocessing.Queue)

Bases: multiprocessing.Process

Consumes the tasks to be run.

Consumes work sent from the LocalExecutor through a multiprocessing queue.

This class is structured based on https://pymotw.com/2/multiprocessing/communication.html

task_queue:

A (multiprocessing.JoinableQueue[Optional[PickableTask]]) object created by the LocalExecutor and used to retrieve work from the distributor.

response_queue:

A (Queue[Tuple[int, Optional[int], Optional[int]]]) object that holds tuples of the form (distributor_id, array_step_id if applicable, pid).

task_queue: multiprocessing.JoinableQueue[PickableTask | None]
response_queue: multiprocessing.Queue[Tuple[str, int | None]]
run() -> None

Wait for work, then execute it.
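The PyMOTW pattern this class follows is a loop that blocks on the task queue and exits when it receives a None "poison pill". The sketch below is a minimal version of that loop, not jobmon's implementation. ConsumerSketch is hypothetical, and plain (distributor_id, command, array_step_id) tuples stand in for PickableTask to keep it self-contained. Running each command with subprocess.Popen is an assumption that corresponds to the "subconsumer" layer in the distributor's process diagram.

```python
import multiprocessing
import subprocess


class ConsumerSketch(multiprocessing.Process):
    """Hypothetical consumer: runs shell commands until it receives a None sentinel."""

    def __init__(self, task_queue, response_queue) -> None:
        super().__init__()
        self.task_queue = task_queue
        self.response_queue = response_queue

    def run(self) -> None:
        while True:
            task = self.task_queue.get()
            if task is None:
                # Poison pill: mark it done so task_queue.join() can return, then exit.
                self.task_queue.task_done()
                break
            distributor_id, command, array_step_id = task
            # The command runs in its own child process (the "subconsumer").
            proc = subprocess.Popen(command, shell=True)
            # Report (distributor_id, array_step_id, pid), as the docstring describes.
            self.response_queue.put((distributor_id, array_step_id, proc.pid))
            proc.wait()
            self.task_queue.task_done()
```

In normal use the consumer is launched with start(). Calling run() directly executes the same loop in the current process, which is convenient for a quick check.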

class multiprocess.multiproc_distributor.MultiprocessDistributor(cluster_name: str, parallelism: int = 3, *args: tuple, **kwargs: dict)

Bases: jobmon.core.cluster_protocol.ClusterDistributor

Executes tasks locally in parallel.

It uses the multiprocessing Python library and queues to parallelize the execution of tasks. The subprocessing pattern looks like this:

LocalExec
--> consumer1
----> subconsumer1
--> consumer2
----> subconsumer2
...
--> consumerN
----> subconsumerN

Initialization of the multiprocess distributor.

Parameters:
  • cluster_name – the name of the cluster.

  • parallelism (int, optional) – how many parallel jobs to distribute at a time

temp_dir: str | None = None
started = False
task_queue: multiprocessing.JoinableQueue[PickableTask | None]
response_queue: multiprocessing.Queue[Tuple[str, int | None]]
consumers: List[Consumer] = []
property worker_node_entry_point: str

Path to jobmon worker_node_entry_point.

property cluster_name: str

Return the name of the cluster type.

start() -> None

Start N task-consuming processes using multiprocessing.

The number of consumers is controlled by parallelism.

stop() -> None

Terminate the consumers and call sync one final time.
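The start()/stop() lifecycle can be sketched as follows. The sketch assumes consumers exit on a None sentinel, as in the PyMOTW pattern. MiniDistributor and consume are hypothetical names, and tasks are plain (task_id, command) tuples. The final sync call that stop() makes is not modeled.

```python
import multiprocessing
import subprocess
from typing import List, Optional, Tuple


def consume(task_queue, response_queue) -> None:
    """Hypothetical worker loop: run (task_id, command) items until a None arrives."""
    while True:
        item: Optional[Tuple[str, str]] = task_queue.get()
        if item is None:
            task_queue.task_done()
            return
        task_id, command = item
        result = subprocess.run(command, shell=True)
        response_queue.put((task_id, result.returncode))
        task_queue.task_done()


class MiniDistributor:
    """Sketch of start()/stop() for N consumer processes."""

    def __init__(self, parallelism: int = 3, ctx=None) -> None:
        self.parallelism = parallelism
        self.ctx = ctx or multiprocessing.get_context()
        self.task_queue = self.ctx.JoinableQueue()
        self.response_queue = self.ctx.Queue()
        self.consumers: List[multiprocessing.process.BaseProcess] = []
        self.started = False

    def start(self) -> None:
        # One consumer process per unit of parallelism.
        for _ in range(self.parallelism):
            proc = self.ctx.Process(
                target=consume, args=(self.task_queue, self.response_queue)
            )
            proc.start()
            self.consumers.append(proc)
        self.started = True

    def stop(self) -> None:
        # One sentinel per consumer, so every process leaves its loop.
        for _ in self.consumers:
            self.task_queue.put(None)
        self.task_queue.join()  # wait until every item, sentinels included, is done
        for proc in self.consumers:
            proc.join()
        self.consumers.clear()
        self.started = False
```

The sketch keeps consumers as an instance attribute so two distributors never share one list. It also accepts an optional multiprocessing context so the caller can choose a start method explicitly. With the spawn or forkserver start methods, the code that creates the distributor belongs under an if __name__ == "__main__": guard.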

terminate_task_instances(distributor_ids: List[str]) -> None

Terminate task instances.

Only running task instances are terminated; jobs that are still in a waiting or transitioning state are not killed.

Parameters:

distributor_ids – A list of distributor IDs.
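The "terminate only what is running" rule can be sketched as follows. The sketch assumes the distributor keeps a hypothetical running_pids mapping from distributor_id to the pid reported on the response queue. An ID with no pid is treated as still waiting or transitioning and is skipped. Sending SIGTERM is also an assumption; the real method may use a different signal.

```python
import os
import signal
from typing import Dict, List


def terminate_running(distributor_ids: List[str], running_pids: Dict[str, int]) -> List[str]:
    """Send SIGTERM to tasks with a known pid; skip tasks that are not running yet."""
    terminated: List[str] = []
    for distributor_id in distributor_ids:
        pid = running_pids.get(distributor_id)
        if pid is None:
            # No pid yet: the task is still waiting or transitioning, so leave it alone.
            continue
        try:
            os.kill(pid, signal.SIGTERM)
        except ProcessLookupError:
            # The process exited between the lookup and the kill.
            continue
        terminated.append(distributor_id)
    return terminated
```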

get_submitted_or_running(distributor_ids: List[str] | None = None) -> Set[str]

Get tasks that are active.
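One way to answer this query is sketched below under two assumptions. First, queued IDs are tracked in a set until a consumer picks them up. Second, running tasks are tracked as subprocess.Popen objects, whose poll() returns None while the child is alive. Treating distributor_ids=None as "report every active ID" is also an assumption about the default. The function name is hypothetical.

```python
import subprocess
from typing import Dict, Iterable, Optional, Set


def submitted_or_running(
    queued: Set[str],
    procs: Dict[str, subprocess.Popen],
    distributor_ids: Optional[Iterable[str]] = None,
) -> Set[str]:
    """Return IDs that are still queued or whose process has not exited."""
    active = set(queued)
    # poll() returns None while the child is still running.
    active.update(d for d, proc in procs.items() if proc.poll() is None)
    if distributor_ids is None:
        return active
    return active.intersection(distributor_ids)
```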

submit_to_batch_distributor(command: str, name: str, requested_resources: Dict[str, Any]) -> str

Submit the command on the cluster technology and return a distributor_id.

The distributor_id can be used to identify the associated TaskInstance, terminate it, monitor for missingness, or collect usage statistics. If an exception is raised by this method the task instance will move to “W” state and the exception will be logged in the database under the task_instance_error_log table.

Parameters:
  • command – command to be run

  • name – name of task

  • requested_resources – resource requests sent to distributor API

Returns:

The distributor id, as a string.

submit_array_to_batch_distributor(command: str, name: str, requested_resources: Dict[str, Any], array_length: int) -> Dict[int, str]

Submit an array task to the multiprocess cluster.

Returns: a mapping of array_step_id to distributor_id.
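Both submit methods are sketched below. The sketch assumes distributor IDs are minted from a simple counter and that the work item is queued before the ID is returned. SubmitSketch is hypothetical. A thread-safe queue.Queue stands in for the multiprocessing.JoinableQueue. Array steps are numbered from 0 here only for illustration; the real starting index is not documented above.

```python
import itertools
import queue
from typing import Dict, Optional, Tuple

# (distributor_id, command, array_step_id); None is reserved as the stop sentinel.
WorkItem = Optional[Tuple[str, str, Optional[int]]]


class SubmitSketch:
    """Hypothetical distributor front end that mints IDs and enqueues work."""

    def __init__(self) -> None:
        self._ids = itertools.count(1)
        self.task_queue: "queue.Queue[WorkItem]" = queue.Queue()

    def _next_id(self) -> str:
        return str(next(self._ids))

    def submit(self, command: str) -> str:
        distributor_id = self._next_id()
        self.task_queue.put((distributor_id, command, None))
        return distributor_id

    def submit_array(self, command: str, array_length: int) -> Dict[int, str]:
        mapping: Dict[int, str] = {}
        for step in range(array_length):
            distributor_id = self._next_id()
            self.task_queue.put((distributor_id, command, step))
            mapping[step] = distributor_id
        return mapping


s = SubmitSketch()
print(s.submit("echo single"))         # 1
print(s.submit_array("echo step", 3))  # {0: '2', 1: '3', 2: '4'}
```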

get_queueing_errors(distributor_ids: List[str]) -> Dict[str, str]

Get the task instances that have errored out.

get_remote_exit_info(distributor_id: str) -> Tuple[str, str]

Get the exit info about the task instance once it is done running.

class multiprocess.multiproc_distributor.MultiprocessWorkerNode

Bases: jobmon.core.cluster_protocol.ClusterWorkerNode

Task instance info for an instance run with the Multiprocessing distributor.

Initialization of the multiprocess distributor worker node.

property distributor_id: str | None

The id from the distributor.

get_exit_info(exit_code: int, error_msg: str) -> Tuple[str, str]

Exit code and message.

get_usage_stats() -> Dict

Usage information specific to the distributor.
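A local distributor has no scheduler accounting to query, so one plausible source of usage information is the operating system itself. The sketch below reads it with the standard-library resource module, which is Unix-only. It assumes the stats of interest are those of finished child processes: RUSAGE_CHILDREN counts only children that have been waited for. The function name and dictionary keys are hypothetical.

```python
import resource  # Unix-only standard-library module
from typing import Dict


def child_usage_stats() -> Dict[str, float]:
    """Resource usage of all waited-for child processes (hypothetical keys)."""
    usage = resource.getrusage(resource.RUSAGE_CHILDREN)
    return {
        "usr_cpu_s": usage.ru_utime,  # user CPU seconds
        "sys_cpu_s": usage.ru_stime,  # system CPU seconds
        # Peak resident set size: kilobytes on Linux, bytes on macOS.
        "maxrss": float(usage.ru_maxrss),
    }
```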

initialize_logfile(log_type: str, log_dir: str, name: str) -> str

Error and exit code info from the executor.

property array_step_id: int | None

Return the array_step_id.
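One way a worker node could discover its own distributor_id and array_step_id is sketched below. It assumes the parent passes them through environment variables when it launches the command. The class name and the SKETCH_DISTRIBUTOR_ID and SKETCH_ARRAY_STEP_ID variable names are hypothetical; they are not the ones jobmon uses.

```python
import os
from typing import Optional


class EnvWorkerNode:
    """Hypothetical worker node that reads its identity from environment variables."""

    ID_VAR = "SKETCH_DISTRIBUTOR_ID"
    STEP_VAR = "SKETCH_ARRAY_STEP_ID"

    @property
    def distributor_id(self) -> Optional[str]:
        return os.environ.get(self.ID_VAR)

    @property
    def array_step_id(self) -> Optional[int]:
        raw = os.environ.get(self.STEP_VAR)
        # Non-array tasks have no step id.
        return int(raw) if raw is not None else None
```

On the parent side, the matching step would be to launch the command with an env mapping that adds both variables to a copy of os.environ.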