Source code for server.web.otlp.manager

"""Server-specific OTLP manager that builds on core functionality."""

from __future__ import annotations

import logging
from typing import Any, Optional, Type

# Check if OpenTelemetry is available
try:
    import opentelemetry.trace  # noqa: F401

[docs] OTLP_AVAILABLE = True
except ImportError: OTLP_AVAILABLE = False
[docs] class ServerOTLPManager: """Server-specific OTLP manager that builds on core functionality.""" def __init__(self) -> None: """Initialize server OTLP manager."""
[docs] self._core_manager: Optional[Any] = None
[docs] self._initialized = False
# Guard flags to prevent double instrumentation warnings
[docs] self._fastapi_instrumented = False
[docs] self._requests_instrumented = False
[docs] self._sqlalchemy_instrumented = False
[docs] def initialize(self) -> None: """Initialize the core OTLP manager for server use.""" if self._initialized: return if not OTLP_AVAILABLE: logger = logging.getLogger(__name__) logger.info("OpenTelemetry not available - server OTLP disabled") return try: from jobmon.core.otlp import JobmonOTLPManager # Initialize core manager (provides OTLP providers for logconfig handlers) self._core_manager = JobmonOTLPManager.get_instance() self._core_manager.initialize() # Note: OTLP logging handlers are configured via dictConfig/logconfig # using JobmonOTLPLoggingHandler and JobmonOTLPStructlogHandler self._initialized = True except Exception as e: logger = logging.getLogger(__name__) logger.warning(f"Failed to initialize server OTLP: {e}") self._initialized = False self._core_manager = None
@property
[docs] def tracer_provider(self) -> Optional[Any]: """Get the tracer provider from core manager.""" if self._core_manager: return self._core_manager.tracer_provider return None
@property
[docs] def logger_provider(self) -> Optional[Any]: """Get the logger provider from core manager.""" if self._core_manager: return self._core_manager.logger_provider return None
[docs] def get_tracer(self, name: str) -> Optional[Any]: """Get a tracer for the given name.""" if self._core_manager: return self._core_manager.get_tracer(name) return None
[docs] def instrument_app(self, app: Any) -> None: """Instrument FastAPI application with OpenTelemetry. This is server-specific functionality that should not be in core. """ if not OTLP_AVAILABLE or not self._initialized or self._fastapi_instrumented: return try: from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor FastAPIInstrumentor().instrument_app(app) self._fastapi_instrumented = True except Exception as e: logger = logging.getLogger(__name__) logger.warning(f"Failed to instrument FastAPI app: {e}")
@classmethod
[docs] def instrument_requests(cls: Type[ServerOTLPManager]) -> None: """Instrument requests library - server-specific implementation.""" if not OTLP_AVAILABLE: return try: from opentelemetry.instrumentation.requests import RequestsInstrumentor # Use a class-level guard on the singleton to avoid duplicate calls manager = get_server_otlp_manager() if not manager._requests_instrumented: RequestsInstrumentor().instrument() manager._requests_instrumented = True except ImportError: pass
@classmethod
[docs] def instrument_sqlalchemy(cls: Type[ServerOTLPManager]) -> None: """Instrument SQLAlchemy globally - server-specific implementation.""" if not OTLP_AVAILABLE: return try: from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor manager = get_server_otlp_manager() if not manager._sqlalchemy_instrumented: # Instrument globally once with desired options. SQLAlchemyInstrumentor().instrument( enable_commenter=True, skip_dep_check=True ) manager._sqlalchemy_instrumented = True except ImportError: pass
@classmethod
[docs] def instrument_engine(cls: Type[ServerOTLPManager], engine: Any) -> None: """Instrument a specific SQLAlchemy engine - server-specific implementation.""" if not OTLP_AVAILABLE: return try: from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor manager = get_server_otlp_manager() # If already instrumented globally, skip per-engine if not manager._sqlalchemy_instrumented: SQLAlchemyInstrumentor().instrument( engine=engine, enable_commenter=True, skip_dep_check=True ) manager._sqlalchemy_instrumented = True except ImportError: pass
# Singleton instance for server use
[docs] _server_otlp_manager: Optional[ServerOTLPManager] = None
[docs] def get_server_otlp_manager() -> ServerOTLPManager: """Get or create the server OTLP manager singleton.""" global _server_otlp_manager if _server_otlp_manager is None: _server_otlp_manager = ServerOTLPManager() _server_otlp_manager.initialize() return _server_otlp_manager
[docs] def initialize_server_otlp() -> ServerOTLPManager: """Initialize server-specific OTLP functionality. This should be called by the server during startup. Returns: The server OTLP manager instance """ return get_server_otlp_manager()