"""Dummy Executor fakes execution for testing purposes."""
from __future__ import annotations
from importlib.metadata import version
import logging
import os
import random
from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
from jobmon.core.cluster_protocol import (
ClusterDistributor,
ClusterQueue,
ClusterWorkerNode,
)
from jobmon.core.constants import TaskInstanceStatus
from jobmon.core.exceptions import RemoteExitInfoNotAvailable
from jobmon.worker_node.cli import WorkerNodeCLI
from jobmon.worker_node.worker_node_factory import WorkerNodeFactory
# Logger for this module, named after the module's import path.
logger = logging.getLogger(__name__)

# Version string of the installed ``jobmon_core`` distribution, read from its
# package metadata at import time.
__version__ = version("jobmon_core")
class DummyQueue(ClusterQueue):
    """Queue implementation for the dummy cluster, derived from ClusterQueue.

    The dummy cluster imposes no resource limits: every resource request is
    reported valid and returned unchanged.
    """

    def __init__(self, queue_id: int, queue_name: str, parameters: dict) -> None:
        """Initialize the dummy queue.

        Args:
            queue_id: ID of the queue.
            queue_name: name of the queue.
            parameters: queue parameters, stored as-is and exposed through the
                ``parameters`` property. NOTE(review): presumably these are the
                queue limits the client loads from the database — confirm
                against callers.
        """
        self._queue_id = queue_id
        self._queue_name = queue_name
        self._parameters = parameters

    def validate_resources(
        self, strict: bool = False, **kwargs: Union[str, int, float]
    ) -> Tuple[bool, str]:
        """Accept any resource request; the dummy queue defines no resources.

        Args:
            strict: ignored.
            **kwargs: the requested resources; ignored.

        Returns:
            ``(True, "")``: always valid, with an empty message.
        """
        return True, ""

    def coerce_resources(self, **kwargs: Union[str, int, float]) -> Dict:
        """Return the requested resources unchanged; no coercion is applied.

        Args:
            **kwargs: the requested resources.

        Returns:
            A dict holding exactly the keyword arguments passed in.
        """
        return kwargs

    @property
    def queue_id(self) -> int:
        """Return the ID of the queue."""
        return self._queue_id

    @property
    def queue_name(self) -> str:
        """Return the name of the queue."""
        return self._queue_name

    @property
    def parameters(self) -> Dict:
        """Return the dictionary of parameters given at construction."""
        return self._parameters

    @property
    def required_resources(self) -> List:
        """No required resources for the dummy queue; return an empty list."""
        return []
class DummyDistributor(ClusterDistributor):
    """Distributor that fakes task execution and reports every task as succeeded.

    Nothing is submitted to a scheduler. ``submit_to_batch_distributor`` marks
    the task instance as running and then done before it returns.
    """

    def __init__(self, cluster_name: str, *args: Any, **kwargs: Any) -> None:
        """Initialize the dummy distributor.

        Args:
            cluster_name: name of the cluster this distributor serves.
            *args: accepted but unused.
            **kwargs: accepted but unused.
        """
        self._cluster_name = cluster_name
        # Initialised here so ``started`` can be read before start() is called;
        # previously the attribute only existed after start()/stop().
        self.started: bool = False

    @property
    def worker_node_entry_point(self) -> str:
        """Return the worker node entry point; the dummy distributor has none ("")."""
        return ""

    @property
    def cluster_name(self) -> str:
        """Return the name of the cluster type."""
        return self._cluster_name

    def start(self) -> None:
        """Start the executor (only sets ``started`` to True)."""
        self.started = True

    def stop(self) -> None:
        """Stop the executor (only sets ``started`` to False)."""
        self.started = False

    def get_queueing_errors(self, distributor_ids: List[str]) -> Dict[str, str]:
        """Dummy tasks never error, since they never run. So always return an empty dict."""
        return {}

    def get_submitted_or_running(
        self, distributor_ids: Optional[List[str]] = None
    ) -> Set[str]:
        """Return the active distributor ids: always empty.

        Submission logs each task instance as done before returning, so nothing
        is ever left submitted or running.
        """
        return set()

    def terminate_task_instances(self, distributor_ids: List[str]) -> None:
        """No such thing as running Dummy tasks. Therefore, nothing to terminate."""
        return

    def submit_to_batch_distributor(
        self,
        command: str,
        name: str,
        requested_resources: Dict[str, Any],
    ) -> str:
        """Fake a submission by immediately logging the task instance as run and done.

        A real distributor would submit ``command`` to the cluster here (e.g.
        sbatch or qsub). This one generates a random even distributor id,
        parses ``command`` with the worker node CLI to find the task instance,
        and logs it as running and then done with exit code 0 and empty output.

        Args:
            command: worker node command line; parsed with ``WorkerNodeCLI`` to
                obtain the cluster name and task instance id.
            name: unused.
            requested_resources: unused.

        Returns:
            The generated distributor id, as a string.

        Side effects:
            Sets ``os.environ["JOB_ID"]`` to the generated id for the whole
            process; ``DummyWorkerNode.distributor_id`` reads it from there.
        """
        logger.debug("This is the Dummy Distributor")
        # Even numbers are used for non-array tasks.
        distributor_id = random.randint(1, 1_000_000) * 2
        os.environ["JOB_ID"] = str(distributor_id)

        cli = WorkerNodeCLI()
        args = cli.parse_args(command)
        worker_node_factory = WorkerNodeFactory(cluster_name=args.cluster_name)
        # NOTE(review): the original comment here read "Do not do ANY logging at
        # all"; presumably it means no logfiles/handlers are set up for this fake
        # task instance — confirm.
        worker_node_task_instance = worker_node_factory.get_job_task_instance(
            task_instance_id=args.task_instance_id
        )
        # Log running, record a successful (exit 0, empty stdout/stderr) run,
        # log done, and return.
        worker_node_task_instance.log_running()
        worker_node_task_instance.set_command_output(0, "", "")
        worker_node_task_instance.log_done()
        return str(distributor_id)

    def get_remote_exit_info(self, distributor_id: str) -> Tuple[str, str]:
        """Get exit info for a finished task instance; never available here.

        Raises:
            RemoteExitInfoNotAvailable: always.
        """
        raise RemoteExitInfoNotAvailable
