Source code for core.cli

import argparse
import shlex
from typing import Any, List, Optional


[docs] class CLI: """Base CLI with automatic component logging support.""" def __init__(self, component_name: Optional[str] = None) -> None: """Initialize the CLI with optional component logging. Args: component_name: Component name for automatic logging configuration ('distributor', 'worker', 'server', or None to disable) """
[docs] self.parser = argparse.ArgumentParser("jobmon CLI")
[docs] self.component_name = component_name
[docs] def main(self, argstr: Optional[str] = None) -> Any: """Parse args and configure component logging before execution.""" args = self.parse_args(argstr) # Configure component logging if enabled if self.component_name: self.configure_component_logging() if self.component_name: from jobmon.core.otlp.manager import otlp_flush_on_exit with otlp_flush_on_exit(): return args.func(args) return args.func(args)
[docs] def configure_component_logging(self) -> None: """Configure logging for this component using existing infrastructure. This method can be called directly when needed (e.g., by plugins that bypass the normal main() flow). Configures both: 1. Standard logging (handlers, formatters, etc.) 2. Structlog (processors, context merging, OTLP integration) """ if not self.component_name: return # Configure standard logging (handlers, formatters) try: from jobmon.core.config.logconfig_utils import configure_component_logging configure_component_logging(self.component_name) except Exception: # Fail silently - component starts with no logging # This ensures components always start successfully pass # Configure structlog (processors, context variables) # This enables @bind_context decorator and bind_contextvars() # Structlog is a required dependency, so this should always succeed from jobmon.core.config.structlog_config import configure_structlog_with_otlp configure_structlog_with_otlp(component_name=self.component_name)
[docs] def parse_args(self, argstr: Optional[str] = None) -> argparse.Namespace: """Construct a parser, parse either sys.argv (default) or the provided argstr. Returns a Namespace. The Namespace should have a 'func' attribute which can be used to dispatch to the appropriate downstream function. """ arglist: Optional[List[str]] = None if argstr is not None: arglist = shlex.split(argstr) args = self.parser.parse_args(arglist) return args
[docs] def _shutdown_otlp_providers(self) -> None: """Deprecated: retained for backwards compatibility.""" try: from jobmon.core.otlp.manager import JobmonOTLPManager JobmonOTLPManager.get_instance().flush_and_shutdown() except Exception: pass