core.otlp.manager

Core OTLP manager for jobmon-scoped telemetry.

Attributes

_logger_provider

_logger_provider_lock

_shutdown_lock

_shutdown_invoked

_signal_handlers_installed

Classes

JobmonOTLPManager

OTLP manager for shared trace and log resources.

Functions

_build_exporter_args(→ Dict[str, Any])

_normalize_exporter_config(...)

_flush_otlp_once(→ None)

Flush OTLP telemetry a single time per process.

_register_atexit_hook(→ None)

Register the OTLP flush hook for interpreter shutdown.

_register_signal_handlers(→ None)

Install signal handlers that flush OTLP before termination.

_install_lifecycle_hooks(→ None)

Ensure lifecycle hooks are installed exactly once.

otlp_flush_on_exit(...)

Context manager that guarantees OTLP flush when exiting.

register_otlp_shutdown_event(→ None)

Register a FastAPI shutdown hook that flushes OTLP telemetry.

validate_otlp_exporter_config(→ list[str])

Validate OTLP exporter configuration and return list of issues.

initialize_jobmon_otlp(→ JobmonOTLPManager)

Initialize OTLP for shared resources (traces and logs).

get_shared_logger_provider(→ Optional[Any])

Get the shared logger provider, initializing if needed.

get_logger(→ Optional[Any])

Get a logger from the shared provider.

create_log_exporter(→ Optional[Any])

Create a pre-configured log exporter for client applications.

Module Contents

core.otlp.manager._logger_provider: Any | None = None
core.otlp.manager._logger_provider_lock
core.otlp.manager._shutdown_lock
core.otlp.manager._shutdown_invoked = False
core.otlp.manager._signal_handlers_installed = False
core.otlp.manager._build_exporter_args(config: Dict[str, Any], module_name: str) → Dict[str, Any]
core.otlp.manager._normalize_exporter_config(config: Any, defaults: Dict[str, Any] | None = None) → Tuple[Type[Any] | None, Dict[str, Any]]
class core.otlp.manager.JobmonOTLPManager

OTLP manager for shared trace and log resources.

This manager handles:
  • Trace provider setup (for distributed tracing)
  • Logger provider setup (for OTLP log export)
  • Resource detection (shared across components)
  • Request instrumentation (shared utility)

All OTLP handlers should use the shared logger provider to avoid duplicate connections and log emissions.

_instance: JobmonOTLPManager | None = None
_instance_lock
tracer_provider: Any | None = None
logger_provider: Any | None = None
_initialized = False
_log_processor_configured = False
_init_lock
classmethod get_instance() → JobmonOTLPManager

Get or create the singleton OTLP manager with thread safety.

initialize() → None

Initialize trace and log providers with jobmon resources.

_configure_log_processor() → None

Configure log processor from telemetry configuration.

_create_log_exporter(config: Any) → Any | None

Create a log exporter from configuration dictionary.

_configure_span_exporters() → None

Configure span exporters from telemetry configuration.

_create_span_exporter(config: Any) → Any | None

Create a span exporter from configuration dictionary.

get_tracer(name: str) → Any | None

Get a tracer for distributed tracing.

classmethod instrument_requests() → None

Instrument requests library for HTTP tracing.

shutdown() → None

Shutdown trace and log providers.

flush_and_shutdown() → None

Flush pending OTLP telemetry and shut down providers.

_force_flush_logger_provider() → None
_force_flush_tracer_provider() → None
core.otlp.manager._flush_otlp_once(reason: str = 'atexit') → None

Flush OTLP telemetry a single time per process.
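The "single time per process" guarantee is commonly built from a lock and a boolean flag, which matches the module-level names _shutdown_lock and _shutdown_invoked listed above. The following is a minimal sketch under that assumption, not the module's actual implementation; the flush_once name and its flush callback parameter are hypothetical.

```python
import threading

# Names mirror the module attributes documented above; how the real
# function uses them is an assumption.
_shutdown_lock = threading.Lock()
_shutdown_invoked = False


def flush_once(flush, reason="atexit"):
    """Run `flush(reason)` at most once per process.

    Returns True if this call performed the flush, False if an earlier
    call already did. `flush` is a hypothetical callback.
    """
    global _shutdown_invoked
    with _shutdown_lock:
        if _shutdown_invoked:
            return False
        _shutdown_invoked = True
    # Flush outside the lock so a slow exporter does not block other callers.
    flush(reason)
    return True
```

With this shape, a later call from a signal handler or an atexit hook becomes a no-op once any caller has already flushed.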

core.otlp.manager._register_atexit_hook() → None

Register the OTLP flush hook for interpreter shutdown.

core.otlp.manager._register_signal_handlers() → None

Install signal handlers that flush OTLP before termination.
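One way to flush before termination is to wrap the existing handler: flush first, then hand the signal back to whatever was installed before. The sketch below uses only the standard signal module; the install_flush_handlers name, the flush callback, and the choice of SIGTERM are assumptions for illustration, and the real function may handle other signals or chain differently.

```python
import os
import signal


def install_flush_handlers(flush, signums=(signal.SIGTERM,)):
    """Install handlers that call `flush`, then defer to the prior handler.

    `flush` is a hypothetical callback taking a reason string. Must be
    called from the main thread, as required by signal.signal().
    """
    previous = {}

    def handler(signum, frame):
        flush(f"signal:{int(signum)}")
        prev = previous.get(signum)
        if callable(prev):
            # Chain to a handler that was installed before ours.
            prev(signum, frame)
        elif prev == signal.SIG_DFL:
            # Restore the default action and re-deliver the signal so the
            # process still terminates the way the sender expects.
            signal.signal(signum, signal.SIG_DFL)
            os.kill(os.getpid(), signum)

    for signum in signums:
        previous[signum] = signal.signal(signum, handler)
    return previous
```

Chaining to the previous handler keeps the flush from silently replacing shutdown logic that a host application or server framework has already installed.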

core.otlp.manager._install_lifecycle_hooks() → None

Ensure lifecycle hooks are installed exactly once.

core.otlp.manager.otlp_flush_on_exit() → Generator[JobmonOTLPManager | None, None, None]

Context manager that guarantees OTLP flush when exiting.
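A flush guarantee of this kind is typically expressed with try/finally inside a generator-based context manager. The sketch below illustrates the pattern with contextlib; unlike the documented function, which takes no arguments, this hypothetical flush_on_exit accepts the manager explicitly so the example stays self-contained. flush_and_shutdown() is the method documented on JobmonOTLPManager above.

```python
from contextlib import contextmanager


@contextmanager
def flush_on_exit(manager):
    """Yield `manager`, then call flush_and_shutdown() even on error.

    `manager` may be None (for example when OTLP is unavailable), in
    which case nothing is flushed.
    """
    try:
        yield manager
    finally:
        if manager is not None:
            manager.flush_and_shutdown()
```

Because the flush sits in the finally clause, it runs whether the with-block completes normally or raises.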

core.otlp.manager.register_otlp_shutdown_event(app: Any) → None

Register a FastAPI shutdown hook that flushes OTLP telemetry.

core.otlp.manager.validate_otlp_exporter_config(config: Any, exporter_type: str = 'log') → list[str]

Validate OTLP exporter configuration and return list of issues.

Parameters:
  • config – Exporter configuration dictionary

  • exporter_type – Type of exporter ('log', 'trace', 'metric')

Returns:

List of validation error messages. Empty list if valid.
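The "return a list of issues" contract lets callers report every problem at once instead of stopping at the first exception. The sketch below shows that pattern only; the specific checks (a dictionary with a non-empty string under an endpoint key) are assumptions chosen for illustration and are not the rules the real function applies.

```python
from typing import Any, List

# Exporter types named in the parameter description above.
ALLOWED_TYPES = ("log", "trace", "metric")


def validate_config_sketch(config: Any, exporter_type: str = "log") -> List[str]:
    """Collect validation problems; an empty list means the config is valid.

    The individual checks are hypothetical examples.
    """
    issues: List[str] = []
    if exporter_type not in ALLOWED_TYPES:
        issues.append(f"unknown exporter_type: {exporter_type!r}")
    if not isinstance(config, dict):
        issues.append("config must be a dictionary")
        return issues  # nothing further can be inspected
    endpoint = config.get("endpoint")
    if not isinstance(endpoint, str) or not endpoint:
        issues.append("missing or empty 'endpoint'")
    return issues
```

A caller can then log or raise on a non-empty result, for example `if issues: raise ValueError("; ".join(issues))`.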

core.otlp.manager.initialize_jobmon_otlp() → JobmonOTLPManager

Initialize OTLP for shared resources (traces and logs).

This creates shared TracerProvider and LoggerProvider instances that should be used by all OTLP handlers to avoid duplicate connections.

Returns:

The OTLP manager instance with shared providers

core.otlp.manager.get_shared_logger_provider() → Any | None

Get the shared logger provider, initializing if needed.

This function gives handlers a clean interface to the shared LoggerProvider without dealing with manager instances directly. It uses double-checked locking to prevent race conditions when several threads request the provider concurrently, for example in Kubernetes deployments running multiple workers.

Returns:

The shared LoggerProvider instance, or None if unavailable
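Double-checked locking tests the cached value once without the lock (the fast path) and again under the lock before initializing. The sketch below illustrates the pattern in general terms; the get_shared_provider name and the factory parameter are hypothetical, and the real function presumably obtains its provider from the manager rather than from a caller-supplied factory.

```python
import threading

_provider = None
_provider_lock = threading.Lock()


def get_shared_provider(factory):
    """Return the shared provider, creating it at most once.

    `factory` is a hypothetical zero-argument callable that builds the
    provider.
    """
    global _provider
    if _provider is None:            # first check: cheap, no lock taken
        with _provider_lock:
            if _provider is None:    # second check: another thread may have won
                _provider = factory()
    return _provider
```

Once the provider exists, every later call returns on the first check without contending for the lock.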

core.otlp.manager.get_logger(name: str) → Any | None

Get a logger from the shared provider.

This is the cleanest way for handlers to get OTLP loggers without dealing with manager instances or initialization complexity.

Parameters:

name – Logger name (typically __name__)

Returns:

OTLP logger instance, or None if unavailable

core.otlp.manager.create_log_exporter(**kwargs: Any) → Any | None

Create a pre-configured log exporter for client applications.

This factory function creates exporters that can be passed to JobmonOTLPLoggingHandler, keeping exporter configuration separate from the handler.

Parameters:

**kwargs – Exporter configuration (endpoint, headers, etc.)

Returns:

Pre-configured OTLP log exporter, or None if unavailable

Example

    exporter = create_log_exporter(
        endpoint="otelcol.dev.aks:443", max_batch_size=8
    )
    handler = JobmonOTLPLoggingHandler(exporter=exporter)