"""Core OTLP manager for jobmon-scoped telemetry."""
from __future__ import annotations
import atexit
import importlib
import logging
import os
import signal
import threading
from contextlib import contextmanager
from typing import Any, Callable, Dict, Generator, Optional, Tuple, Type
from . import OTLP_AVAILABLE
if OTLP_AVAILABLE:
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk._logs import LoggerProvider
from .resources import create_jobmon_resources
# Module-level singleton for shared logger provider with thread safety
[docs]
_logger_provider: Optional[Any] = None
[docs]
_logger_provider_lock = threading.Lock()
[docs]
_shutdown_lock = threading.Lock()
[docs]
_shutdown_invoked = False
[docs]
_signal_handlers_installed = False
[docs]
def _build_exporter_args(config: Dict[str, Any], module_name: str) -> Dict[str, Any]:
exporter_args: Dict[str, Any] = {}
if "endpoint" in config:
exporter_args["endpoint"] = config["endpoint"]
if "headers" in config:
exporter_args["headers"] = dict(config["headers"])
if "timeout" in config:
exporter_args["timeout"] = config["timeout"]
if "insecure" in config:
exporter_args["insecure"] = config["insecure"]
compression = config.get("compression")
if compression is not None:
compression_str = str(compression).lower()
is_grpc = ".grpc." in module_name
if is_grpc:
try:
import grpc # type: ignore[import-untyped]
mapping = {
"gzip": grpc.Compression.Gzip,
"deflate": grpc.Compression.Deflate,
"none": grpc.Compression.NoCompression,
"nocompression": grpc.Compression.NoCompression,
}
if compression_str in mapping:
exporter_args["compression"] = mapping[compression_str]
except ImportError:
pass
elif compression_str not in {"none", "nocompression"}:
exporter_args["compression"] = compression_str
if "options" in config:
options_list = config["options"]
exporter_args["options"] = [tuple(option) for option in options_list]
return exporter_args
[docs]
def _normalize_exporter_config(
config: Any, defaults: Optional[Dict[str, Any]] = None
) -> Tuple[Optional[Type[Any]], Dict[str, Any]]:
if not isinstance(config, dict):
return None, {}
module_name = config.get("module")
class_name = config.get("class")
if not module_name or not class_name:
return None, {}
try:
module = importlib.import_module(module_name)
exporter_class = getattr(module, class_name)
except Exception:
return None, {}
exporter_args = dict(defaults or {})
exporter_args.update(_build_exporter_args(config, module_name))
return exporter_class, exporter_args
class JobmonOTLPManager:
    """OTLP manager for shared trace and log resources.

    This manager handles:
    - Trace provider setup (for distributed tracing)
    - Logger provider setup (for OTLP log export)
    - Resource detection (shared across components)
    - Request instrumentation (shared utility)

    All OTLP handlers should use the shared logger provider to avoid
    duplicate connections and log emissions.
    """

    # Process-wide singleton returned by get_instance(); created lazily.
    _instance: Optional["JobmonOTLPManager"] = None
    # Guards the creation of _instance.
    _instance_lock = threading.Lock()

    def __init__(self) -> None:
        """Create an uninitialized manager; initialize() builds the providers."""
        self.tracer_provider: Optional[Any] = None
        self.logger_provider: Optional[Any] = None
        # True only after initialize() completed without raising.
        self._initialized = False
        # Serializes concurrent initialize() calls.
        self._init_lock = threading.Lock()

    @classmethod
    def get_instance(cls: Type["JobmonOTLPManager"]) -> "JobmonOTLPManager":
        """Get or create the singleton OTLP manager with thread safety."""
        if cls._instance is None:
            with cls._instance_lock:
                # Re-check: another thread may have created it while we waited.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def initialize(self) -> None:
        """Initialize trace and log providers with jobmon resources.

        Does nothing when OTLP is unavailable or the manager is already
        initialized. Any exception raised during setup is swallowed and the
        manager stays uninitialized, so a later call tries again.
        """
        if self._initialized or not OTLP_AVAILABLE:
            return

        with self._init_lock:
            # Re-check: another thread may have initialized while we waited.
            if self._initialized:
                return

            try:
                # One resource description shared by both providers.
                resource_group = create_jobmon_resources()
                self.tracer_provider = TracerProvider(resource=resource_group)
                self.logger_provider = LoggerProvider(resource=resource_group)

                # NOTE(review): _configure_log_processor and
                # _configure_span_exporters are not defined in this class as
                # shown. If they are truly missing, the AttributeError is
                # swallowed below and initialization never completes; confirm.
                self._configure_log_processor()
                self._configure_span_exporters()

                trace.set_tracer_provider(self.tracer_provider)
                self._initialized = True
            except Exception:
                # Don't log here to avoid circular dependency during initialization
                pass

    @staticmethod
    def _create_exporter(config: Any) -> Optional[Any]:
        """Instantiate the exporter described by ``config``; None on any failure."""
        try:
            exporter_class, exporter_args = _normalize_exporter_config(config)
            if exporter_class is None:
                return None
            return exporter_class(**exporter_args)
        except Exception:
            # Don't log here to avoid circular dependency during initialization
            return None

    def _create_log_exporter(self, config: Any) -> Optional[Any]:
        """Create a log exporter from configuration dictionary."""
        return self._create_exporter(config)

    def _create_span_exporter(self, config: Any) -> Optional[Any]:
        """Create a span exporter from configuration dictionary."""
        return self._create_exporter(config)

    def get_tracer(self, name: str) -> Optional[Any]:
        """Return a tracer named ``name``, or None when no provider is set."""
        if not OTLP_AVAILABLE or not self.tracer_provider:
            return None
        return self.tracer_provider.get_tracer(name)

    @classmethod
    def instrument_requests(cls: Type["JobmonOTLPManager"]) -> None:
        """Instrument requests library for HTTP tracing.

        Silently does nothing when the instrumentation package is missing.
        """
        if not OTLP_AVAILABLE:
            return

        try:
            from opentelemetry.instrumentation.requests import RequestsInstrumentor

            RequestsInstrumentor().instrument()
        except ImportError:
            pass

    def shutdown(self) -> None:
        """Shut down the trace and log providers.

        Each provider is shut down independently, so a failure in one is
        logged as a warning and does not prevent the other from shutting
        down. The manager is marked uninitialized afterwards either way.
        """
        if not self._initialized:
            return

        try:
            for provider in (self.tracer_provider, self.logger_provider):
                if not provider or not hasattr(provider, "shutdown"):
                    continue
                try:
                    provider.shutdown()
                except Exception as e:
                    logging.getLogger(__name__).warning(
                        f"Error during OTLP shutdown: {e}"
                    )
        finally:
            self._initialized = False

    def flush_and_shutdown(self) -> None:
        """Flush pending OTLP telemetry and shut down providers."""
        if not OTLP_AVAILABLE:
            return

        self._force_flush_logger_provider()
        self._force_flush_tracer_provider()
        self.shutdown()

    @staticmethod
    def _force_flush(provider: Optional[Any]) -> None:
        """Call ``provider.force_flush()`` if it exists, ignoring any error."""
        if not provider:
            return
        flush = getattr(provider, "force_flush", None)
        if callable(flush):
            try:
                flush()
            except Exception:
                # Best effort: a failed flush must not block shutdown.
                pass

    def _force_flush_logger_provider(self) -> None:
        """Best-effort flush of the logger provider."""
        self._force_flush(self.logger_provider)

    def _force_flush_tracer_provider(self) -> None:
        """Best-effort flush of the tracer provider."""
        self._force_flush(self.tracer_provider)
