core.config.structlog_config

Shared structlog configuration for all jobmon components.

This module provides structlog configuration that enables:

  1. Context variable merging (required for @bind_context decorator)

  2. Basic stdlib metadata decoration (logger name, level) while deferring rendering/formatting to the host application

  3. Optional Jobmon telemetry isolation and OTLP capture

  4. Optional component identification in logs

Attributes

_thread_local

_otlp_handler_count

_otlp_capture_lock

Classes

_NamedPrintLogger

Wrapper that adds a name attribute to PrintLogger.

_NamedPrintLoggerFactory

Factory that decorates PrintLogger instances with a name.

Functions

_store_event_dict_for_otlp(→ Dict[str, Any])

Store event_dict in thread-local storage for OTLP exporters.

_forward_event_to_logging_handlers(→ Dict[str, Any])

Forward structlog events to stdlib handlers when hosts render directly.

_prune_event_dict_for_console(→ Dict[str, Any])

Strip Jobmon-bound metadata while leaving host formatting intact.

_ensure_logger_name(→ Dict[str, Any])

Ensure the event dict contains a logger field.

_wrap_print_logger_factory(→ Any)

Decorate PrintLogger factories so produced loggers expose name.

_wrap_wrapper_class_for_otlp(→ Any)

Ensure filtered log levels still execute processors for OTLP capture.

_build_structlog_processor_chain(→ List[Callable])

Compose the processor chain shared across Jobmon structlog setup paths.

_extract_exc_info(→ Optional[Any])

Translate structlog exception metadata into logging exc_info tuple.

enable_structlog_otlp_capture(→ None)

Enable thread-local capture for OTLP handlers.

disable_structlog_otlp_capture(→ None)

Disable OTLP capture (used for tests or when handlers are removed).

structlog_otlp_capture_enabled(→ Iterator[None])

Context manager to enable OTLP capture temporarily.

configure_structlog(→ None)

Configure structlog for jobmon components.

_create_component_processor(→ Callable)

Create a processor that adds component name to all log events.

create_telemetry_isolation_processor(→ Callable[[Any, ...)

Create a processor that isolates telemetry metadata to specific logger prefixes.

configure_structlog_with_otlp(→ None)

Configure structlog with OTLP trace integration if enabled.

prepend_jobmon_processors_to_existing_config(→ None)

Prepend Jobmon processors to an existing structlog configuration.

is_structlog_configured(→ bool)

Check if structlog has been configured.

_iter_factory_candidates(→ Iterator[Any])

Yield the given factory and any nested factories exposed via common attrs.

_looks_like_direct_rendering_factory(→ bool)

Best-effort detection of structlog PrintLogger-based factories.

_wrapper_indicates_direct_rendering(→ bool)

Detect wrapper classes that imply direct rendering (e.g., PrintLogger).

_uses_stdlib_integration(→ bool)

Best-effort detection of stdlib integration vs direct rendering.

_processor_present(→ bool)

Return True when the exact processor function is already present.

_has_isolation_processor(→ bool)

Check whether an isolation processor for the given prefixes already exists.

Module Contents

class core.config.structlog_config._NamedPrintLogger(logger_name: str, base_logger: structlog._output.PrintLogger)[source]

Wrapper that adds a name attribute to PrintLogger.

__slots__ = ('_logger', 'name')[source]
_logger[source]
name[source]
__getattr__(item: str) → Any[source]
__repr__() → str[source]
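
The wrapper pattern suggested by the members above (a `name` slot plus `__getattr__` delegation) might look like the following. This is a minimal sketch, not Jobmon's actual implementation: `NamedWrapper` and `FakePrintLogger` are hypothetical stand-ins, and the delegation behaviour is an assumption inferred from the signatures.

```python
from typing import Any


class FakePrintLogger:
    """Hypothetical stand-in for structlog's PrintLogger."""

    def __init__(self) -> None:
        self.lines = []

    def msg(self, message: str) -> None:
        self.lines.append(message)


class NamedWrapper:
    """Adds a ``name`` attribute and delegates everything else."""

    __slots__ = ("_logger", "name")

    def __init__(self, logger_name: str, base_logger: Any) -> None:
        self._logger = base_logger
        self.name = logger_name

    def __getattr__(self, item: str) -> Any:
        # Only invoked when normal lookup fails, so ``name`` and ``_logger``
        # resolve through their slots and never reach this method.
        return getattr(self._logger, item)

    def __repr__(self) -> str:
        return f"NamedWrapper(name={self.name!r}, logger={self._logger!r})"


base = FakePrintLogger()
wrapped = NamedWrapper("jobmon.client", base)
wrapped.msg("hello")  # delegated to the wrapped logger
```

Delegating through `__getattr__` keeps the wrapper small: processors that expect `logger.name` find it, while every output method still comes from the wrapped logger.
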
class core.config.structlog_config._NamedPrintLoggerFactory(base_factory: structlog._output.PrintLoggerFactory)[source]

Factory that decorates PrintLogger instances with a name.

__jobmon_named_factory__ = True[source]
_base_factory[source]
__call__(*args: Any) → _NamedPrintLogger[source]
core.config.structlog_config._thread_local[source]
core.config.structlog_config._otlp_handler_count = 0[source]
core.config.structlog_config._otlp_capture_lock[source]
core.config.structlog_config._store_event_dict_for_otlp(logger: Any, method_name: str, event_dict: Dict[str, Any]) → Dict[str, Any][source]

Store event_dict in thread-local storage for OTLP exporters.
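
As an illustration of the thread-local capture idea, a processor of this shape could look like the sketch below. The names `store_event_dict`, `pop_captured_event`, and the `last_event` attribute are hypothetical; whether the real processor copies the dict or checks an enable flag first is not stated here.

```python
import threading
from typing import Any, Dict, Optional

_thread_local = threading.local()


def store_event_dict(
    logger: Any, method_name: str, event_dict: Dict[str, Any]
) -> Dict[str, Any]:
    """Processor: stash a copy of the event for the current thread."""
    # A shallow copy keeps later processors from mutating what was captured.
    _thread_local.last_event = dict(event_dict)
    return event_dict


def pop_captured_event() -> Optional[Dict[str, Any]]:
    """Hypothetical reader an exporter might call on the same thread."""
    event = getattr(_thread_local, "last_event", None)
    _thread_local.last_event = None
    return event
```

Because the storage is thread-local, an event captured on one thread is never visible to a handler running on another.
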

core.config.structlog_config._forward_event_to_logging_handlers(logger: Any, method_name: str, event_dict: Dict[str, Any]) → Dict[str, Any][source]

Forward structlog events to stdlib handlers when hosts render directly.
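
One way such forwarding could work is sketched below using only the standard library. `forward_to_stdlib` is a hypothetical name, and the choice to read the logger name from the `logger` key and the message from the `event` key is an assumption, not confirmed behaviour of the Jobmon function.

```python
import logging
from typing import Any, Dict

_LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}


def forward_to_stdlib(
    logger: Any, method_name: str, event_dict: Dict[str, Any]
) -> Dict[str, Any]:
    """Processor: hand the event to stdlib handlers, then pass it along."""
    name = event_dict.get("logger", "root")
    std_logger = logging.getLogger(name)
    # Unknown method names fall back to INFO in this sketch.
    level = _LEVELS.get(method_name, logging.INFO)
    if std_logger.isEnabledFor(level):
        record = std_logger.makeRecord(
            name, level, fn="", lno=0,
            msg=str(event_dict.get("event", "")), args=(), exc_info=None,
        )
        std_logger.handle(record)
    # The event dict is returned unchanged so the host can still render it.
    return event_dict
```
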

core.config.structlog_config._prune_event_dict_for_console(event_dict: Dict[str, Any]) → Dict[str, Any][source]

Strip Jobmon-bound metadata while leaving host formatting intact.

Removes all keys starting with the 'telemetry_' prefix, along with the helper fields added for OTLP forwarding (logger and level). This keeps console output clean while preserving the full context for OTLP exports.
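
The pruning rule described above can be sketched as a small pure function. `prune_for_console` is a hypothetical name, and returning a new dict rather than mutating in place is an assumption.

```python
from typing import Any, Dict

# Helper fields named in the description above.
HELPER_KEYS = ("logger", "level")


def prune_for_console(event_dict: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy without telemetry_* keys and OTLP helper fields."""
    return {
        key: value
        for key, value in event_dict.items()
        if not key.startswith("telemetry_") and key not in HELPER_KEYS
    }


full = {
    "event": "task finished",
    "logger": "jobmon.worker",
    "level": "info",
    "telemetry_task_id": 42,
    "user": "alice",
}
console_view = prune_for_console(full)
```
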

core.config.structlog_config._ensure_logger_name(logger: Any, method_name: str, event_dict: Dict[str, Any]) → Dict[str, Any][source]

Ensure the event dict contains a logger field.

core.config.structlog_config._wrap_print_logger_factory(factory: Any) → Any[source]

Decorate PrintLogger factories so produced loggers expose name.

core.config.structlog_config._wrap_wrapper_class_for_otlp(wrapper_class: Any) → Any[source]

Ensure filtered log levels still execute processors for OTLP capture.

core.config.structlog_config._build_structlog_processor_chain(*, uses_stdlib_integration: bool, component_name: str | None, telemetry_logger_prefixes: Sequence[str] | None, extra_processors: Iterable[Callable], include_store_for_otlp: bool, include_wrap_for_formatter: bool, include_telemetry_isolation: bool = True) → List[Callable][source]

Compose the processor chain shared across Jobmon structlog setup paths.

Parameters:
  • uses_stdlib_integration – If True, configures for stdlib logging integration. If False, configures for direct rendering (e.g., PrintLogger).

  • component_name – Optional component name to add to all logs.

  • telemetry_logger_prefixes – Logger name prefixes for telemetry isolation.

  • extra_processors – Additional processors to insert after telemetry isolation.

  • include_store_for_otlp – Whether to include OTLP event storage processor.

  • include_wrap_for_formatter – Whether to include ProcessorFormatter wrapper (for stdlib).

  • include_telemetry_isolation – Whether to include telemetry isolation processor.

core.config.structlog_config._extract_exc_info(event_dict: Dict[str, Any]) → Any | None[source]

Translate structlog exception metadata into logging exc_info tuple.
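
A translation of this kind might handle the three shapes that an `exc_info` value commonly takes in a structlog event dict: an exception instance, a ready-made tuple, or a truthy flag. The sketch below is an assumption about the approach; `extract_exc_info` is a hypothetical name and the real function may inspect other keys.

```python
import sys
from typing import Any, Dict, Optional


def extract_exc_info(event_dict: Dict[str, Any]) -> Optional[Any]:
    """Return a logging-style (type, value, traceback) tuple, or None."""
    value = event_dict.get("exc_info")
    if isinstance(value, BaseException):
        return (type(value), value, value.__traceback__)
    if isinstance(value, tuple) and len(value) == 3:
        return value
    if value:
        # A truthy flag means "use the exception currently being handled".
        current = sys.exc_info()
        return current if current[0] is not None else None
    return None
```
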

core.config.structlog_config.enable_structlog_otlp_capture() → None[source]

Enable thread-local capture for OTLP handlers.

core.config.structlog_config.disable_structlog_otlp_capture() → None[source]

Disable OTLP capture (used for tests or when handlers are removed).

core.config.structlog_config.structlog_otlp_capture_enabled() → Iterator[None][source]

Context manager to enable OTLP capture temporarily.

Ensures the reference count is decremented even if an exception is raised. Particularly useful in tests that need to toggle capture around assertions.
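
The reference-counting behaviour described here can be sketched with a lock-protected counter and a `try`/`finally` context manager. All names below are hypothetical stand-ins for the module's enable, disable, and context-manager functions, and clamping the counter at zero is an assumption.

```python
import threading
from contextlib import contextmanager
from typing import Iterator

_capture_count = 0
_capture_lock = threading.Lock()


def enable_capture() -> None:
    global _capture_count
    with _capture_lock:
        _capture_count += 1


def disable_capture() -> None:
    global _capture_count
    with _capture_lock:
        # Clamp at zero so an unmatched disable cannot go negative.
        _capture_count = max(0, _capture_count - 1)


def capture_is_enabled() -> bool:
    with _capture_lock:
        return _capture_count > 0


@contextmanager
def capture_enabled() -> Iterator[None]:
    """Enable capture for the duration of the block, even on error."""
    enable_capture()
    try:
        yield
    finally:
        disable_capture()
```

With a counter rather than a boolean, nested or overlapping users can each enable and disable capture without switching it off for one another.
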

core.config.structlog_config.configure_structlog(component_name: str | None = None, *, extra_processors: Iterable[Callable] | None = None) → None[source]

Configure structlog for jobmon components.

This function sets up structlog with processors that:

  1. Merge context variables (from bind_contextvars and @bind_context decorator)

  2. Add logger metadata (logger name, log level)

  3. Optionally add a component field to the event_dict

  4. Isolate Jobmon telemetry metadata to jobmon.* loggers

  5. Capture the raw event_dict for OTLP handlers

  6. Optionally include extra processors supplied by the caller

IMPORTANT: This must be called before using the @bind_context decorator or any structlog.contextvars.bind_contextvars() calls.

PROCESSOR CHAIN ORDER (after configuration):

  1. merge_contextvars (Jobmon context)

  2. component processor (optional, when component_name is provided)

  3. filter_by_level (stdlib)

  4. add_logger_name (stdlib)

  5. add_log_level (stdlib)

  6. telemetry isolation processor

  7. _store_event_dict_for_otlp (Jobmon OTLP capture)

  8. Extra processors supplied via extra_processors (if any)

  9. ProcessorFormatter.wrap_for_formatter (stdlib - keeps stdlib handlers working)

Parameters:
  • component_name – Component name to add to all logs (e.g., “distributor”)

  • extra_processors – Additional structlog processors to append after Jobmon’s defaults (e.g., custom formatting or telemetry processors)

Example

>>> # Basic usage
>>> configure_structlog(component_name="distributor")
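
A processor suitable for extra_processors is any callable taking `(logger, method_name, event_dict)` and returning the event dict. The sketch below shows two hypothetical processors, `add_run_id` and `uppercase_event`, plus a hypothetical `run_chain` helper that only imitates how a chain applies processors in order; it is not part of Jobmon or structlog.

```python
from typing import Any, Callable, Dict, Iterable

Processor = Callable[[Any, str, Dict[str, Any]], Dict[str, Any]]


def add_run_id(
    logger: Any, method_name: str, event_dict: Dict[str, Any]
) -> Dict[str, Any]:
    """Hypothetical extra processor: tag each event with a fixed run id."""
    event_dict.setdefault("run_id", "run-001")
    return event_dict


def uppercase_event(
    logger: Any, method_name: str, event_dict: Dict[str, Any]
) -> Dict[str, Any]:
    """Hypothetical extra processor: upper-case the event message."""
    event_dict["event"] = str(event_dict.get("event", "")).upper()
    return event_dict


def run_chain(
    processors: Iterable[Processor], method_name: str, event_dict: Dict[str, Any]
) -> Dict[str, Any]:
    """Apply processors in order, feeding each one the previous result."""
    for processor in processors:
        event_dict = processor(None, method_name, event_dict)
    return event_dict


# With Jobmon installed, the processors would be passed as documented above:
# configure_structlog(component_name="distributor", extra_processors=[add_run_id])
result = run_chain([add_run_id, uppercase_event], "info", {"event": "started"})
```
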
core.config.structlog_config._create_component_processor(component_name: str) → Callable[source]

Create a processor that adds component name to all log events.

Parameters:

component_name – Name of the component (e.g., “distributor”, “worker”)

Returns:

Processor function that adds component field to event_dict
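
A processor factory of this kind is typically a closure over the component name. The following is a minimal sketch; `make_component_processor` is a hypothetical name, and overwriting an existing component value (rather than keeping it) is an assumption.

```python
from typing import Any, Callable, Dict

Processor = Callable[[Any, str, Dict[str, Any]], Dict[str, Any]]


def make_component_processor(component_name: str) -> Processor:
    """Build a processor that stamps ``component`` on every event."""

    def add_component(
        logger: Any, method_name: str, event_dict: Dict[str, Any]
    ) -> Dict[str, Any]:
        # The closure captures component_name once, at configuration time.
        event_dict["component"] = component_name
        return event_dict

    return add_component


stamp = make_component_processor("distributor")
stamped = stamp(None, "info", {"event": "heartbeat"})
```
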

core.config.structlog_config.create_telemetry_isolation_processor(telemetry_prefixes: List[str]) → Callable[[Any, str, Dict[str, Any]], Dict[str, Any]][source]

Create a processor that isolates telemetry metadata to specific logger prefixes.

Parameters:

telemetry_prefixes – List of logger name prefixes that should receive telemetry metadata (e.g., ["jobmon.", "myapp.telemetry"]).

Returns:

A structlog processor function that isolates telemetry metadata.
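
The isolation idea can be sketched as a factory whose processor keeps telemetry keys only for loggers matching one of the prefixes. This is an illustration under stated assumptions: `make_isolation_processor` is a hypothetical name, telemetry keys are taken to be those starting with `telemetry_`, and the logger name is read from the `logger` key of the event dict.

```python
from typing import Any, Callable, Dict, List

Processor = Callable[[Any, str, Dict[str, Any]], Dict[str, Any]]


def make_isolation_processor(telemetry_prefixes: List[str]) -> Processor:
    """Build a processor that strips telemetry_* keys for other loggers."""
    prefixes = tuple(telemetry_prefixes)

    def isolate(
        logger: Any, method_name: str, event_dict: Dict[str, Any]
    ) -> Dict[str, Any]:
        logger_name = str(event_dict.get("logger") or "")
        # str.startswith accepts a tuple and matches any of its prefixes.
        if logger_name.startswith(prefixes):
            return event_dict
        return {
            key: value
            for key, value in event_dict.items()
            if not key.startswith("telemetry_")
        }

    return isolate


isolate = make_isolation_processor(["jobmon."])
kept = isolate(None, "info", {"logger": "jobmon.client", "telemetry_task_id": 1})
stripped = isolate(None, "info", {"logger": "hostapp.db", "telemetry_task_id": 1})
```
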

core.config.structlog_config.configure_structlog_with_otlp(component_name: str) → None[source]

Configure structlog with OTLP trace integration if enabled.

Checks JobmonConfig for telemetry.tracing settings and configures structlog with OpenTelemetry trace processors if enabled.

This is the recommended function to call from component CLIs as it automatically handles OTLP configuration based on settings.

Parameters:

component_name – Component name (e.g., “distributor”, “worker”)

Example

>>> # In distributor CLI
>>> configure_structlog_with_otlp(component_name="distributor")
>>> # In worker CLI
>>> configure_structlog_with_otlp(component_name="worker")
core.config.structlog_config.prepend_jobmon_processors_to_existing_config() → None[source]

Prepend Jobmon processors to an existing structlog configuration.

This is called when Jobmon is used as a library and the host application has already configured structlog. It prepends Jobmon’s processors to the existing processor chain to enable telemetry context management and isolation while preserving the host application’s final rendering.

Adapts to the host app’s logging architecture:

  • Stdlib integration adds merge_contextvars, filter_by_level, add_logger_name, and telemetry isolation

  • Direct rendering (like FHS) adds merge_contextvars and telemetry isolation

  • Host processors remain untouched so final rendering is preserved
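
The core of the prepend step can be sketched as a list operation that adds only the processors not already present and leaves the host's order intact. `prepend_missing` and the sample processors are hypothetical, and the identity-based duplicate check is an assumption about how repeated calls stay idempotent.

```python
from typing import Any, Callable, Dict, List, Sequence

Processor = Callable[[Any, str, Dict[str, Any]], Dict[str, Any]]


def prepend_missing(
    existing: Sequence[Processor], required: Sequence[Processor]
) -> List[Processor]:
    """Return required processors (if absent) followed by the host chain."""
    missing = [
        proc for proc in required
        if not any(proc is present for present in existing)
    ]
    return missing + list(existing)


def merge_context(logger, method_name, event_dict):  # hypothetical Jobmon step
    return event_dict


def host_renderer(logger, method_name, event_dict):  # hypothetical host step
    return event_dict


# In real code the host chain would come from structlog.get_config()["processors"]
# and the result would be applied with structlog.configure(processors=...).
chain = prepend_missing([host_renderer], [merge_context])
chain_again = prepend_missing(chain, [merge_context])
```

Keeping the host's processors last means its renderer still produces the final output.
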

core.config.structlog_config.is_structlog_configured() → bool[source]

Check if structlog has been configured.

Returns:

True if structlog is configured, False otherwise

Example

>>> if not is_structlog_configured():
...     configure_structlog(component_name="my_component")
core.config.structlog_config._iter_factory_candidates(factory: Any) → Iterator[Any][source]

Yield the given factory and any nested factories exposed via common attrs.
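
A traversal like this could be written as a breadth-first walk over a few well-known attribute names. In the sketch below, `iter_factory_candidates` and the `NESTED_ATTRS` tuple are hypothetical: `_base_factory` appears elsewhere in this module, while `factory` and `wrapped` are guesses at what "common attrs" might include.

```python
from collections import deque
from types import SimpleNamespace
from typing import Any, Iterator, Sequence

# Attribute names to follow; only "_base_factory" is taken from this module.
NESTED_ATTRS = ("_base_factory", "factory", "wrapped")


def iter_factory_candidates(
    factory: Any, attr_names: Sequence[str] = NESTED_ATTRS
) -> Iterator[Any]:
    """Yield the factory, then any factories nested inside it."""
    seen = set()
    queue = deque([factory])
    while queue:
        current = queue.popleft()
        # Skip missing attributes and guard against reference cycles.
        if current is None or id(current) in seen:
            continue
        seen.add(id(current))
        yield current
        for attr in attr_names:
            queue.append(getattr(current, attr, None))


inner = SimpleNamespace(kind="print")
outer = SimpleNamespace(_base_factory=inner)
candidates = list(iter_factory_candidates(outer))
```
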

core.config.structlog_config._looks_like_direct_rendering_factory(factory: Any) → bool[source]

Best-effort detection of structlog PrintLogger-based factories.

core.config.structlog_config._wrapper_indicates_direct_rendering(wrapper_class: Any) → bool[source]

Detect wrapper classes that imply direct rendering (e.g., PrintLogger).

core.config.structlog_config._uses_stdlib_integration(logger_factory: Any, wrapper_class: Any) → bool[source]

Best-effort detection of stdlib integration vs direct rendering.

core.config.structlog_config._processor_present(processors: Iterable[Callable], processor: Callable) → bool[source]

Return True when the exact processor function is already present.

core.config.structlog_config._has_isolation_processor(processors: Iterable[Callable], prefixes: tuple[str, ...]) → bool[source]

Check whether an isolation processor for the given prefixes already exists.