Source code for core.otlp.resources

"""OpenTelemetry resource detection for jobmon."""

from __future__ import annotations

import getpass
import os
import socket
import sys
from typing import Any, Dict, Optional

from jobmon.core import __version__
from jobmon.core.configuration import JobmonConfig

from . import OTLP_AVAILABLE

if OTLP_AVAILABLE:
    from opentelemetry.sdk import resources
    from opentelemetry.sdk.resources import ResourceDetector
else:
    # Dummy base class when OTLP is not available
[docs] class ResourceDetector: # type: ignore pass
[docs] def create_jobmon_resources() -> Optional[Any]: """Create OpenTelemetry resources for jobmon.""" if not OTLP_AVAILABLE: return None try: detectors: list[ResourceDetector] = [ ProcessResourceDetector(), HostResourceDetector(), JobmonServiceResourceDetector(), ] return resources.get_aggregated_resources(detectors) except ImportError: return None
[docs] class BaseJobmonResourceDetector(ResourceDetector): """Base class for jobmon resource detectors that handles common logic."""
[docs] def detect(self) -> Optional[Any]: # type: ignore[override] """Detect resource attributes using common jobmon logic.""" if not OTLP_AVAILABLE: return None try: config = JobmonConfig() # Get deployment environment with fallback try: deployment_environment = config.get( "telemetry", "deployment_environment" ) except Exception: deployment_environment = "unknown" # Get detector-specific attributes attrs = self._get_attributes(config, deployment_environment) return resources.Resource(attrs) # type: ignore[arg-type] except ImportError: return None
[docs] def _get_attributes( self, config: JobmonConfig, deployment_environment: str ) -> Dict[str, Any]: """Get detector-specific attributes. Must be implemented by subclasses.""" raise NotImplementedError
[docs] class ProcessResourceDetector(BaseJobmonResourceDetector): """Detects process-related resource attributes."""
[docs] def _get_attributes( self, config: JobmonConfig, deployment_environment: str ) -> Dict[str, Any]: """Get process-specific attributes.""" return { str(resources.PROCESS_PID): int(os.getpid()), str(resources.PROCESS_RUNTIME_NAME): str(sys.implementation.name), str(resources.PROCESS_OWNER): str(getpass.getuser()), str(resources.DEPLOYMENT_ENVIRONMENT): deployment_environment, }
[docs] class JobmonServiceResourceDetector(BaseJobmonResourceDetector): """Detects jobmon service-related resource attributes."""
[docs] def _get_attributes( self, config: JobmonConfig, deployment_environment: str ) -> Dict[str, Any]: """Get jobmon service-specific attributes.""" return { resources.SERVICE_NAME: "jobmon", resources.SERVICE_VERSION: __version__, str(resources.DEPLOYMENT_ENVIRONMENT): deployment_environment, }
[docs] class HostResourceDetector(BaseJobmonResourceDetector): """Detects host-related resource attributes."""
[docs] def _get_attributes( self, config: JobmonConfig, deployment_environment: str ) -> Dict[str, Any]: """Get host-specific attributes.""" return { resources.HOST_NAME: socket.gethostname(), str(resources.DEPLOYMENT_ENVIRONMENT): deployment_environment, }