Source code for core.otlp.validation

"""OTLP configuration validation utilities."""

from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional


[docs] def validate_otlp_exporter_config( config: Dict[str, Any], exporter_type: str = "log" ) -> List[str]: """Validate OTLP exporter configuration and return list of issues. Args: config: Exporter configuration dictionary exporter_type: Type of exporter ('log', 'trace', 'metric') Returns: List of validation error messages. Empty list if valid. """ issues = [] # Check required fields if not config.get("module"): issues.append("Missing required field: module") if not config.get("class"): issues.append("Missing required field: class") # Define supported parameters by exporter type SUPPORTED_PARAMS = { "log": { "endpoint", "headers", "timeout", "compression", "insecure", "max_export_batch_size", "export_timeout_millis", "schedule_delay_millis", "max_queue_size", }, "trace": { "endpoint", "headers", "timeout", "compression", "insecure", "options", "max_export_batch_size", "export_timeout_millis", "schedule_delay_millis", "max_queue_size", }, "metric": { "endpoint", "headers", "timeout", "compression", "insecure", "options", "aggregation_temporality", "max_export_batch_size", "export_timeout_millis", "schedule_delay_millis", "max_queue_size", }, } supported = SUPPORTED_PARAMS.get(exporter_type, SUPPORTED_PARAMS["log"]) # Check for unsupported parameters config_keys = set(config.keys()) - {"module", "class"} # Exclude metadata fields unsupported = config_keys - supported if unsupported: issues.append( f"Unsupported parameters for {exporter_type} exporter: {sorted(unsupported)}" ) # Specific validation for known problematic parameters if "options" in config and exporter_type == "log": issues.append( "'options' parameter is not supported by OTLPLogExporter. Remove this parameter." ) # Validate endpoint format endpoint = config.get("endpoint") if endpoint: if not isinstance(endpoint, str): issues.append("'endpoint' must be a string") elif not (endpoint.startswith("http://") or endpoint.startswith("https://")): issues.append("'endpoint' must start with http:// or https://") # Validate timeout timeout = config.get("timeout") if timeout is not None: if not isinstance(timeout, (int, float)) or timeout <= 0: issues.append("'timeout' must be a positive number") # Validate batch size parameters for param in ["max_export_batch_size", "max_queue_size"]: value = config.get(param) if value is not None: if not isinstance(value, int) or value <= 0: issues.append(f"'{param}' must be a positive integer") # Validate timing parameters for param in ["export_timeout_millis", "schedule_delay_millis"]: value = config.get(param) if value is not None: if not isinstance(value, int) or value < 0: issues.append(f"'{param}' must be a non-negative integer") return issues
[docs] def validate_logging_config_otlp(config: Dict[str, Any]) -> Dict[str, List[str]]: """Validate OTLP configuration in a logging config dictionary. Args: config: Full logging configuration dictionary Returns: Dictionary mapping handler names to lists of validation issues """ validation_results = {} handlers = config.get("handlers", {}) otlp_handler_classes = { "jobmon.core.otlp.JobmonOTLPLoggingHandler", "jobmon.core.otlp.JobmonOTLPStructlogHandler", "jobmon.core.otlp.handlers.JobmonOTLPLoggingHandler", "jobmon.core.otlp.handlers.JobmonOTLPStructlogHandler", } for handler_name, handler_config in handlers.items(): handler_class = handler_config.get("class", "") if handler_class in otlp_handler_classes: exporter_config = handler_config.get("exporter", {}) if exporter_config: issues = validate_otlp_exporter_config(exporter_config, "log") if issues: validation_results[handler_name] = issues return validation_results
[docs] def log_validation_results( validation_results: Dict[str, List[str]], logger: Optional[logging.Logger] = None ) -> None: """Log validation results using the provided logger. Args: validation_results: Dictionary mapping handler names to validation issues logger: Logger to use. If None, uses default logger. """ if logger is None: logger = logging.getLogger(__name__) if not validation_results: logger.info("OTLP configuration validation passed - no issues found") return logger.error("OTLP configuration validation failed:") for handler_name, issues in validation_results.items(): logger.error(f"Handler '{handler_name}' has {len(issues)} issue(s):") for issue in issues: logger.error(f" - {issue}")
[docs] def validate_and_log_otlp_config( config: Dict[str, Any], logger: Optional[logging.Logger] = None ) -> bool: """Validate OTLP configuration and log results. Args: config: Full logging configuration dictionary logger: Logger to use for validation results Returns: True if validation passed, False if issues were found """ validation_results = validate_logging_config_otlp(config) log_validation_results(validation_results, logger) return len(validation_results) == 0