# Process lifecycle hooks: flush OTLP telemetry at interpreter exit, on signals, and on app shutdown
def _flush_otlp_once(reason: str = "atexit") -> None:
    """Flush and shut down OTLP telemetry at most once per process.

    Args:
        reason: Label for what triggered the flush. It is accepted for the
            callers' benefit but not used in the body.
    """
    global _shutdown_invoked

    if not OTLP_AVAILABLE:
        return

    # Claim the one-time flush under the lock; do the slow work outside it.
    with _shutdown_lock:
        already_flushed = _shutdown_invoked
        _shutdown_invoked = True
    if already_flushed:
        return

    try:
        JobmonOTLPManager.get_instance().flush_and_shutdown()
    except Exception:
        # Best-effort shutdown; suppress failures during interpreter teardown
        pass
def _register_atexit_hook() -> None:
    """Schedule ``_flush_otlp_once`` to run at interpreter exit when OTLP is available."""
    if OTLP_AVAILABLE:
        atexit.register(_flush_otlp_once)
def _register_signal_handlers() -> None:
    """Wrap the SIGTERM and SIGINT handlers so OTLP is flushed first.

    After flushing, the wrapper hands the signal on to whatever was
    installed before: a callable handler is invoked, the default
    disposition is restored and the signal re-raised, and anything else
    (such as ``SIG_IGN``) is left alone. Runs at most once per process.
    """
    global _signal_handlers_installed

    if not OTLP_AVAILABLE or _signal_handlers_installed:
        return

    def _chain(prior: Any) -> Callable[[int, Any], None]:
        """Build a handler that flushes, then defers to ``prior``."""

        def _on_signal(received: int, frame: Any) -> None:
            _flush_otlp_once(f"signal:{received}")

            if callable(prior):
                try:
                    prior(received, frame)
                except Exception:
                    pass
                return

            if prior != signal.SIG_DFL:
                # SIG_IGN or an unknown prior handler: nothing more to do.
                return

            # Restore the default action and re-deliver the signal to ourselves.
            try:
                signal.signal(received, signal.SIG_DFL)
                if hasattr(signal, "raise_signal"):
                    signal.raise_signal(received)
                else:
                    os.kill(os.getpid(), received)
            except Exception:
                pass

        return _on_signal

    for signal_name in ("SIGTERM", "SIGINT"):
        signum = getattr(signal, signal_name, None)
        if signum is None:
            continue
        try:
            signal.signal(signum, _chain(signal.getsignal(signum)))
        except Exception:
            # Installation can fail; skip this signal and carry on.
            continue

    _signal_handlers_installed = True
