from __future__ import annotations
import getpass
import logging
import logging.config
import os
import socket
import sys
from typing import Any, Callable, List, Optional, Tuple, Type
from flask import Flask
from opentelemetry import _logs
from opentelemetry import trace
from opentelemetry.sdk import resources
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import Tracer
from jobmon.core import __version__
from jobmon.core.configuration import JobmonConfig
[docs]
def get_resource(raise_on_error: bool) -> resources.Resource:
    """Build the OpenTelemetry resource describing the running process.

    The service, process and host detectors are instantiated (in that order)
    and their results merged via ``resources.get_aggregated_resources``.
    ``_ClusterResourceDetector`` is not part of this default set.

    Args:
        raise_on_error: forwarded to every detector; if True, an exception
            encountered during detection is raised.

    Returns:
        opentelemetry.sdk.resources.Resource: the aggregated resource.
    """
    detector_types = (
        _ServiceResourceDetector,
        _ProcessResourceDetector,
        _HostResourceDetector,
    )
    return resources.get_aggregated_resources(
        [detector_type(raise_on_error=raise_on_error) for detector_type in detector_types]
    )
[docs]
class _ProcessResourceDetector(resources.ResourceDetector):
    """Resource detector for attributes of the current Python process."""

    def detect(self) -> resources.Resource:
        """Returns a Resource related to the process.

        Always includes the PID and the Python implementation name. The
        process owner is added when ``getpass.getuser()`` can determine it.
        If it cannot (no LOGNAME/USER/LNAME/USERNAME variable and no passwd
        entry, as with some container UIDs), the owner attribute is omitted,
        so the PID and runtime name are still reported. The error is
        re-raised instead when the detector was created with
        ``raise_on_error=True``.
        """
        attrs = {
            resources.PROCESS_PID: os.getpid(),
            resources.PROCESS_RUNTIME_NAME: sys.implementation.name,
        }
        try:
            attrs[resources.PROCESS_OWNER] = getpass.getuser()
        except (ImportError, KeyError, OSError):
            # getuser() raises OSError on Python 3.13+. Earlier versions leak
            # KeyError (pwd lookup miss) or ImportError (no pwd module).
            if getattr(self, "raise_on_error", False):
                raise
        return resources.Resource(attrs)
[docs]
class _ServiceResourceDetector(resources.ResourceDetector):
    """Resource detector that identifies jobmon as the instrumented service."""

    def detect(self) -> resources.Resource:
        """Return a Resource with the service name ("jobmon") and jobmon's version."""
        return resources.Resource(
            {
                resources.SERVICE_NAME: "jobmon",
                resources.SERVICE_VERSION: __version__,
            }
        )
[docs]
class _HostResourceDetector(resources.ResourceDetector):
    """Resource detector for the machine the process is running on."""

    def detect(self) -> resources.Resource:
        """Return a Resource carrying this machine's host name."""
        hostname = socket.gethostname()
        return resources.Resource({resources.HOST_NAME: hostname})
[docs]
class _ClusterResourceDetector(resources.ResourceDetector):
    """Resource detector for cluster-job metadata exposed through the environment."""

    def detect(self) -> resources.Resource:
        """Return a Resource built from cluster-job environment variables.

        Each of ``SLURM_JOBID``, ``SLURM_JOB_ACCOUNT``, ``SLURM_JOB_NAME`` and
        ``JOB_ID`` that is set to a non-empty value is included, keyed by its
        lower-cased name. Unset or empty variables are skipped, so the result
        may be an empty Resource when not running under a scheduler.
        """
        names = ("SLURM_JOBID", "SLURM_JOB_ACCOUNT", "SLURM_JOB_NAME", "JOB_ID")
        # One environment lookup per name; keep only truthy (non-empty) values.
        lookups = ((name, os.getenv(name)) for name in names)
        found = {name.lower(): value for name, value in lookups if value}
        return resources.Resource(found)
