core.otlp.manager
=================

.. py:module:: core.otlp.manager

.. autoapi-nested-parse::

   Core OTLP manager for jobmon-scoped telemetry.


Attributes
----------

.. autoapisummary::

   core.otlp.manager._logger_provider
   core.otlp.manager._logger_provider_lock
   core.otlp.manager._shutdown_lock
   core.otlp.manager._shutdown_invoked
   core.otlp.manager._signal_handlers_installed


Classes
-------

.. autoapisummary::

   core.otlp.manager.JobmonOTLPManager


Functions
---------

.. autoapisummary::

   core.otlp.manager._build_exporter_args
   core.otlp.manager._normalize_exporter_config
   core.otlp.manager._flush_otlp_once
   core.otlp.manager._register_atexit_hook
   core.otlp.manager._register_signal_handlers
   core.otlp.manager._install_lifecycle_hooks
   core.otlp.manager.otlp_flush_on_exit
   core.otlp.manager.register_otlp_shutdown_event
   core.otlp.manager.validate_otlp_exporter_config
   core.otlp.manager.initialize_jobmon_otlp
   core.otlp.manager.get_shared_logger_provider
   core.otlp.manager.get_logger
   core.otlp.manager.create_log_exporter


Module Contents
---------------

.. py:data:: _logger_provider
   :type: Optional[Any]
   :value: None


.. py:data:: _logger_provider_lock

.. py:data:: _shutdown_lock

.. py:data:: _shutdown_invoked
   :value: False


.. py:data:: _signal_handlers_installed
   :value: False


.. py:function:: _build_exporter_args(config: Dict[str, Any], module_name: str) -> Dict[str, Any]

.. py:function:: _normalize_exporter_config(config: Any, defaults: Optional[Dict[str, Any]] = None) -> Tuple[Optional[Type[Any]], Dict[str, Any]]

.. py:class:: JobmonOTLPManager

   OTLP manager for shared trace and log resources.

   This manager handles:

   - Trace provider setup (for distributed tracing)
   - Logger provider setup (for OTLP log export)
   - Resource detection (shared across components)
   - Request instrumentation (shared utility)

   All OTLP handlers should use the shared logger provider to avoid
   duplicate connections and log emissions.


   .. py:attribute:: _instance
      :type: Optional[JobmonOTLPManager]
      :value: None


   .. py:attribute:: _instance_lock


   .. py:attribute:: tracer_provider
      :type: Optional[Any]
      :value: None


   .. py:attribute:: logger_provider
      :type: Optional[Any]
      :value: None


   .. py:attribute:: _initialized
      :value: False


   .. py:attribute:: _log_processor_configured
      :value: False


   .. py:attribute:: _init_lock


   .. py:method:: get_instance() -> JobmonOTLPManager
      :classmethod:

      Get or create the singleton OTLP manager with thread safety.


   .. py:method:: initialize() -> None

      Initialize trace and log providers with jobmon resources.


   .. py:method:: _configure_log_processor() -> None

      Configure log processor from telemetry configuration.


   .. py:method:: _create_log_exporter(config: Any) -> Optional[Any]

      Create a log exporter from configuration dictionary.


   .. py:method:: _configure_span_exporters() -> None

      Configure span exporters from telemetry configuration.


   .. py:method:: _create_span_exporter(config: Any) -> Optional[Any]

      Create a span exporter from configuration dictionary.


   .. py:method:: get_tracer(name: str) -> Optional[Any]

      Get a tracer for distributed tracing.


   .. py:method:: instrument_requests() -> None
      :classmethod:

      Instrument requests library for HTTP tracing.


   .. py:method:: shutdown() -> None

      Shutdown trace and log providers.


   .. py:method:: flush_and_shutdown() -> None

      Flush pending OTLP telemetry and shut down providers.


   .. py:method:: _force_flush_logger_provider() -> None


   .. py:method:: _force_flush_tracer_provider() -> None


.. py:function:: _flush_otlp_once(reason: str = 'atexit') -> None

   Flush OTLP telemetry a single time per process.


.. py:function:: _register_atexit_hook() -> None

   Register the OTLP flush hook for interpreter shutdown.


.. py:function:: _register_signal_handlers() -> None

   Install signal handlers that flush OTLP before termination.


.. py:function:: _install_lifecycle_hooks() -> None

   Ensure lifecycle hooks are installed exactly once.


.. py:function:: otlp_flush_on_exit() -> Generator[Optional[JobmonOTLPManager], None, None]

   Context manager that guarantees OTLP flush when exiting.


.. py:function:: register_otlp_shutdown_event(app: Any) -> None

   Register a FastAPI shutdown hook that flushes OTLP telemetry.


.. py:function:: validate_otlp_exporter_config(config: Any, exporter_type: str = 'log') -> list[str]

   Validate OTLP exporter configuration and return list of issues.

   :param config: Exporter configuration dictionary
   :param exporter_type: Type of exporter ('log', 'trace', 'metric')

   :returns: List of validation error messages. Empty list if valid.


.. py:function:: initialize_jobmon_otlp() -> JobmonOTLPManager

   Initialize OTLP for shared resources (traces and logs).

   This creates shared TracerProvider and LoggerProvider instances that
   should be used by all OTLP handlers to avoid duplicate connections.

   :returns: The OTLP manager instance with shared providers


.. py:function:: get_shared_logger_provider() -> Optional[Any]

   Get the shared logger provider, initializing if needed.

   This function provides a clean interface for handlers to access the
   shared LoggerProvider without dealing with manager instances directly.

   Uses double-checked locking to prevent race conditions in multi-threaded
   environments like Kubernetes with multiple workers.

   :returns: The shared LoggerProvider instance, or None if unavailable


.. py:function:: get_logger(name: str) -> Optional[Any]

   Get a logger from the shared provider.

   This is the cleanest way for handlers to get OTLP loggers without
   dealing with manager instances or initialization complexity.

   :param name: Logger name (typically __name__)

   :returns: OTLP logger instance, or None if unavailable


.. py:function:: create_log_exporter(**kwargs: Any) -> Optional[Any]

   Create a pre-configured log exporter for client applications.

   This factory function creates exporters that can be passed to
   JobmonOTLPLoggingHandler for pure separation.

   :param \*\*kwargs: Exporter configuration (endpoint, headers, etc.)

   :returns: Pre-configured OTLP log exporter, or None if unavailable

   .. rubric:: Example

   .. code-block:: python

      exporter = create_log_exporter(
          endpoint="otelcol.dev.aks:443",
          max_batch_size=8
      )
      handler = JobmonOTLPLoggingHandler(exporter=exporter)
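
The double-checked locking mentioned for ``get_shared_logger_provider`` can be sketched roughly as follows. This is an illustration of the general pattern only, not the module's actual implementation: the names ``_provider``, ``_provider_lock``, ``get_shared_provider`` and the ``factory`` argument are hypothetical stand-ins, and the real function may also return ``None`` when OTLP is unavailable.

```python
import threading
from typing import Any, Callable, Optional

# Hypothetical stand-ins for the module-level provider and its lock.
_provider: Optional[Any] = None
_provider_lock = threading.Lock()


def get_shared_provider(factory: Callable[[], Any]) -> Any:
    """Return the shared provider, building it at most once per process."""
    global _provider
    # First check, without the lock: the cheap path once the provider exists.
    if _provider is None:
        with _provider_lock:
            # Second check, under the lock: another thread may have created
            # the provider while this one was waiting to acquire the lock.
            if _provider is None:
                _provider = factory()
    return _provider
```

The unlocked first check keeps the common path cheap after initialization, while the second check under the lock ensures that threads racing through the first check still trigger only one call to the factory.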