class DummyWorkerNode(ClusterWorkerNode):
    """Get Executor Info for a Task Instance on the dummy cluster."""

    def __init__(self) -> None:
        """Initialize the dummy worker node; the distributor id is resolved lazily."""
        self._distributor_id: Optional[str] = None

    @property
    def distributor_id(self) -> Optional[str]:
        """Distributor id of the task, read from the ``JOB_ID`` environment variable.

        The value is cached once found. Returns None while ``JOB_ID`` is unset
        or empty.
        """
        if self._distributor_id is None:
            jid = os.environ.get("JOB_ID")
            if jid:
                self._distributor_id = jid
        return self._distributor_id

    @staticmethod
    def get_exit_info(exit_code: int, error_msg: str) -> Tuple[str, str]:
        """Return the exit status and error message; the status is always ERROR.

        Args:
            exit_code: unused.
            error_msg: returned unchanged.

        Returns:
            ``(TaskInstanceStatus.ERROR, error_msg)``.
        """
        return TaskInstanceStatus.ERROR, error_msg

    @staticmethod
    def get_usage_stats() -> Dict:
        """Usage information specific to the executor; always an empty dict."""
        return {}

    def initialize_logfile(self, log_type: str, log_dir: str, name: str) -> str:
        """Return the platform's null device path instead of creating a logfile.

        ``os.devnull`` is ``"/dev/null"`` on POSIX (same as before) and the
        correct device name on Windows, where ``"/dev/null"`` does not exist.

        Args:
            log_type: unused.
            log_dir: unused.
            name: unused.

        Returns:
            ``os.devnull``.
        """
        return os.devnull
def get_cluster_queue_class() -> Type[ClusterQueue]:
    """Look up the ClusterQueue implementation of the dummy plugin.

    Returns:
        The ``DummyQueue`` class itself (not an instance).
    """
    queue_cls: Type[ClusterQueue] = DummyQueue
    return queue_cls
def get_cluster_distributor_class() -> Type[ClusterDistributor]:
    """Look up the ClusterDistributor implementation of the dummy plugin.

    Returns:
        The ``DummyDistributor`` class itself (not an instance).
    """
    distributor_cls: Type[ClusterDistributor] = DummyDistributor
    return distributor_cls
def get_cluster_worker_node_class() -> Type[ClusterWorkerNode]:
    """Look up the ClusterWorkerNode implementation of the dummy plugin.

    Returns:
        The ``DummyWorkerNode`` class itself (not an instance).
    """
    worker_node_cls: Type[ClusterWorkerNode] = DummyWorkerNode
    return worker_node_cls