"""The client for the Multiprocess executor."""
from typing import Dict, List, Tuple, Union
from jobmon.core.cluster_protocol import ClusterQueue
[docs]
class MultiprocessQueue(ClusterQueue):
"""Implementation of the multiprocess executor queue, derived from ClusterQueue."""
def __init__(self, queue_id: int, queue_name: str, parameters: Dict) -> None:
"""Intialization of the multiprocess queue.
Get the limits from the database in the client.
"""
[docs]
self._queue_id = queue_id
[docs]
self._queue_name = queue_name
[docs]
self._parameters = parameters
[docs]
def validate_resources(
self, strict: bool = False, **kwargs: Union[str, int, float]
) -> Tuple[bool, str]:
"""Ensure cores requested isn't more than available on that node."""
is_valid = True
msg = ""
cores = kwargs.get("cores")
core_parameters = self.parameters.get("cores")
if core_parameters:
min_cores, max_cores = core_parameters
else:
raise ValueError("min_cores and max_cores parameters not set on queue.")
if cores:
if cores > max_cores:
msg += (
f"ResourceError: provided cores {cores} exceeds "
f"queue limit of {max_cores} "
f"for queue {self.queue_name}"
)
cores = max_cores
if strict:
is_valid = False
elif cores < min_cores:
msg += (
f"ResourceError: provided cores {cores} is below "
f"queue minimum of {min_cores} "
f"for queue {self.queue_name}"
)
cores = min_cores
if strict:
is_valid = False
else:
# Set cores to the queue minimum
msg += f"Cores not provided, setting to {self.queue_name} minimum of {min_cores}"
cores = min_cores
if strict:
is_valid = False
return is_valid, msg
[docs]
def coerce_resources(self, **kwargs: Union[str, int, float]) -> Dict:
return kwargs
@property
[docs]
def queue_id(self) -> int:
"""Return the ID of the queue."""
return self._queue_id
@property
[docs]
def queue_name(self) -> str:
"""Return the name of the queue."""
return self._queue_name
@property
[docs]
def parameters(self) -> Dict:
"""Return the dictionary of parameters."""
return self._parameters
@property
[docs]
def required_resources(self) -> List:
"""No required resources specified for dummy executor, return empty list."""
return []