[docs]
class OtlpAPI:
    """OpenTelemetry API.

    Process-wide singleton: every construction returns the same instance, and
    resource/provider configuration runs only on the first ``__init__`` call.
    """

    # Singleton state read by ``__new__`` and ``__init__``. Declared here so
    # the very first construction does not hit an AttributeError.
    _instance: Optional[OtlpAPI] = None
    _initialized = False

    # Flags that keep each library from being instrumented more than once.
    _sqlalchemy_instrumented = False
    _requests_instrumented = False

    # Base ``logging.config.dictConfig`` schema used by ``correlate_logger``.
    # The root logger gets no handlers. The ``otel_jobmon`` handler is attached
    # only to loggers that ``correlate_logger`` adds under ``loggers``.
    _log_config = {
        "version": 1,
        "disable_existing_loggers": False,
        "root": {"level": "INFO", "handlers": []},
        "formatters": {
            "otel_jobmon": {
                "class": "jobmon.core.otlp.OpenTelemetryLogFormatter",
                "format": "%(asctime)s [%(levelname)s] [trace_id=%(trace_id)s,"
                " span_id=%(span_id)s, parent_span_id=%(parent_span_id)s]"
                " - %(message)s",
            }
        },
        "handlers": {
            "otel_jobmon": {
                "class": "opentelemetry.sdk._logs.LoggingHandler",
                "formatter": "otel_jobmon",
            },
        },
    }

    def __new__(cls: Type[OtlpAPI], *args: Any, **kwargs: Any) -> OtlpAPI:
        """Return the shared instance, creating it on first use.

        Constructor arguments are accepted here (they are consumed by
        ``__init__``) but are not forwarded to ``object.__new__``.
        """
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(
        self, extra_detectors: Optional[List[resources.ResourceDetector]] = None
    ) -> None:
        """Initialize the OtlpAPI object.

        Configuration happens only once per process. Later constructions
        return immediately, and any ``extra_detectors`` they pass are ignored.

        Args:
            extra_detectors: additional resource detectors passed on to
                ``_configure_resources``. Defaults to a fresh empty list on each
                call, so no mutable default is shared between calls.
        """
        if OtlpAPI._initialized:
            return
        if extra_detectors is None:
            extra_detectors = []
        self._configure_resources(extra_detectors)
        self._configure_providers()
        OtlpAPI._initialized = True

    def _set_exporter(
        self, kwargs: Any, add_processor_func: Callable, batch_processor: Any
    ) -> None:
        """Instantiate an exporter from a config mapping and register it.

        Args:
            kwargs: mapping with ``module`` (dotted import path) and ``class``
                (exporter class name). Every other key is passed to the
                exporter's constructor as a keyword argument. Because this
                imports and instantiates whatever is named, the mapping must
                come from trusted configuration.
            add_processor_func: callable that receives the built processor.
            batch_processor: callable that wraps the exporter instance in a
                processor (e.g. ``BatchSpanProcessor``).
        """
        import importlib

        module = importlib.import_module(kwargs["module"])
        exporter_cls = getattr(module, kwargs["class"])
        exporter_kwargs = {
            key: value
            for key, value in kwargs.items()
            if key not in ("module", "class")
        }
        add_processor_func(batch_processor(exporter_cls(**exporter_kwargs)))

    @classmethod
    def instrument_sqlalchemy(cls: Type[OtlpAPI]) -> None:
        """Instrument SQLAlchemy (at most once per process)."""
        if not cls._sqlalchemy_instrumented:
            from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

            SQLAlchemyInstrumentor().instrument()
            cls._sqlalchemy_instrumented = True

    @classmethod
    def instrument_app(cls: Type[OtlpAPI], app: Flask) -> None:
        """Instrument the given Flask app (no once-only guard is applied here)."""
        from opentelemetry.instrumentation.flask import FlaskInstrumentor

        FlaskInstrumentor().instrument_app(app)

    @classmethod
    def instrument_requests(cls: Type[OtlpAPI]) -> None:
        """Instrument the ``requests`` library (at most once per process)."""
        if not cls._requests_instrumented:
            from opentelemetry.instrumentation.requests import RequestsInstrumentor

            RequestsInstrumentor().instrument()
            cls._requests_instrumented = True

    def get_tracer(self, name: str) -> Tracer:
        """Get a tracer from the global tracer provider."""
        return trace.get_tracer(name)

    def get_logger_provider(self) -> LoggerProvider:
        """Get the global logger provider."""
        return _logs.get_logger_provider()

    def correlate_logger(self, logger_name: str, level: int = logging.INFO) -> None:
        """Route a logger's records through the ``otel_jobmon`` handler.

        Applies ``_log_config`` via ``logging.config.dictConfig`` with an
        added ``loggers`` entry that attaches the ``otel_jobmon`` handler to
        ``logger_name`` at ``level``. That handler's formatter string includes
        trace_id/span_id/parent_span_id fields.

        Note: this is a full (non-incremental) ``dictConfig`` call. On every
        call it re-applies the ``root`` section (level INFO, no handlers),
        which replaces any handlers previously attached to the root logger.

        Args:
            logger_name: name of the logger to configure.
            level: logging level for that logger.
        """
        # Build a new top-level dict so the class-level template is untouched.
        log_config = {
            **self._log_config,
            "loggers": {
                logger_name: {
                    "handlers": ["otel_jobmon"],
                    "level": level,
                },
            },
        }
        logging.config.dictConfig(log_config)

    @staticmethod
    def get_span_details() -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """Retrieve ids of the current span.

        Returns:
            ``(span_id, trace_id, parent_span_id)``. Each is formatted with
            ``hex()`` (``0x``-prefixed, not zero-padded) or is None. All three
            are None when there is no current recording span.
        """
        span = trace.get_current_span()
        if not span or not span.is_recording():
            return None, None, None
        ctx = span.get_span_context()
        # ``parent`` is not guaranteed on every span object, so read it defensively.
        parent = getattr(span, "parent", None)
        span_id = hex(ctx.span_id) if ctx and ctx.span_id else None
        trace_id = hex(ctx.trace_id) if ctx and ctx.trace_id else None
        parent_span_id = hex(parent.span_id) if parent else None
        return span_id, trace_id, parent_span_id