Source code for worker_node.cli

"""Command line interface for Execution."""

import argparse
import importlib
import importlib.machinery
import importlib.util
import logging
import os
import sys
from typing import Optional

from jobmon.core.cli import CLI
from jobmon.core.task_generator import TaskGenerator

[docs] logger = logging.getLogger(__name__)
[docs] class WorkerNodeCLI(CLI): """Command line interface for WorkderNode.""" def __init__(self) -> None: """Initialization of the worker node CLI."""
[docs] self.parser = argparse.ArgumentParser("jobmon worker_node CLI")
[docs] self._subparsers = self.parser.add_subparsers( dest="sub_command", parser_class=argparse.ArgumentParser )
self._add_worker_node_job_parser() self._add_worker_node_array_parser() self._add_run_task_generator_parser()
[docs] def run_task_instance_job(self, args: argparse.Namespace) -> int: """Configuration for the jobmon worker node.""" from jobmon.core.exceptions import ReturnCodes from jobmon.worker_node import __version__ from jobmon.worker_node.worker_node_factory import WorkerNodeFactory if __version__ != args.expected_jobmon_version: msg = ( f"Your expected Jobmon version is {args.expected_jobmon_version} and your " f"worker node is using {__version__}. Please check your bash profile " ) logger.error(msg) sys.exit(ReturnCodes.WORKER_NODE_ENV_FAILURE) worker_node_factory = WorkerNodeFactory(cluster_name=args.cluster_name) worker_node_task_instance = worker_node_factory.get_job_task_instance( task_instance_id=args.task_instance_id ) worker_node_task_instance.configure_logging() try: worker_node_task_instance.run() except Exception as e: logger.error(e) sys.exit(ReturnCodes.WORKER_NODE_CLI_FAILURE) return worker_node_task_instance.command_returncode
[docs] def run_task_instance_array(self, args: argparse.Namespace) -> int: """Configuration for the jobmon worker node.""" from jobmon.core.exceptions import ReturnCodes from jobmon.worker_node import __version__ from jobmon.worker_node.worker_node_factory import WorkerNodeFactory if __version__ != args.expected_jobmon_version: msg = ( f"Your expected Jobmon version is {args.expected_jobmon_version} and your " f"worker node is using {__version__}. Please check your bash profile " ) logger.error(msg) sys.exit(ReturnCodes.WORKER_NODE_ENV_FAILURE) worker_node_factory = WorkerNodeFactory(cluster_name=args.cluster_name) worker_node_task_instance = worker_node_factory.get_array_task_instance( array_id=args.array_id, batch_number=args.batch_number, ) worker_node_task_instance.configure_logging() try: worker_node_task_instance.run() except Exception as e: logger.error(e) sys.exit(ReturnCodes.WORKER_NODE_CLI_FAILURE) return worker_node_task_instance.command_returncode
[docs] def run_task_generator(self, args: argparse.Namespace) -> int: from jobmon.core.exceptions import ReturnCodes # Import the module and get the task generator we've been pointed to, raise an error # if it's not a TaskGenerator # if the user used the --module_dir flag, add the module directory to the path if args.module_source_path: # Create a module spec from the source file loader = importlib.machinery.SourceFileLoader( args.module_name, os.path.expanduser(args.module_source_path) ) spec = importlib.util.spec_from_loader(loader.name, loader) # Create a new module based on the spec mod = importlib.util.module_from_spec(spec) # type: ignore # Add the module to sys.modules sys.modules[args.module_name] = mod loader.exec_module(mod) else: mod = importlib.import_module(args.module_name) task_generator = getattr(mod, args.func_name) if not isinstance(task_generator, TaskGenerator): raise ValueError( f"{args.module_name}:{args.func_name} doesn't point to a runnable jobmon task." ) # if the user used the --arghelp flag, print the help message for the task generator if args.arghelp: print(task_generator.help()) return ReturnCodes.OK try: task_generator.run(args.args) return ReturnCodes.OK except Exception as e: print(e) raise e
[docs] def _add_run_task_generator_parser(self) -> None: generator_parser = self._subparsers.add_parser("task_generator") generator_parser.set_defaults(func=self.run_task_generator) generator_parser.add_argument( "--module_name", help="name of the module containing the TaskGenerator", required=True, ) generator_parser.add_argument( "--func_name", type=str, help="the name of the function which has been turned into a TaskGenerator", required=True, ) generator_parser.add_argument( "--args", type=str, help="Followed by the key=value; .\n" "For example: \n" "If you method has two argument: def func(foo: int, bar: str), pass\n" " --args foo=1 --args bar='test'\n" "If you method has two argument: def func(foo: int, bar: List[str), pass\n" " --args foo=1 --args bar=[a,b]\n", required=False, action="append", ) generator_parser.add_argument( "--arghelp", type=str, help="Show the help message for the task generator. For example: --arghelp", required=False, ) generator_parser.add_argument( "--module_source_path", type=str, help="The directory the module source code located; " "you do not need this if the module is installed in your system.", required=False, )
[docs] def _add_worker_node_job_parser(self) -> None: job_parser = self._subparsers.add_parser("worker_node_job") job_parser.set_defaults(func=self.run_task_instance_job) job_parser.add_argument( "--task_instance_id", help="task_instance_id of the work node.", required=False, ) job_parser.add_argument( "--cluster_name", type=str, help="cluster_name of the work node.", required=True, ) job_parser.add_argument( "--expected_jobmon_version", type=str, help="expected_jobmon_version of the work node.", required=True, )
[docs] def _add_worker_node_array_parser(self) -> None: array_parser = self._subparsers.add_parser("worker_node_array") array_parser.set_defaults(func=self.run_task_instance_array) array_parser.add_argument( "--array_id", help="array_id of the worker node if this is an array task.", required=False, ) array_parser.add_argument( "--batch_number", help="batch number of the array this task instance is associated with.", required=False, ) array_parser.add_argument( "--cluster_name", type=str, help="cluster_name of the work node.", required=True, ) array_parser.add_argument( "--expected_jobmon_version", type=str, help="expected_jobmon_version of the work node.", required=True, )
[docs] def run(argstr: Optional[str] = None) -> None: """Entrypoint to create WorkerNode CLI.""" cli = WorkerNodeCLI() cli.main(argstr)