def _install_lifecycle_hooks() -> None:
    """Register the atexit flush hook, then the signal handlers."""
    for install in (_register_atexit_hook, _register_signal_handlers):
        install()


# Import-time side effect: hooks are installed as soon as this module loads.
_install_lifecycle_hooks()
@contextmanager
def otlp_flush_on_exit() -> Generator[Optional[JobmonOTLPManager], None, None]:
    """Yield the OTLP manager and flush/shut it down when the block exits.

    Yields ``None`` and does nothing on exit when OTLP is unavailable.
    The flush runs even if the body of the ``with`` block raises.
    """
    if OTLP_AVAILABLE:
        manager = JobmonOTLPManager.get_instance()
        try:
            yield manager
        finally:
            manager.flush_and_shutdown()
    else:
        yield None
def register_otlp_shutdown_event(app: Any) -> None:
    """Attach an OTLP flush to the application's ``shutdown`` event.

    Uses ``app.add_event_handler`` when it is callable, otherwise falls back
    to the ``app.on_event`` decorator. Does nothing if the app offers
    neither or OTLP is unavailable.

    Args:
        app: A FastAPI-style application object.
    """
    if not OTLP_AVAILABLE:
        return

    async def _flush_otlp_on_shutdown() -> None:  # pragma: no cover - integration
        _flush_otlp_once("fastapi-shutdown")

    add_event_handler = getattr(app, "add_event_handler", None)
    if callable(add_event_handler):
        add_event_handler("shutdown", _flush_otlp_on_shutdown)
        return

    on_event = getattr(app, "on_event", None)
    if callable(on_event):
        # Same effect as decorating the coroutine with @on_event("shutdown").
        on_event("shutdown")(_flush_otlp_on_shutdown)
