# Source code for core.configuration

"""Parse configuration options and set them to be used throughout the Jobmon Architecture."""

import argparse
import ast
import copy
import json
import os
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import Any, Dict, Optional, Union

import yaml
from dotenv import load_dotenv

from jobmon.core import CONFIG_FILE_FROM_INSTALLER_PLUGIN
from jobmon.core.cli import CLI
from jobmon.core.exceptions import ConfigError

# Load .env file for development convenience, but skip it under pytest so that tests
# keep complete control over configuration.
# PYTEST_CURRENT_TEST is only set while an individual test is executing, so it is
# usually still unset when this module is imported during collection. PYTEST_VERSION
# (pytest >= 8.2) is set for the whole session and covers that import-time case.
if not (os.environ.get("PYTEST_CURRENT_TEST") or os.environ.get("PYTEST_VERSION")):
    load_dotenv()

# File name of the default configuration shipped with this package.
DEFAULTS_FILE_NAME = "defaults.yaml"
# Path to the default configuration: the "config" directory next to this module.
DEFAULTS_FILE = Path(__file__).parent / "config" / DEFAULTS_FILE_NAME
# Prefix of every environment-variable override (JOBMON__{SECTION}__{KEY}).
ENV_VAR_PREFIX = "JOBMON__"
[docs] class JobmonConfig: """Default config setup using YAML.""" def __init__(self, filepath: str = "", dict_config: Optional[Dict] = None) -> None: """Jobmon config class. Args: filepath: where to read defaults from. dict_config: dictionary of values to override Config file priority: 1. user specified file passed in 2. environment variable JOBMON__CONFIG_FILE (backdoor for testing):q! 3. config file from installer 4. default config file in core """ if filepath: self._filepath = filepath else: # Allow the user to specify a different config file using an environment variable self._filepath = os.getenv("JOBMON__CONFIG_FILE", "") # if the env not set, check if the installer plugin exists if self._filepath == "": # if the installer plugin exists, use the config file form the plugin self._filepath = CONFIG_FILE_FROM_INSTALLER_PLUGIN if self._filepath: with open(self._filepath, "r", encoding="utf-8") as f: self._config = yaml.safe_load(f) else: # when no config file in env and not installer plug-in, # use the default yaml in core self._filepath = DEFAULTS_FILE # type: ignore with open(DEFAULTS_FILE, "r", encoding="utf-8") as f: self._config = yaml.safe_load(f)
[docs] self._dict_config = dict_config
[docs] def _merge_dicts(self, base: Dict, override: Dict) -> Dict: """Utility function to merge two dictionaries.""" for key, value in override.items(): if isinstance(value, dict): base[key] = self._merge_dicts(base.get(key, {}), value) else: base[key] = value return base
[docs] def _get_env_var_name(self, section: str, key: str) -> str: return f"{ENV_VAR_PREFIX}{section.upper()}__{key.upper()}"
[docs] def _get_environment_variable(self, section: str, key: str) -> Optional[str]: # must have format JOBMON__{SECTION}__{KEY} (note double underscore) env_var = self._get_env_var_name(section, key) return os.environ.get(env_var)
[docs] def _interpolate_env_vars(self, value: Any) -> Any: if isinstance(value, str): return os.path.expandvars(value) return value
[docs] def _coerce_value(self, value: Any) -> Any: """Recursively coerce values to appropriate Python types. • 'true', 'false', etc. → bool • Numeric strings → int/float • JSON / Python literals → parsed objects • Dict / list containers → recurse element-wise • Anything else → returned unchanged """ # Already a non-string, recurse if container if isinstance(value, Mapping): return {k: self._coerce_value(v) for k, v in value.items()} if isinstance(value, Sequence) and not isinstance( value, (str, bytes, bytearray) ): return [self._coerce_value(v) for v in value] if not isinstance(value, str): return value s_val = value.strip() lower_s_val = s_val.lower() # Try numeric conversion first - numbers like "1" and "0" should remain # numeric, not be converted to booleans try: # Try int first, then float if "." not in s_val: return int(s_val) else: return float(s_val) except ValueError: pass # Boolean conversion for explicit boolean strings only # (excludes "1" and "0" which are handled above as integers) if lower_s_val in ("t", "true", "yes"): return True if lower_s_val in ("f", "false", "no"): return False # Try JSON, then Python literal, fall back to raw string for parser in (json.loads, ast.literal_eval): try: return parser(s_val) except Exception: # noqa: BLE001 pass return s_val
[docs] def _get_yaml_variable(self, section: str, key: str) -> Optional[str]: return self._config.get(section, {}).get(key)
[docs] def _get_dict_config_variable(self, section: str, key: str) -> Optional[str]: if self._dict_config: return self._dict_config.get(section, {}).get(key) return None
[docs] def _wrapped_get(self, section: str, key: str) -> str: # First check in the dict_config val = self._get_dict_config_variable(section, key) if val is not None: return self._interpolate_env_vars(val) # Then check environment variable val = self._get_environment_variable(section, key) if val is not None: return val # Then check in the merged _config val = self._get_yaml_variable(section, key) if val is not None: return self._interpolate_env_vars(val) raise ConfigError( f'"{key}" key not found in "{section}" section of {self._filepath}. Fallback ' f'option using environment var "{self._get_env_var_name(section, key)}" was not ' "found." )
[docs] def get(self, section: str, key: str) -> str: """Get the configuration value for the section and key. Raise if key not found. Args: section: the section of the yaml to search. key: the key within the section to retrieve Raises: ConfigError """ try: val = self._wrapped_get(section, key) return self._interpolate_env_vars(val) except ConfigError as e: raise e
[docs] def get_section(self, section: str) -> Dict[str, Any]: """Returns a dictionary of all key-value pairs in the given section. The order of precedence: dict_config > Environment Variable > YAML File. """ # Start with the section from the YAML file, or an empty dict if the section # doesn't exist yet. section_dict = self._config.get(section, {}).copy() # Check environment for variables related to this section and overlay them prefix = f"{ENV_VAR_PREFIX}{section.upper()}__" def _merge_path(target: Dict[str, Any], path: list[str], value: str) -> None: """Recursively merge value into target following path segments.""" cur = target for idx, seg in enumerate(path): seg_lower = seg.lower() if idx == len(path) - 1: # last segment – assign cur[seg_lower] = value else: # Check if parent key exists as primitive value # Convert it to dict to accommodate nested children if seg_lower in cur and not isinstance(cur[seg_lower], dict): # Parent key exists as primitive - convert to dict to allow children cur[seg_lower] = {} # Create nested dict structure if needed if seg_lower not in cur: cur[seg_lower] = {} cur = cur[seg_lower] for env_key, env_val in os.environ.items(): if not env_key.startswith(prefix): continue # Strip prefix -> "SQLALCHEMY_CONNECT_ARGS__SSL" remainder = env_key[len(prefix) :] # Split on double underscores— JOBMON__DB__A__B => ["A", "B"] path_segments = [seg for seg in remainder.split("__") if seg] if not path_segments: continue _merge_path(section_dict, path_segments, env_val) # Overlay values from dict_config, if any (higher precedence than env vars) if self._dict_config and section in self._dict_config: # Need recursive merge for nested structures section_dict = self._merge_dicts(section_dict, self._dict_config[section]) return section_dict
[docs] def get_section_coerced(self, section: str) -> Dict[str, Any]: """Returns a dictionary with all values coerced to appropriate Python types. Same as get_section() but automatically converts: - String booleans to bool - Numeric strings to int/float - JSON/Python literals to their parsed values - Nested structures recursively """ section_dict = self.get_section(section) return self._coerce_value(section_dict)
[docs] def get_boolean(self, section: str, key: str) -> bool: """Get the configuration value for the section and key as bool. Raise if key not found. """ val = self.get(section, key) coerced_val = self._coerce_value(val) if isinstance(coerced_val, bool): return coerced_val # Also accept integers 0 and 1 as boolean values elif isinstance(coerced_val, int) and coerced_val in (0, 1): return bool(coerced_val) else: raise ConfigError( f'Failed to convert value to bool. Please check "{key}" key in "{section}" ' f'section or environment var "{self._get_env_var_name(section, key)}". ' f'Current value: "{val}".' )
[docs] def get_int(self, section: str, key: str) -> int: """Get the configuration value for the section and key as int. Raise if key not found. """ val = self.get(section, key) coerced_val = self._coerce_value(val) if isinstance(coerced_val, int): return coerced_val else: raise ConfigError( f'Failed to convert value to int. Please check "{key}" key in "{section}" ' f'section or environment var "{self._get_env_var_name(section, key)}". ' f'Current value: "{val}".' )
[docs] def get_float(self, section: str, key: str) -> float: """Get the configuration value for the section/key as float. Raise if key not found.""" val = self.get(section, key) coerced_val = self._coerce_value(val) if isinstance(coerced_val, (int, float)): return float(coerced_val) else: raise ConfigError( f'Failed to convert value to float. Please check "{key}" key in "{section}" ' f'section or environment var "{self._get_env_var_name(section, key)}". ' f'Current value: "{val}".' )
[docs] def set(self, section: str, key: str, val: str) -> None: """Set the configuration value for the section/key.""" if section not in self._config: self._config[section] = {} self._config[section][key] = val
[docs] def write(self, filepath: Union[str, Path] = "") -> None: """Persist the current config to disk.""" if not filepath: filepath = self._filepath if not filepath: filepath = DEFAULTS_FILE with open(filepath, "w") as f: yaml.safe_dump(self._config, f)
class ConfigCLI(CLI):
    """CLI for `jobmon_config`."""

    def __init__(self) -> None:
        """Initialization of client CLI."""
        super().__init__()
        # NOTE(review): self.parser is presumably created by CLI.__init__ - confirm.
        # The chosen sub-command name is stored under "sub_command".
        self._subparsers = self.parser.add_subparsers(
            dest="sub_command", parser_class=argparse.ArgumentParser
        )
def main(argstr: Optional[str] = None) -> None:
    """Build the config CLI and run it with the given argument string.

    Args:
        argstr: passed unchanged to ``ConfigCLI.main``; defaults to None.
    """
    ConfigCLI().main(argstr)