# Source code for sequential.seq_distributor

"""Sequential distributor that runs one task at a time."""

import logging
import os
import shutil
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Set, Tuple, Union

from jobmon.core.cluster_protocol import ClusterDistributor, ClusterWorkerNode
from jobmon.core.constants import TaskInstanceStatus
from jobmon.core.exceptions import RemoteExitInfoNotAvailable, ReturnCodes
from jobmon.worker_node.cli import WorkerNodeCLI

# Module-level logger named after this module; used by the distributor classes
# below to emit warnings.
logger = logging.getLogger(__name__)
class LimitedSizeDict(OrderedDict):
    """Ordered dictionary that keeps at most ``size_limit`` entries.

    Whenever the number of entries exceeds ``size_limit``, entries are removed
    from the front of the ordering (oldest insertion first) until the limit is
    met again. A ``size_limit`` of ``None`` disables eviction.
    """

    def __init__(self, *args: Any, **kwds: Any) -> None:
        """Initialization of LimitedSizeDict.

        Args:
            *args: positional arguments forwarded to ``OrderedDict``.
            **kwds: keyword arguments forwarded to ``OrderedDict``. The
                optional ``size_limit`` keyword is consumed here and is not
                stored as an item.

        Raises:
            ValueError: if ``size_limit`` is negative.
        """
        size_limit = kwds.pop("size_limit", None)
        # A negative limit can never be satisfied: the eviction loop would pop
        # from an empty dict and fail with an unrelated-looking KeyError.
        if size_limit is not None and size_limit < 0:
            raise ValueError(f"size_limit must be non-negative, got {size_limit}")
        # Must be assigned before OrderedDict.__init__, because populating the
        # dict goes through __setitem__, which reads self.size_limit.
        self.size_limit = size_limit
        OrderedDict.__init__(self, *args, **kwds)
        self._check_size_limit()

    def __setitem__(self, key: Any, value: Any) -> None:
        """Set item in dict, then evict the oldest entries if over the limit."""
        OrderedDict.__setitem__(self, key, value)
        self._check_size_limit()

    def copy(self) -> "LimitedSizeDict":
        """Return a shallow copy that keeps the same ``size_limit``.

        The inherited ``copy`` rebuilds the object from the items only, which
        would produce a copy with no limit at all.
        """
        return self.__class__(self, size_limit=self.size_limit)

    def _check_size_limit(self) -> None:
        """Drop entries from the front until ``size_limit`` is respected."""
        if self.size_limit is None:
            return
        while len(self) > self.size_limit:
            self.popitem(last=False)
