core.requester
==============

.. py:module:: core.requester

.. autoapi-nested-parse::

   Requester object to make HTTP requests to the Jobmon FastAPI services.


Attributes
----------

.. autoapisummary::

   core.requester.logger


Classes
-------

.. autoapisummary::

   core.requester.Requester


Functions
---------

.. autoapisummary::

   core.requester.http_request_ok
   core.requester.get_content


Module Contents
---------------

.. py:data:: logger

.. py:function:: http_request_ok(status_code: int) -> bool

   Return True if the HTTP status code is one that is deemed OK.


.. py:class:: Requester(service_url: str, retries_timeout: int = 300, retries_attempts: int = 10, request_timeout: int = 20, use_otlp: bool = False)

   Handles HTTP requests to the Jobmon server with configurable OTLP integration.


   .. py:attribute:: _otlp_manager
      :value: None


   .. py:attribute:: service_url


   .. py:attribute:: retries_timeout
      :value: 300


   .. py:attribute:: retries_attempts
      :value: 10


   .. py:attribute:: request_timeout
      :value: 20


   .. py:attribute:: server_structlog_context
      :type: Dict[str, str]


   .. py:method:: _init_otlp() -> None
      :classmethod:

      Initialize OTLP tracing only; logging is handled by the client configuration.


   .. py:method:: from_defaults() -> Requester
      :classmethod:

      Instantiate a requester from default config values.


   .. py:property:: url
      :type: str

      Legacy property for backward compatibility.


   .. py:method:: add_server_structlog_context(**kwargs: Any) -> None

      Add the structlog context if it has been provided.


   .. py:method:: _get_current_structlog_context() -> Dict[str, Any]

      Get the current structlog context variables to pass to the server.

      Captures context such as workflow_run_id and cluster_name so that the
      server can include it in its logs for correlation.


   .. py:method:: tracing_span(app_route: str, request_type: str) -> Any


   .. py:method:: _maybe_trace(func: Callable) -> Callable


   .. py:method:: _should_retry_exception(exception: Any) -> bool

      Determine if an exception should trigger a retry.


   .. py:method:: _maybe_retry(func: Callable, tenacious: bool) -> Any


   .. py:method:: _send_request(app_route: str, message: dict, request_type: str) -> Tuple[int, Any]


   .. py:method:: send_request(app_route: str, message: dict, request_type: str, tenacious: bool = True) -> Tuple[int, Any]

      Send a request to the Jobmon server.


   .. py:method:: _send_request_async(session: aiohttp.ClientSession, app_route: str, message: dict, request_type: str) -> Tuple[int, Any]
      :async:

      Async version of _send_request using aiohttp.


   .. py:method:: _get_content_async(response: aiohttp.ClientResponse) -> Tuple[int, Any]
      :async:

      Parse an aiohttp response, handling JSON and non-JSON content gracefully.

      :param response: The aiohttp ClientResponse object to parse.

      :returns: Tuple of (status_code, content) where content is parsed JSON or raw text/bytes.


   .. py:method:: _maybe_retry_async(func: Callable, tenacious: bool) -> Any

      Async version of _maybe_retry using tenacity async retry.


   .. py:method:: send_request_async(session: aiohttp.ClientSession, app_route: str, message: dict, request_type: str, tenacious: bool = True) -> Tuple[int, Any]
      :async:

      Send an async request to the Jobmon server with retry logic.

      This method provides the same retry capabilities as the sync version,
      using tenacity for exponential backoff with jitter, timeout protection,
      and exception handling.

      :param session: An active aiohttp ClientSession for making requests.
      :param app_route: The API route to request (will be appended to the base URL).
      :param message: Dictionary containing the request payload.
      :param request_type: HTTP method: 'get', 'post', or 'put'.
      :param tenacious: Whether to enable retry logic (default: True).

      :returns: Tuple of (status_code, response_content).

      :raises InvalidRequest: For 4xx client errors (no retry).
      :raises InvalidResponse: For 5xx server errors after exhausting retries.
      :raises RuntimeError: If the retry budget is exceeded.


.. py:function:: get_content(response: Any) -> Tuple[int, Any]

   Parse the response.
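
The retry contract documented for ``send_request_async`` (return on success, fail fast on 4xx, retry server errors with exponential backoff and jitter) can be illustrated with a small standalone sketch. This is not Jobmon's implementation: it uses only the standard library rather than tenacity and aiohttp. The names ``send_with_retry`` and ``make_flaky_sender`` are hypothetical, the two exception classes are local stand-ins for the ones named above, and the 2xx rule inside ``http_request_ok`` is an assumption about what "deemed OK" means.

```python
import asyncio
import random
from typing import Any, Awaitable, Callable, Tuple


class InvalidRequest(Exception):
    """Local stand-in for the 4xx error named in the docs (hypothetical)."""


class InvalidResponse(Exception):
    """Local stand-in for the 5xx error named in the docs (hypothetical)."""


def http_request_ok(status_code: int) -> bool:
    # Assumption: "OK" means any 2xx status; the real function may differ.
    return 200 <= status_code < 300


async def send_with_retry(
    send: Callable[[], Awaitable[Tuple[int, Any]]],
    attempts: int = 10,
    base_delay: float = 0.01,
) -> Tuple[int, Any]:
    """Call ``send`` until it returns an OK status or attempts run out."""
    status, content = 0, None
    for attempt in range(attempts):
        status, content = await send()
        if http_request_ok(status):
            return status, content
        if 400 <= status < 500:
            # Client errors will not fix themselves, so fail immediately.
            raise InvalidRequest(f"client error {status}: {content}")
        # Any other status is treated as retryable in this sketch.
        if attempt < attempts - 1:
            # Exponential backoff with full jitter before the next attempt.
            await asyncio.sleep(random.uniform(0, base_delay * 2 ** attempt))
    raise InvalidResponse(f"gave up after {attempts} attempts, last status {status}")


def make_flaky_sender(failures: int) -> Callable[[], Awaitable[Tuple[int, Any]]]:
    """Build a fake sender that returns 503 ``failures`` times, then 200."""
    calls = {"n": 0}

    async def send() -> Tuple[int, Any]:
        calls["n"] += 1
        if calls["n"] <= failures:
            return 503, "unavailable"
        return 200, {"calls": calls["n"]}

    return send


# Two simulated 503 responses are retried; the third call succeeds.
result = asyncio.run(send_with_retry(make_flaky_sender(2)))
print(result)
```

The fake sender stands in for a real HTTP call so the sketch runs without a server. In the documented API, passing ``tenacious=False`` disables retry logic; the closest analogue here is ``attempts=1``.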