core.cluster_protocol

Classes

ClusterQueue

The protocol class for queues on a cluster.

ClusterDistributor

The protocol class for cluster distributors.

ClusterWorkerNode

Base class defining the interface for gathering executor info in the execution_wrapper.

Module Contents

class core.cluster_protocol.ClusterQueue(queue_id: int, queue_name: str, parameters: Dict)

Bases: Protocol

The protocol class for queues on a cluster.

Initialization of ClusterQueue.

abstract validate_resources(strict: bool = False, **kwargs: str | int | float) Tuple[bool, str]

Ensures that requested resources aren’t greater than what’s available.

abstract coerce_resources(**kwargs: str | int | float) Dict

Adjusts the requested resources so they aren’t greater than what’s available, and returns the coerced resource dictionary.
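The difference between validating and coercing can be sketched with plain functions. This is a minimal illustration, not jobmon's implementation: the MAX_LIMITS table is hypothetical (a real plugin would derive limits from the queue's parameters), values are assumed to be numeric or numeric strings, and the meaning given to strict (reject resource names with no known limit) is an assumption.

```python
from typing import Dict, List, Tuple, Union

Resource = Union[str, int, float]

# Hypothetical per-queue limits; a real queue would read these from its parameters.
MAX_LIMITS: Dict[str, float] = {"cores": 32, "memory": 128.0, "runtime": 86400}


def validate_resources(strict: bool = False, **kwargs: Resource) -> Tuple[bool, str]:
    """Return (ok, message); ok is False if any request exceeds its limit."""
    problems: List[str] = []
    for name, value in kwargs.items():
        limit = MAX_LIMITS.get(name)
        if limit is None:
            # Assumed meaning of strict: unknown resource names are errors.
            if strict:
                problems.append(f"unknown resource {name!r}")
        elif float(value) > limit:
            problems.append(f"{name}={value} exceeds max {limit}")
    return (not problems, "; ".join(problems))


def coerce_resources(**kwargs: Resource) -> Dict:
    """Return a copy of the request with values clamped to the queue limits."""
    coerced: Dict = dict(kwargs)
    for name, limit in MAX_LIMITS.items():
        if name in coerced and float(coerced[name]) > limit:
            coerced[name] = limit
    return coerced
```

Validation reports the problem and leaves the request untouched, while coercion silently fixes it; which one a caller uses decides whether an oversized request fails or runs with reduced resources.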

abstract property parameters: Dict

Returns the dictionary of parameters.

abstract property queue_name: str

Returns the name of the queue.

abstract property queue_id: int

Returns the ID of the queue.

abstract property required_resources: List

Returns the list of resources that are required.
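Because ClusterQueue is a typing.Protocol, a plugin class satisfies it structurally by providing the same members; it does not have to inherit from it. The sketch below uses a local QueueLike stand-in (not jobmon's class) and a hypothetical DummyQueue; the list returned by required_resources is an assumption.

```python
from typing import Any, Dict, List, Protocol, runtime_checkable


@runtime_checkable
class QueueLike(Protocol):
    """Local stand-in mirroring the ClusterQueue properties (not jobmon's class)."""

    @property
    def queue_id(self) -> int: ...

    @property
    def queue_name(self) -> str: ...

    @property
    def parameters(self) -> Dict: ...

    @property
    def required_resources(self) -> List: ...


class DummyQueue:
    """Hypothetical queue that stores its constructor arguments."""

    def __init__(self, queue_id: int, queue_name: str, parameters: Dict[str, Any]) -> None:
        self._queue_id = queue_id
        self._queue_name = queue_name
        self._parameters = parameters

    @property
    def queue_id(self) -> int:
        return self._queue_id

    @property
    def queue_name(self) -> str:
        return self._queue_name

    @property
    def parameters(self) -> Dict[str, Any]:
        return self._parameters

    @property
    def required_resources(self) -> List[str]:
        # Assumption: this queue requires these keys in every request.
        return ["cores", "memory", "runtime"]
```

DummyQueue never mentions QueueLike, yet isinstance(DummyQueue(1, "all.q", {}), QueueLike) is True, because runtime_checkable protocols only check that the members exist.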

class core.cluster_protocol.ClusterDistributor(cluster_name: str, *args: str, **kwargs: str)

Bases: Protocol

The protocol class for cluster distributors.

Initialization of ClusterDistributor.

abstract property worker_node_entry_point: str

Path to jobmon worker_node_entry_point.

abstract property cluster_name: str

Return the name of the cluster type.

abstract start() None

Start the distributor.

abstract stop() None

Stop the distributor.

abstract get_queueing_errors(distributor_ids: List[str]) Dict[str, str]

Get the task instances that have errored out, as a mapping of distributor_id to error message.

abstract get_submitted_or_running(distributor_ids: List[str] | None = None) Set[str]

Check which task instances are active.

Returns: a set of distributor_id strings
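The two status queries can be illustrated with an in-memory fake that keeps job states in dictionaries instead of asking a scheduler. This is a sketch only: InMemoryDistributor, the "PENDING"/"RUNNING" state names, and treating distributor_ids=None as "every job I know about" are all assumptions.

```python
from typing import Dict, List, Optional, Set


class InMemoryDistributor:
    """Hypothetical distributor that tracks job states in a dict."""

    def __init__(self) -> None:
        # distributor_id -> state; the state names are assumptions for this sketch.
        self.states: Dict[str, str] = {}
        # distributor_id -> queueing error message.
        self.errors: Dict[str, str] = {}

    def get_submitted_or_running(
        self, distributor_ids: Optional[List[str]] = None
    ) -> Set[str]:
        active = {d for d, s in self.states.items() if s in ("PENDING", "RUNNING")}
        if distributor_ids is None:
            # Assumed: no filter means report every active job.
            return active
        return active & set(distributor_ids)

    def get_queueing_errors(self, distributor_ids: List[str]) -> Dict[str, str]:
        # Only report errors for the IDs the caller asked about.
        return {d: self.errors[d] for d in distributor_ids if d in self.errors}
```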

abstract terminate_task_instances(distributor_ids: List[str]) None

Terminate task instances.

If implemented, return a list of (task_instance_id, hostname) tuples for any task_instances that are terminated.

abstract get_remote_exit_info(distributor_id: str) jobmon.core.exit_info.RemoteExitInfo | Tuple[str, str]

Get the exit info about the task instance once it is done running.

Returns:

RemoteExitInfo with error_state, error_message, and finalized flag. Legacy plugins may return an (error_state, error_message) tuple, which is treated as finalized=True.
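The legacy rule can be sketched as a small normalization step that a caller might apply to whatever a plugin returns. The RemoteExitInfo dataclass below is a local stand-in with only the three fields named above, not jobmon.core.exit_info.RemoteExitInfo, and normalize_exit_info is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass
class RemoteExitInfo:
    """Local stand-in with the three documented fields (not jobmon's class)."""

    error_state: str
    error_message: str
    finalized: bool


def normalize_exit_info(result: Union[RemoteExitInfo, Tuple[str, str]]) -> RemoteExitInfo:
    """Wrap a legacy (error_state, error_message) tuple as finalized exit info."""
    if isinstance(result, tuple):
        error_state, error_message = result
        # Legacy tuples carry no finalized flag, so they are treated as final.
        return RemoteExitInfo(error_state, error_message, finalized=True)
    return result
```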

abstract submit_to_batch_distributor(command: str, name: str, requested_resources: Dict[str, Any]) str

Submit the command on the cluster technology and return a distributor_id.

The distributor_id can be used to identify the associated TaskInstance, terminate it, monitor for missingness, or collect usage statistics. If an exception is raised by this method the task instance will move to “W” state and the exception will be logged in the database under the task_instance_error_log table.

Parameters:
  • command – command to be run

  • name – name of task

  • requested_resources – resource requests sent to distributor API

Returns:

The distributor_id (str) assigned to the submitted command.
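A minimal way to satisfy this contract without a real scheduler is to launch the command as a local child process and use its PID as the distributor_id. LocalDistributor is hypothetical; it ignores name and requested_resources, which a real plugin would translate into scheduler options.

```python
import shlex
import subprocess
from typing import Any, Dict


class LocalDistributor:
    """Hypothetical distributor that runs each command as a local child process."""

    def __init__(self) -> None:
        # distributor_id -> process handle, kept for later monitoring or termination.
        self.processes: Dict[str, subprocess.Popen] = {}

    def submit_to_batch_distributor(
        self, command: str, name: str, requested_resources: Dict[str, Any]
    ) -> str:
        # name and requested_resources are ignored locally; a scheduler plugin
        # would turn them into a job name and resource flags.
        proc = subprocess.Popen(shlex.split(command))
        distributor_id = str(proc.pid)
        self.processes[distributor_id] = proc
        return distributor_id
```

If the command cannot be launched, Popen raises (for example FileNotFoundError), which is the situation the docstring describes: the exception propagates and the task instance moves to "W" state.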

abstract submit_array_to_batch_distributor(command: str, name: str, requested_resources: Dict[str, Any], array_length: int) Dict[int, str]

Submit an array task to the underlying distributor and return a distributor_id.

The distributor ID represents the ID of the overall array job; sub-tasks will have their own associated IDs.

Parameters:
  • command – the array worker node command to run

  • name – name of the array

  • requested_resources – resources with which to run the array

  • array_length – the number of tasks in the array

Returns:

a mapping of array_step_id to distributor_id.
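The returned mapping can be illustrated with a fake that invents array job IDs. Everything here is an assumption for illustration: FakeArrayDistributor is hypothetical, steps are numbered 1..array_length, and each sub-task ID follows the "<array job id>_<step>" form that Slurm uses when listing array tasks.

```python
import itertools
from typing import Any, Dict


class FakeArrayDistributor:
    """Hypothetical distributor that fakes array-job IDs in a Slurm-like style."""

    def __init__(self) -> None:
        # Source of fake overall array job IDs.
        self._next_job_id = itertools.count(1000)

    def submit_array_to_batch_distributor(
        self,
        command: str,
        name: str,
        requested_resources: Dict[str, Any],
        array_length: int,
    ) -> Dict[int, str]:
        array_job_id = next(self._next_job_id)
        # Assumption: steps run 1..array_length and each sub-task's ID is
        # "<array job id>_<step>".
        return {step: f"{array_job_id}_{step}" for step in range(1, array_length + 1)}
```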

build_worker_node_command(task_instance_id: int | None = None, array_id: int | None = None, batch_number: int | None = None) str

Build a command that can be executed by the worker_node.

Parameters:
  • task_instance_id – id for the given instance of this task

  • array_id – id for the array if using an array strategy

  • batch_number – if array strategy is used, the submission counter index to use

Returns:

(str) unwrappable command
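The shape of such a command can be sketched as an entry point followed by either a task instance ID or an array ID plus batch number. The flag names (--task_instance_id, --array_id, --batch_number) and the entry point name used below are hypothetical, not jobmon's actual CLI; in a plugin the entry point would come from worker_node_entry_point rather than a parameter.

```python
import shlex
from typing import List, Optional


def build_worker_node_command(
    entry_point: str,
    task_instance_id: Optional[int] = None,
    array_id: Optional[int] = None,
    batch_number: Optional[int] = None,
) -> str:
    """Hypothetical command builder; the flag names are illustrative only."""
    parts: List[str] = [entry_point]
    if task_instance_id is not None:
        # Single task instance.
        parts += ["--task_instance_id", str(task_instance_id)]
    elif array_id is not None and batch_number is not None:
        # Array strategy: the worker resolves its own step at run time.
        parts += ["--array_id", str(array_id), "--batch_number", str(batch_number)]
    else:
        raise ValueError("need task_instance_id, or array_id with batch_number")
    # shlex.join quotes each part so the string is safe to pass to a shell.
    return shlex.join(parts)
```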

class core.cluster_protocol.ClusterWorkerNode

Bases: Protocol

Base class defining the interface for gathering executor info in the execution_wrapper.

While not required, implementing get_usage_stats() will allow collection of CPU/memory utilization stats for each job.

get_exit_info() is used to determine the error type if the task hits a system error.

abstract property distributor_id: str | None

Executor-specific ID assigned to a task instance.
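On the worker node, this ID usually comes from the environment the scheduler sets up for the job. The sketch assumes a Slurm-like scheduler, which exports SLURM_JOB_ID inside jobs; distributor_id_from_env is a hypothetical helper, and other plugins would read a different variable.

```python
import os
from typing import Mapping, Optional


def distributor_id_from_env(env: Mapping[str, str] = os.environ) -> Optional[str]:
    """Return this job's scheduler ID, or None when not running under the scheduler.

    Assumes a Slurm-like scheduler that exports SLURM_JOB_ID.
    """
    return env.get("SLURM_JOB_ID")
```

Passing the environment as a parameter keeps the function easy to test with a plain dict.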

abstract get_usage_stats() Dict

Return resource usage with server-ready keys.

Expected keys (all stringified):

  • maxrss – peak resident set size in bytes

  • cpu – total CPU seconds (user + system)

  • usage_str – JSON blob of the full raw stats
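On POSIX systems these keys can be filled from the standard library's resource.getrusage. This is a sketch, not jobmon's implementation: it reports usage of finished child processes (RUSAGE_CHILDREN), which assumes the task ran as a child of the wrapper, and it only includes three raw fields in usage_str.

```python
import json
import resource
import sys
from typing import Dict


def get_usage_stats() -> Dict[str, str]:
    """Usage of finished child processes, using the documented key names."""
    usage = resource.getrusage(resource.RUSAGE_CHILDREN)
    # ru_maxrss is in kilobytes on Linux but bytes on macOS; this sketch treats
    # every non-macOS platform as kilobytes.
    if sys.platform == "darwin":
        maxrss_bytes = usage.ru_maxrss
    else:
        maxrss_bytes = usage.ru_maxrss * 1024
    raw = {"ru_maxrss": usage.ru_maxrss, "ru_utime": usage.ru_utime, "ru_stime": usage.ru_stime}
    return {
        "maxrss": str(maxrss_bytes),
        "cpu": str(usage.ru_utime + usage.ru_stime),  # user + system seconds
        "usage_str": json.dumps(raw),
    }
```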

abstract get_exit_info(exit_code: int, error_msg: str) Tuple[str, str]

Error and exit code info from the executor.
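A plugin typically uses the exit code to separate resource kills from ordinary failures. In the sketch below the error-state labels RESOURCE_ERROR and UNKNOWN_ERROR are hypothetical placeholders for whatever states jobmon expects, and treating SIGKILL as a resource problem is a heuristic, not a rule.

```python
from typing import Tuple

# Hypothetical error-state labels; a real plugin returns the states jobmon expects.
RESOURCE_ERROR = "RESOURCE_ERROR"
UNKNOWN_ERROR = "UNKNOWN_ERROR"

# A shell reports "killed by signal N" as exit code 128 + N, while Python's
# subprocess reports it as -N. Signal 9 is SIGKILL on POSIX systems.
KILLED_BY_SIGKILL = {128 + 9, -9}


def get_exit_info(exit_code: int, error_msg: str) -> Tuple[str, str]:
    """Classify a failed job's exit code (sketch, not jobmon's logic)."""
    if exit_code in KILLED_BY_SIGKILL:
        # SIGKILL often comes from the scheduler or the kernel OOM killer
        # enforcing a memory or runtime limit.
        return RESOURCE_ERROR, f"killed by SIGKILL (exit code {exit_code}): {error_msg}"
    return UNKNOWN_ERROR, error_msg
```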

abstract initialize_logfile(log_type: str, log_dir: str, name: str) str

Initialize a log file of type log_type in log_dir for the task called name, and return it as a string.
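A minimal version can create the directory and an empty file, then return the file's path. The "<log_dir>/<name>.<log_type>" naming scheme and returning a path are assumptions made for this sketch; a scheduler plugin may instead follow the scheduler's own log naming.

```python
from pathlib import Path


def initialize_logfile(log_type: str, log_dir: str, name: str) -> str:
    """Create <log_dir>/<name>.<log_type> if needed and return its path.

    The naming scheme is an assumption made for this sketch.
    """
    directory = Path(log_dir)
    directory.mkdir(parents=True, exist_ok=True)
    path = directory / f"{name}.{log_type}"
    # touch() creates the file without truncating an existing one.
    path.touch(exist_ok=True)
    return str(path)
```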

abstract property array_step_id: int | None

The step id in each batch.

For each array task instance, array_id, array_batch_num, and array_step_id should uniquely identify a subtask_id.

Whether the subtask_id can be generated from array_step_id depends on the plugin.
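For a Slurm-like plugin, the step index is exported to each array sub-task as SLURM_ARRAY_TASK_ID, and the (array_id, array_batch_num, array_step_id) triple can then key a lookup. The SUBTASKS table and both helpers below are hypothetical; they only illustrate the uniqueness requirement stated above.

```python
import os
from typing import Dict, Mapping, Optional, Tuple

# Hypothetical lookup from (array_id, array_batch_num, array_step_id) to subtask_id.
SUBTASKS: Dict[Tuple[int, int, int], int] = {(5, 1, 0): 101, (5, 1, 1): 102}


def array_step_id_from_env(env: Mapping[str, str] = os.environ) -> Optional[int]:
    # Assumption: a Slurm-like scheduler exports the step as SLURM_ARRAY_TASK_ID.
    value = env.get("SLURM_ARRAY_TASK_ID")
    return int(value) if value is not None else None


def resolve_subtask_id(
    array_id: int, array_batch_num: int, env: Mapping[str, str] = os.environ
) -> Optional[int]:
    step = array_step_id_from_env(env)
    if step is None:
        # Not running as an array sub-task.
        return None
    return SUBTASKS.get((array_id, array_batch_num, step))
```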