"""Custom OTLP logging handlers that prevent global log pollution."""
from __future__ import annotations
import logging
from typing import Any, Dict, Optional, Union
from jobmon.core.configuration import JobmonConfig
from . import OTLP_AVAILABLE
from .formatters import JobmonOTLPFormatter
[docs]
class JobmonOTLPLoggingHandler(logging.Handler):
"""Universal OTLP logging handler supporting dict configs and pre-configured exporters.
This handler follows the principle of single responsibility while being flexible enough
to work with different configuration patterns:
1. Inline dict configuration (server pattern):
handlers:
otlp_logs:
class: jobmon.core.otlp.JobmonOTLPLoggingHandler
level: INFO
exporter:
module: opentelemetry.exporter.otlp.proto.grpc._log_exporter
class: OTLPLogExporter
endpoint: otelcol.dev.aks.scicomp.ihme.washington.edu:443
options: [["grpc.max_send_message_length", 16777216]]
max_export_batch_size: 8
2. Pre-configured exporter instance:
handler = JobmonOTLPLoggingHandler(exporter=my_exporter)
"""
def __init__(
self, level: int = logging.NOTSET, exporter: Optional[Union[Any, Dict]] = None
) -> None:
"""Initialize with either a dict configuration or pre-configured exporter instance.
Args:
level: Logging level for this handler
exporter: Either a dict configuration or pre-configured OTLP exporter instance
"""
super().__init__(level)
[docs]
self._exporter_config = exporter
[docs]
self._otlp_handler: Optional[logging.Handler] = None
# Simple debug mode for troubleshooting
config = JobmonConfig()
[docs]
self._debug_mode = config.get_boolean("telemetry", "debug")
self.setFormatter(JobmonOTLPFormatter())
[docs]
def emit(self, record: logging.LogRecord) -> None:
"""Emit a log record to OTLP."""
# Create handler on first use
if not self._otlp_handler and self._exporter_config and OTLP_AVAILABLE:
try:
self._otlp_handler = self._create_handler()
if self._debug_mode and self._otlp_handler:
logging.getLogger("jobmon.otlp.debug").info(
"OTLP handler initialized successfully"
)
except Exception as e:
if self._debug_mode:
logging.getLogger("jobmon.otlp.debug").error(
f"OTLP handler initialization failed: {e}", exc_info=True
)
# Emit to OTLP if handler is available
if self._otlp_handler:
try:
self._otlp_handler.emit(record)
except Exception as e:
if self._debug_mode:
logging.getLogger("jobmon.otlp.debug").error(
f"OTLP emit failed: {e}"
)
[docs]
def _create_handler(self) -> Optional[logging.Handler]:
"""Create OTLP handler by processing the exporter configuration."""
try:
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from .resources import create_jobmon_resources
# Create isolated provider with jobmon resources
resource_group = create_jobmon_resources()
logger_provider = LoggerProvider(resource=resource_group)
# Determine if we have a dict config or pre-configured exporter
# Handle both dict and ConvertingDict (from logging config)
if hasattr(self._exporter_config, "get") and hasattr(
self._exporter_config, "keys"
):
# Handle inline dict configuration (server pattern)
exporter = self._create_exporter_from_dict(self._exporter_config)
processor = self._create_processor_from_dict(
exporter, self._exporter_config
)
else:
# Handle pre-configured exporter instance
exporter = self._exporter_config
if not exporter:
return None
processor = BatchLogRecordProcessor(exporter)
if not exporter:
return None
logger_provider.add_log_record_processor(processor)
# Create and return the handler
handler = LoggingHandler(level=self.level, logger_provider=logger_provider)
handler.setFormatter(self.formatter)
return handler
except Exception:
return None
[docs]
def _create_exporter_from_dict(self, config: Any) -> Optional[Any]:
"""Create an OTLP exporter from dictionary configuration (handles ConvertingDict)."""
try:
# Extract exporter configuration
module_name = config.get("module")
class_name = config.get("class")
if not module_name or not class_name:
return None
# Dynamically import and instantiate the exporter
import importlib
module = importlib.import_module(module_name)
exporter_class = getattr(module, class_name)
# Build exporter arguments
exporter_args = {}
# Common exporter arguments
if "endpoint" in config:
exporter_args["endpoint"] = config["endpoint"]
if "headers" in config:
exporter_args["headers"] = dict(
config["headers"]
) # Convert ConvertingDict to dict
if "timeout" in config:
exporter_args["timeout"] = config["timeout"]
if "compression" in config:
exporter_args["compression"] = config["compression"]
if "insecure" in config:
exporter_args["insecure"] = config["insecure"]
if "options" in config:
# Convert list of [key, value] pairs to list of tuples
options_list = config["options"]
exporter_args["options"] = [tuple(option) for option in options_list]
return exporter_class(**exporter_args)
except Exception:
return None
[docs]
def _create_processor_from_dict(self, exporter: Any, config: Any) -> Any:
"""Create a batch processor with configuration from dict (handles ConvertingDict)."""
try:
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
# Extract batch processor configuration
processor_args = {}
if "max_export_batch_size" in config:
processor_args["max_export_batch_size"] = config[
"max_export_batch_size"
]
if "export_timeout_millis" in config:
processor_args["export_timeout_millis"] = config[
"export_timeout_millis"
]
if "schedule_delay_millis" in config:
processor_args["schedule_delay_millis"] = config[
"schedule_delay_millis"
]
if "max_queue_size" in config:
processor_args["max_queue_size"] = config["max_queue_size"]
return BatchLogRecordProcessor(exporter, **processor_args)
except Exception:
# Fallback to basic processor
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
return BatchLogRecordProcessor(exporter)
[docs]
class JobmonOTLPStructlogHandler(JobmonOTLPLoggingHandler):
"""OTLP logging handler with structlog formatting for structured logs.
This handler extends JobmonOTLPLoggingHandler to provide structured logging
using structlog formatting before sending to OTLP.
"""
def __init__(
self, level: int = logging.NOTSET, exporter: Optional[Union[Any, Dict]] = None
) -> None:
"""Initialize with structlog formatter."""
super().__init__(level, exporter)
# Set up structlog formatting if available
try:
import structlog
self.setFormatter(
structlog.stdlib.ProcessorFormatter(
processor=structlog.processors.JSONRenderer(), foreign_pre_chain=[]
)
)
except ImportError:
# Fall back to regular OTLP formatter if structlog not available
pass