class SequentialDistributor(ClusterDistributor):
    """Executor to run tasks one at a time.

    ``submit_to_batch_distributor`` runs the given command through
    ``WorkerNodeCLI`` and only returns once that call has finished. The exit
    code of each run is kept in a bounded ``LimitedSizeDict`` keyed by the
    distributor id.
    """

    def __init__(
        self,
        cluster_name: str,
        exit_info_queue_size: int = 1000,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialization of the sequential distributor.

        Args:
            cluster_name (str): name of the cluster
            exit_info_queue_size (int): how many exit codes to retain
            *args: accepted for signature compatibility; not used here.
            **kwargs: accepted for signature compatibility; not used here.

        Raises:
            ValueError: if ``worker_node_entry_point`` is not found on PATH.
        """
        self.started = False
        self._cluster_name = cluster_name

        worker_node_entry_point = shutil.which("worker_node_entry_point")
        if not worker_node_entry_point:
            raise ValueError("worker_node_entry_point can't be found.")
        self._worker_node_entry_point = worker_node_entry_point

        # Distributor ids are handed out as consecutive integers starting at 1.
        self._next_distributor_id = 1
        self._exit_info = LimitedSizeDict(size_limit=exit_info_queue_size)

    @property
    def worker_node_entry_point(self) -> str:
        """Path to jobmon worker_node_entry_point."""
        return self._worker_node_entry_point

    @property
    def cluster_name(self) -> str:
        """Return the name of the cluster type."""
        return self._cluster_name

    def start(self) -> None:
        """Start the distributor."""
        self.started = True

    def stop(self) -> None:
        """Stop the distributor."""
        self.started = False

    def get_queueing_errors(self, distributor_ids: List[str]) -> Dict[str, str]:
        """Get the task instances that have errored out.

        Raises:
            NotImplementedError: always; not supported by this distributor.
        """
        raise NotImplementedError

    def get_array_queueing_errors(
        self, distributor_id: Union[int, str]
    ) -> Dict[Union[int, str], str]:
        """Get queueing errors for an array submission.

        Raises:
            NotImplementedError: always; not supported by this distributor.
        """
        raise NotImplementedError

    def get_remote_exit_info(self, distributor_id: str) -> Tuple[str, str]:
        """Get exit info from task instances that have run.

        Args:
            distributor_id: id returned by ``submit_to_batch_distributor``.

        Returns:
            A ``(status, message)`` tuple. The status is always
            ``TaskInstanceStatus.UNKNOWN_ERROR``; only the message differs.

        Raises:
            RemoteExitInfoNotAvailable: if no exit code is stored for the id
                (never recorded, or already evicted from the bounded store).
        """
        try:
            exit_code = self._exit_info[distributor_id]
        except KeyError:
            raise RemoteExitInfoNotAvailable

        # 199 is reported as the "kill self" exit code, per the message below.
        if exit_code == 199:
            msg = "job was in kill self state"
            return TaskInstanceStatus.UNKNOWN_ERROR, msg
        return TaskInstanceStatus.UNKNOWN_ERROR, f"found {exit_code}"

    def get_submitted_or_running(
        self, distributor_ids: Optional[List[str]] = None
    ) -> Set[str]:
        """Check status of running task.

        Args:
            distributor_ids: ignored; the answer is read from the ``JOB_ID``
                environment variable set by ``submit_to_batch_distributor``.

        Returns:
            A set holding the current ``JOB_ID`` value, or an empty set when
            the variable is unset or empty (so no bogus ``""`` id is reported).
        """
        running = os.environ.get("JOB_ID")
        return {running} if running else set()

    def terminate_task_instances(self, distributor_ids: List[str]) -> None:
        """Terminate task instances.

        Not implemented for this distributor: nothing is terminated and a
        warning is logged instead.
        """
        logger.warning(
            "terminate_task_instances not implemented by ClusterDistributor: "
            f"{self.__class__.__name__}"
        )

    def submit_to_batch_distributor(
        self,
        command: str,
        name: str,
        requested_resources: Dict[str, Any],
    ) -> str:
        """Execute sequentially.

        Args:
            command: worker node command line, parsed by ``WorkerNodeCLI``.
            name: not used by this distributor.
            requested_resources: not used by this distributor.

        Returns:
            The distributor id assigned to this run, as a string.

        Raises:
            SystemExit: re-raised when the CLI exits with any code other than
                ``ReturnCodes.WORKER_NODE_CLI_FAILURE``.
        """
        distributor_id = str(self._next_distributor_id)
        # add an executor id to the environment
        os.environ["JOB_ID"] = distributor_id
        self._next_distributor_id += 1

        # run the job and log the exit code
        try:
            cli = WorkerNodeCLI()
            # Configure component logging since we bypass main()
            cli.configure_component_logging()
            args = cli.parse_args(command)
            exit_code = cli.run_task_instance_job(args)
        except SystemExit as e:
            if e.code != ReturnCodes.WORKER_NODE_CLI_FAILURE:
                raise
            exit_code = ReturnCodes.WORKER_NODE_CLI_FAILURE

        self._exit_info[distributor_id] = exit_code
        return distributor_id
class SequentialWorkerNode(ClusterWorkerNode):
    """Worker-node side information for a sequentially executed task instance."""

    def __init__(self) -> None:
        """Set up the log path templates and an empty distributor id cache."""
        self._logfile_template = dict(
            stdout="{root}/{name}.o{job_id}",
            stderr="{root}/{name}.e{job_id}",
        )
        self._distributor_id: Optional[str] = None

    @property
    def distributor_id(self) -> Optional[str]:
        """Distributor id of the task.

        Read from the ``JOB_ID`` environment variable on first access and
        cached once a non-empty value has been seen; ``None`` until then.
        """
        if self._distributor_id is None:
            # An unset or empty JOB_ID leaves the cached value as None.
            self._distributor_id = os.environ.get("JOB_ID") or None
        return self._distributor_id

    def initialize_logfile(self, log_type: str, log_dir: str, name: str) -> str:
        """Build the path of the log file for one output stream.

        Args:
            log_type: key into the template table ("stdout" or "stderr").
            log_dir: directory for the log file; a falsy value disables logging.
            name: name used as the file stem.

        Returns:
            The formatted log path, or "/dev/null" when ``log_dir`` is falsy.
        """
        if not log_dir:
            return "/dev/null"
        template = self._logfile_template[log_type]
        return template.format(root=log_dir, name=name, job_id=self.distributor_id)

    @staticmethod
    def get_exit_info(exit_code: int, error_msg: str) -> Tuple[str, str]:
        """Return the ERROR status paired with the given error message."""
        return (TaskInstanceStatus.ERROR, error_msg)

    @staticmethod
    def get_usage_stats() -> Dict:
        """Usage information specific to the executor (always empty here)."""
        return dict()