Source code for worker_node.cli

"""Command line interface for Execution."""

import argparse
import importlib
import importlib.machinery
import importlib.util
import os
import sys
from typing import Optional

import structlog

from jobmon.core.cli import CLI
from jobmon.core.logging import set_jobmon_context
from jobmon.core.task_generator import TaskGenerator

# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
class WorkerNodeCLI(CLI):
    """Command line interface for Worker Node with automatic logging.

    Three sub-commands are registered on one argument parser; each stores its
    handler in the parsed namespace under ``func``:

    * ``worker_node_job``   -> :meth:`run_task_instance_job`
    * ``worker_node_array`` -> :meth:`run_task_instance_array`
    * ``task_generator``    -> :meth:`run_task_generator`

    NOTE(review): the dispatch through ``func`` presumably happens in the
    ``CLI`` base class (not visible in this module) -- confirm there.
    """

    def __init__(self) -> None:
        """Initialization of the worker node CLI with automatic component logging."""
        # Enable automatic logging for worker component
        super().__init__(component_name="worker")

        self.parser = argparse.ArgumentParser("jobmon worker_node CLI")
        self._subparsers = self.parser.add_subparsers(
            dest="sub_command", parser_class=argparse.ArgumentParser
        )

        self._add_worker_node_job_parser()
        self._add_worker_node_array_parser()
        self._add_run_task_generator_parser()

    @staticmethod
    def _exit_on_version_mismatch(expected_version: str) -> None:
        """Exit the process when the installed worker node version is not the expected one.

        Args:
            expected_version: the version string received on the command line.

        Exits with ``ReturnCodes.WORKER_NODE_ENV_FAILURE`` (via ``sys.exit``) if
        ``jobmon.worker_node.__version__`` differs from ``expected_version``.
        """
        # Imported lazily, exactly like the run methods do for their own imports.
        from jobmon.core.exceptions import ReturnCodes
        from jobmon.worker_node import __version__

        if __version__ != expected_version:
            logger.error(
                "Version mismatch",
                expected=expected_version,
                actual=__version__,
            )
            sys.exit(ReturnCodes.WORKER_NODE_ENV_FAILURE)

    def run_task_instance_job(self, args: argparse.Namespace) -> int:
        """Run a single task instance on this worker node.

        Args:
            args: parsed ``worker_node_job`` arguments. Reads ``cluster_name``,
                ``task_instance_id`` and ``expected_jobmon_version``.

        Returns:
            The ``command_returncode`` attribute of the task instance after
            its ``run()`` call returned.

        The process exits with ``ReturnCodes.WORKER_NODE_ENV_FAILURE`` on a
        version mismatch and with ``ReturnCodes.WORKER_NODE_CLI_FAILURE`` if
        ``run()`` raises.
        """
        from jobmon.core.exceptions import ReturnCodes
        from jobmon.worker_node.worker_node_factory import WorkerNodeFactory

        # Bind global context for this worker instance
        set_jobmon_context(
            cluster_name=args.cluster_name,
            task_instance_id=args.task_instance_id,
        )

        self._exit_on_version_mismatch(args.expected_jobmon_version)

        logger.debug(
            "Worker node starting for task instance",
            task_instance_id=args.task_instance_id,
            cluster_name=args.cluster_name,
        )

        worker_node_factory = WorkerNodeFactory(cluster_name=args.cluster_name)
        worker_node_task_instance = worker_node_factory.get_job_task_instance(
            task_instance_id=args.task_instance_id
        )

        try:
            logger.debug(
                "Worker node executing task instance",
                task_instance_id=args.task_instance_id,
            )
            worker_node_task_instance.run()
        except Exception as e:
            # Any failure while running is logged and turned into a CLI failure exit code.
            logger.exception("Worker node error", error=str(e))
            sys.exit(ReturnCodes.WORKER_NODE_CLI_FAILURE)

        return worker_node_task_instance.command_returncode

    def run_task_instance_array(self, args: argparse.Namespace) -> int:
        """Run the task instance identified by an array id and batch number.

        Args:
            args: parsed ``worker_node_array`` arguments. Reads ``cluster_name``,
                ``array_id``, ``batch_number`` and ``expected_jobmon_version``.

        Returns:
            The ``command_returncode`` attribute of the task instance after
            its ``run()`` call returned.

        The process exits with ``ReturnCodes.WORKER_NODE_ENV_FAILURE`` on a
        version mismatch and with ``ReturnCodes.WORKER_NODE_CLI_FAILURE`` if
        ``run()`` raises.
        """
        from jobmon.core.exceptions import ReturnCodes
        from jobmon.worker_node.worker_node_factory import WorkerNodeFactory

        # Bind global context for this array worker instance
        set_jobmon_context(
            cluster_name=args.cluster_name,
            array_id=args.array_id,
            batch_number=args.batch_number,
        )

        self._exit_on_version_mismatch(args.expected_jobmon_version)

        logger.info(
            "Worker node array starting",
            array_id=args.array_id,
            batch_number=args.batch_number,
            cluster_name=args.cluster_name,
        )

        worker_node_factory = WorkerNodeFactory(cluster_name=args.cluster_name)
        worker_node_task_instance = worker_node_factory.get_array_task_instance(
            array_id=args.array_id,
            batch_number=args.batch_number,
        )

        try:
            logger.info(
                "Worker node array executing task instance",
                array_id=args.array_id,
                batch_number=args.batch_number,
            )
            worker_node_task_instance.run()
        except Exception as e:
            # Any failure while running is logged and turned into a CLI failure exit code.
            logger.exception("Worker node array error", error=str(e))
            sys.exit(ReturnCodes.WORKER_NODE_CLI_FAILURE)

        return worker_node_task_instance.command_returncode

    def run_task_generator(self, args: argparse.Namespace) -> int:
        """Load a ``TaskGenerator`` and either print its help or run it.

        Args:
            args: parsed ``task_generator`` arguments. Reads ``module_name``,
                ``func_name``, ``module_source_path``, ``arghelp`` and ``args``.

        Returns:
            ``ReturnCodes.OK`` after the help was printed or the generator ran.

        Raises:
            ImportError: if no module spec can be built for the source file.
            ValueError: if ``module_name:func_name`` is not a ``TaskGenerator``,
                or if the parsed positional ``args`` list does not hold exactly
                one element.
            Exception: anything raised while running the generator is logged
                and re-raised unchanged.
        """
        from jobmon.core.exceptions import ReturnCodes

        # Import the module and get the task generator we've been pointed to, raise an error
        # if it's not a TaskGenerator
        if args.module_source_path:
            # --module_source_path was given: load the module straight from that source file
            loader = importlib.machinery.SourceFileLoader(
                args.module_name, os.path.expanduser(args.module_source_path)
            )
            spec = importlib.util.spec_from_loader(loader.name, loader)
            if spec is None:
                raise ImportError(
                    f"Could not create a module spec for {args.module_name} "
                    f"from {args.module_source_path}"
                )
            # Create a new module based on the spec
            mod = importlib.util.module_from_spec(spec)
            # Register the module before executing it so it can be found by name
            sys.modules[args.module_name] = mod
            try:
                loader.exec_module(mod)
            except BaseException:
                # Do not leave a partially initialised module behind.
                sys.modules.pop(args.module_name, None)
                raise
        else:
            mod = importlib.import_module(args.module_name)

        task_generator = getattr(mod, args.func_name)
        if not isinstance(task_generator, TaskGenerator):
            raise ValueError(
                f"{args.module_name}:{args.func_name} doesn't point to a runnable jobmon task."
            )

        # if the user used the --arghelp flag, print the help message for the task generator
        if args.arghelp:
            print(task_generator.help())
            return ReturnCodes.OK

        try:
            # The positional "args" uses action="append", so the parsed value is a list
            # that is expected to wrap exactly one list of strings.
            if len(args.args) != 1:
                raise ValueError(
                    "Internal error parsing command line arguments. Expected to find a single "
                    f"element in the parsed args list, but found {len(args.args)}.\n"
                    f"Parsed args: {args.args}"
                )
            task_generator.run(args.args[0])
            return ReturnCodes.OK
        except Exception as e:
            logger.exception("Worker node task generator error", error=str(e))
            raise

    def _add_run_task_generator_parser(self) -> None:
        """Register the ``task_generator`` sub-command and its arguments."""
        generator_parser = self._subparsers.add_parser("task_generator")
        generator_parser.set_defaults(func=self.run_task_generator)
        generator_parser.add_argument(
            "--module_name",
            help="name of the module containing the TaskGenerator",
            required=True,
        )
        generator_parser.add_argument(
            "--func_name",
            type=str,
            help="the name of the function which has been turned into a TaskGenerator",
            required=True,
        )
        # nargs="?" + const lets the flag be given bare (as the help text shows) while
        # still accepting an explicit value, which earlier invocations had to supply.
        generator_parser.add_argument(
            "--arghelp",
            type=str,
            nargs="?",
            const="true",
            help="Show the help message for the task generator. For example: --arghelp",
            required=False,
        )
        generator_parser.add_argument(
            "--module_source_path",
            type=str,
            help="Path to the module's source file; "
            "you do not need this if the module is installed in your system.",
            required=False,
        )
        generator_parser.add_argument(
            "args",
            type=str,
            nargs="*",
            action="append",
            help=(
                "Arbitrary key-value pairs to pass to the task generator. Expected form is:\n"
                " key1='val1' key2='val2' ...\n"
                "For lists of values, expected form is:\n"
                " key1='[val1,val2,val3]'"
            ),
        )

    def _add_worker_node_job_parser(self) -> None:
        """Register the ``worker_node_job`` sub-command and its arguments."""
        job_parser = self._subparsers.add_parser("worker_node_job")
        job_parser.set_defaults(func=self.run_task_instance_job)
        job_parser.add_argument(
            "--task_instance_id",
            help="task_instance_id of the work node.",
            required=False,
        )
        job_parser.add_argument(
            "--cluster_name",
            type=str,
            help="cluster_name of the work node.",
            required=True,
        )
        job_parser.add_argument(
            "--expected_jobmon_version",
            type=str,
            help="expected_jobmon_version of the work node.",
            required=True,
        )

    def _add_worker_node_array_parser(self) -> None:
        """Register the ``worker_node_array`` sub-command and its arguments."""
        array_parser = self._subparsers.add_parser("worker_node_array")
        array_parser.set_defaults(func=self.run_task_instance_array)
        array_parser.add_argument(
            "--array_id",
            help="array_id of the worker node if this is an array task.",
            required=False,
        )
        array_parser.add_argument(
            "--batch_number",
            help="batch number of the array this task instance is associated with.",
            required=False,
        )
        array_parser.add_argument(
            "--cluster_name",
            type=str,
            help="cluster_name of the work node.",
            required=True,
        )
        array_parser.add_argument(
            "--expected_jobmon_version",
            type=str,
            help="expected_jobmon_version of the work node.",
            required=True,
        )
def run(argstr: Optional[str] = None) -> None:
    """Entrypoint to create WorkerNode CLI.

    Builds a ``WorkerNodeCLI`` and hands ``argstr`` to its ``main`` method.

    Args:
        argstr: optional argument string forwarded unchanged to ``main``.
    """
    WorkerNodeCLI().main(argstr)