def validate_otlp_exporter_config(config: Any, exporter_type: str = "log") -> list[str]:
    """Validate OTLP exporter configuration and return list of issues.

    Args:
        config: Exporter configuration mapping.
        exporter_type: Type of exporter ('log', 'trace', 'metric'). An
            unknown type is checked against the 'log' parameter set.

    Returns:
        List of validation error messages. Empty list if valid. A config
        that is not a mapping yields a single message instead of raising.
    """
    from collections.abc import Mapping

    if not isinstance(config, Mapping):
        return ["Exporter configuration must be a mapping"]

    def _is_number(value: Any) -> bool:
        # bool is a subclass of int; ``timeout: true`` is not a number.
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    def _is_int(value: Any) -> bool:
        return isinstance(value, int) and not isinstance(value, bool)

    issues: list[str] = []

    # Check required fields
    for required in ("module", "class"):
        if not config.get(required):
            issues.append(f"Missing required field: {required}")

    # Supported parameters by exporter type, built from the shared core set.
    common_params = {
        "endpoint",
        "headers",
        "timeout",
        "compression",
        "insecure",
        "max_export_batch_size",
        "export_timeout_millis",
        "schedule_delay_millis",
        "max_queue_size",
    }
    supported_params = {
        "log": common_params,
        "trace": common_params | {"options"},
        "metric": common_params | {"options", "aggregation_temporality"},
    }
    supported = supported_params.get(exporter_type, supported_params["log"])

    # Check for unsupported parameters ("module"/"class" are metadata fields).
    unsupported = set(config.keys()) - {"module", "class"} - supported
    if unsupported:
        # key=str keeps sorting safe if a loader produced non-string keys.
        issues.append(
            f"Unsupported parameters for {exporter_type} exporter: "
            f"{sorted(unsupported, key=str)}"
        )

    # Specific validation for known problematic parameters
    if "options" in config and exporter_type == "log":
        issues.append(
            "'options' parameter is not supported by OTLPLogExporter. Remove this parameter."
        )

    # Validate endpoint format
    endpoint = config.get("endpoint")
    if endpoint:
        if not isinstance(endpoint, str):
            issues.append("'endpoint' must be a string")
        elif not endpoint.startswith(("http://", "https://")):
            issues.append("'endpoint' must start with http:// or https://")

    # Validate timeout
    timeout = config.get("timeout")
    if timeout is not None and (not _is_number(timeout) or timeout <= 0):
        issues.append("'timeout' must be a positive number")

    # Validate batch size parameters
    for param in ("max_export_batch_size", "max_queue_size"):
        value = config.get(param)
        if value is not None and (not _is_int(value) or value <= 0):
            issues.append(f"'{param}' must be a positive integer")

    # Validate timing parameters
    for param in ("export_timeout_millis", "schedule_delay_millis"):
        value = config.get(param)
        if value is not None and (not _is_int(value) or value < 0):
            issues.append(f"'{param}' must be a non-negative integer")

    return issues
def initialize_jobmon_otlp() -> JobmonOTLPManager:
    """Fetch the singleton OTLP manager and run its initialization.

    The manager owns the TracerProvider and LoggerProvider that OTLP
    handlers are meant to share instead of opening their own connections.

    Returns:
        The singleton manager, after ``initialize()`` has been called on it.
    """
    otlp_manager = JobmonOTLPManager.get_instance()
    otlp_manager.initialize()
    return otlp_manager
def get_shared_logger_provider() -> Optional[Any]:
    """Return the shared LoggerProvider, initializing the manager on first use.

    The provider is cached in a module-level variable. While the cache is
    empty, callers take ``_logger_provider_lock``, re-check the cache, and
    only then initialize the manager and store its ``logger_provider``.

    Returns:
        The shared LoggerProvider instance, or None if unavailable
    """
    global _logger_provider

    if _logger_provider is not None:
        return _logger_provider

    with _logger_provider_lock:
        # Another thread may have filled the cache while we waited for the lock.
        if _logger_provider is None:
            otlp_manager = JobmonOTLPManager.get_instance()
            otlp_manager.initialize()
            _logger_provider = otlp_manager.logger_provider

    return _logger_provider
def get_logger(name: str) -> Optional[Any]:
    """Return a logger from the shared LoggerProvider.

    Args:
        name: Logger name (typically __name__)

    Returns:
        The provider's logger for ``name``, or None when no shared
        provider is available.
    """
    shared_provider = get_shared_logger_provider()
    if not shared_provider:
        return None
    return shared_provider.get_logger(name)
def create_log_exporter(**kwargs: Any) -> Optional[Any]:
    """Build a gRPC OTLP log exporter for client applications.

    The keyword arguments are merged over a base config that names
    ``OTLPLogExporter`` from the gRPC log exporter module. ``insecure=True``
    is applied as a default unless the caller overrides it. Only the keys
    that ``_build_exporter_args`` recognises are passed to the exporter.

    Args:
        **kwargs: Exporter configuration (endpoint, headers, etc.)

    Returns:
        The constructed exporter, or None when OTLP is unavailable, the
        exporter class cannot be imported, or construction raises.

    Example:
        exporter = create_log_exporter(endpoint="otelcol.dev.aks:443")
        handler = JobmonOTLPLoggingHandler(exporter=exporter)
    """
    if not OTLP_AVAILABLE:
        return None

    try:
        config: Dict[str, Any] = {
            "module": "opentelemetry.exporter.otlp.proto.grpc._log_exporter",
            "class": "OTLPLogExporter",
            **kwargs,
        }
        exporter_class, exporter_args = _normalize_exporter_config(
            config, defaults={"insecure": True}
        )
        return None if exporter_class is None else exporter_class(**exporter_args)
    except Exception:
        return None