Source code for core.requester

"""Requester object to make HTTP requests to the Jobmon Flask services."""

from __future__ import annotations

import contextlib
import functools
import json
import logging
from typing import Any, Callable, Dict, Tuple, Type

import requests
import tenacity
import urllib3

from jobmon.core import __version__
from jobmon.core.configuration import JobmonConfig
from jobmon.core.exceptions import InvalidRequest, InvalidResponse

# Module-level logger named after this module; used for request debug output
# and for warnings emitted while deciding whether to retry a failed request.
logger = logging.getLogger(__name__)
def http_request_ok(status_code: int) -> bool:
    """Tell whether an HTTP status code counts as a successful outcome.

    Args:
        status_code: The HTTP status code to classify.

    Returns:
        True when the code is 200, 302 or 307; False for anything else.
    """
    accepted_codes = (200, 302, 307)
    return status_code in accepted_codes
class Requester(object):
    """Requester object to make HTTP requests to the Jobmon Flask services.

    A Requester joins a base URL and a route prefix, optionally wraps each
    request in an OTLP tracing span, and optionally retries failed requests
    with exponential backoff.
    """

    # Class-level attribute to store the OtlpAPI instance. It lives on the
    # class, so it is only initialised while it is still None and is then
    # visible to every Requester instance.
    _otlp_api = None

    def __init__(
        self,
        url: str,
        route_prefix: str = "",
        request_timeout: int = 20,
        retries_timeout: int = 300,
        retries_attempts: int = 10,
        use_otlp: bool = False,
    ) -> None:
        """Initialize the Requester object with the url to make requests to.

        Args:
            url: Base URL of the server.
            route_prefix: Text appended to ``url`` to form the ``url`` property.
            request_timeout: Passed as ``timeout`` to every ``requests`` call.
            retries_timeout: Passed to ``tenacity.stop_after_delay`` when a
                request is sent tenaciously.
            retries_attempts: Passed to ``tenacity.stop_after_attempt`` when a
                request is sent tenaciously.
            use_otlp: When True and no OtlpAPI instance is stored on the class
                yet, set one up.
        """
        self.base_url = url
        self.route_prefix = route_prefix
        self.request_timeout = request_timeout
        self.retries_timeout = retries_timeout
        self.retries_attempts = retries_attempts

        if use_otlp and Requester._otlp_api is None:
            self._init_otlp()

        # Key/value pairs forwarded to the server with every request.
        self.server_structlog_context: Dict[str, str] = {}

    @classmethod
    def _init_otlp(cls: Type[Requester]) -> None:
        """Create the OtlpAPI instance and store it on the class."""
        # Imported here rather than at module level, so jobmon.core.otlp is
        # only loaded when OTLP is actually requested.
        from jobmon.core.otlp import OtlpAPI

        # setup connections to backend
        otlp_instance = OtlpAPI()
        otlp_instance.instrument_requests()
        otlp_instance.correlate_logger("jobmon.core.requester")

        # setup tracer for Requester to use
        cls._otlp_api = otlp_instance

    @classmethod
    def from_defaults(cls: Type[Requester]) -> Requester:
        """Instantiate a requester from default config values.

        Returns:
            A Requester built from the ``http`` section of JobmonConfig plus
            the ``otlp.http_enabled`` flag.
        """
        config = JobmonConfig()
        service_url = config.get("http", "service_url")
        route_prefix = config.get("http", "route_prefix")
        request_timeout = config.get_int("http", "request_timeout")
        retries_timeout = config.get_int("http", "retries_timeout")
        retries_attempts = config.get_int("http", "retries_attempts")
        use_otlp = config.get_boolean("otlp", "http_enabled")
        return cls(
            service_url,
            route_prefix,
            request_timeout,
            retries_timeout,
            retries_attempts,
            use_otlp,
        )

    @property
    def url(self) -> str:
        """Return the base url for the requester."""
        return self.base_url + self.route_prefix

    def add_server_structlog_context(self, **kwargs: Any) -> None:
        """Add the structlogging context if it has been provided.

        Args:
            **kwargs: Key/value pairs merged into ``server_structlog_context``;
                existing keys are overwritten.
        """
        self.server_structlog_context.update(kwargs)

    @contextlib.contextmanager
    def tracing_span(self, app_route: str, request_type: str) -> Any:
        """Context manager wrapping a request in an OTLP span when enabled.

        Args:
            app_route: Route appended to ``self.url`` for the ``http.url``
                span attribute.
            request_type: HTTP verb; upper-cased for the ``http.method``
                span attribute.

        Yields:
            The active span when an OtlpAPI instance is configured, else None.
        """
        if self._otlp_api:
            tracer = self._otlp_api.get_tracer("requester")
            with tracer.start_as_current_span("send_request") as span:
                span.set_attribute("http.method", request_type.upper())
                span.set_attribute("http.url", self.url + app_route)
                yield span
        else:
            yield None

    def _maybe_trace(self, func: Callable) -> Callable:
        """Wrap ``func`` so that each call runs inside ``tracing_span``.

        The span attributes are read from the ``app_route`` and
        ``request_type`` keyword arguments of the call; when either is passed
        positionally (or omitted) the span records "UNKNOWN" for it.
        """

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            with self.tracing_span(
                kwargs.get("app_route", "UNKNOWN"),
                kwargs.get("request_type", "UNKNOWN"),
            ):
                return func(*args, **kwargs)

        return wrapper

    def _maybe_retry(self, func: Callable, tenacious: bool) -> Any:
        """Return ``func`` wrapped in a tenacity retry policy when requested.

        Args:
            func: The callable that sends the request.
            tenacious: When False, ``func`` is returned unchanged.

        Returns:
            ``func`` itself, or a retrying wrapper around it. The wrapper
            stops after ``retries_attempts`` attempts or ``retries_timeout``
            of elapsed delay, whichever comes first, and then raises a
            RuntimeError chained to the last exception.
        """
        if not tenacious:
            return func

        def should_retry_exception(exception: Any) -> bool:
            """Return True if we should retry on the given exception."""
            logger.warning("Exception occurred: %s", exception)

            # Do not retry for certain client errors.
            if isinstance(exception, InvalidRequest):
                return False

            # Retry for specific exceptions.
            return isinstance(
                exception,
                (
                    InvalidResponse,
                    TimeoutError,
                    requests.ConnectionError,
                    requests.adapters.MaxRetryError,
                    requests.exceptions.ReadTimeout,
                    urllib3.exceptions.NewConnectionError,
                    urllib3.exceptions.MaxRetryError,
                ),
            )

        def raise_if_exceed_retry(retry_state: tenacity.RetryCallState) -> Any:
            """If we trigger retry error, raise informative RuntimeError."""
            # Check if the retry outcome is an exception
            outcome = retry_state.outcome
            if outcome and outcome.exception():
                exception = outcome.exception()
                raise RuntimeError(
                    f"Exceeded HTTP request retry budget due to: {exception}"
                ) from exception

        retrying = tenacity.retry(
            stop=(
                tenacity.stop_after_attempt(self.retries_attempts)
                | tenacity.stop_after_delay(self.retries_timeout)
            ),
            wait=tenacity.wait_exponential_jitter(initial=1, exp_base=2, jitter=1),
            retry=tenacity.retry_if_exception(should_retry_exception),
            retry_error_callback=raise_if_exceed_retry,
        )(func)

        return retrying

    def _with_structlog_context(self, message: dict) -> dict:
        """Return a copy of ``message`` carrying the server structlog context.

        A new dict is built so the caller's ``message`` is never mutated,
        including when the same message is re-sent by the retry wrapper.
        """
        return {**message, "server_structlog_context": self.server_structlog_context}

    def _send_request(
        self,
        app_route: str,
        message: dict,
        request_type: str,
    ) -> Tuple[int, Any]:
        """Send a single request (no retry, no tracing) and classify the reply.

        Args:
            app_route: Route appended to ``self.url``.
            message: JSON body for "post"/"put"; query parameters for "get".
                It is not modified.
            request_type: One of "get", "post" or "put".

        Returns:
            ``(status_code, content)`` as produced by ``get_content``.

        Raises:
            ValueError: ``request_type`` is not "get", "post" or "put".
            InvalidResponse: The status code is 5xx or 423.
            InvalidRequest: The status code is any other 4xx.
        """
        # construct url
        route = self.url + app_route
        logger.debug("Route: %s, message: %s", route, message)

        json_headers = {"Content-Type": "application/json"}

        # send request to server
        if request_type in ("post", "put"):
            # Resolved at call time so requests.post / requests.put are still
            # the functions that get invoked.
            send = requests.post if request_type == "post" else requests.put
            response = send(
                route,
                params={"client_jobmon_version": __version__},
                json=self._with_structlog_context(message),
                headers=json_headers,
                timeout=self.request_timeout,
            )
        elif request_type == "get":
            # For GET the message travels as query parameters and the
            # structlog context is sent as the request body.
            params = {**message, "client_jobmon_version": __version__}
            response = requests.get(
                route,
                params=params,
                data=json.dumps(self.server_structlog_context),
                headers=json_headers,
                timeout=self.request_timeout,
            )
        else:
            raise ValueError(
                f"request_type must be one of 'get', 'post', or 'put'. Got {request_type}"
            )

        status_code, content = get_content(response)

        # Server errors and 423 raise InvalidResponse, which the retry
        # predicate in _maybe_retry treats as retryable. This check must come
        # before the generic 4xx check so that 423 is not reported as a
        # client error.
        if 499 < status_code < 600 or status_code == 423:
            raise InvalidResponse(
                f"Request failed due to status code {status_code} from {request_type.upper()} "
                f"request through route {app_route}. Response content: {content}"
            )

        # Remaining client errors raise InvalidRequest, which is never retried.
        if 400 <= status_code < 500:
            raise InvalidRequest(
                f"Client error with status code {status_code} from {request_type.upper()} "
                f"request through route {app_route}. Response content: {content}"
            )

        return status_code, content

    def send_request(
        self, app_route: str, message: dict, request_type: str, tenacious: bool = True
    ) -> Tuple[int, Any]:
        """Send a request to the Jobmon server.

        Args:
            app_route: Route appended to ``self.url``.
            message: Payload of the request; see ``_send_request``.
            request_type: One of "get", "post" or "put".
            tenacious: When True, retry retryable failures with backoff.

        Returns:
            ``(status_code, content)`` of the response.

        Raises:
            RuntimeError: The retry budget was exhausted (tenacious mode).
            InvalidRequest: The server answered with a non-retryable 4xx.
            InvalidResponse: The server answered with 5xx/423 and
                ``tenacious`` is False.
            ValueError: ``request_type`` is not supported.
        """

        def send_fn(
            app_route: str, message: dict, request_type: str
        ) -> Tuple[int, Any]:
            return self._send_request(app_route, message, request_type)

        # Retry sits inside the trace, so all attempts share a single span.
        send_method = self._maybe_retry(send_fn, tenacious)
        send_with_trace = self._maybe_trace(send_method)
        # Keyword arguments are required here: _maybe_trace reads them.
        res = send_with_trace(
            app_route=app_route, message=message, request_type=request_type
        )
        return res
def get_content(response: Any) -> Tuple[int, Any]:
    """Extract the status code and body from a response object.

    Args:
        response: An object exposing ``headers``, ``status_code``, ``content``
            and ``json`` (either a method or, for a test client, a plain
            attribute).

    Returns:
        ``(status_code, payload)``. The payload is the decoded JSON when the
        Content-Type header mentions ``application/json``; otherwise it is
        the raw ``response.content``.
    """
    content_type = response.headers.get("Content-Type", "")
    if "application/json" not in content_type:
        payload = response.content
    else:
        try:
            payload = response.json()
        except TypeError:
            # for test_client, response.json is a dict not fn
            payload = response.json
    return response.status_code, payload