# Source code for server.web.routes.v3

from importlib import import_module

from fastapi import APIRouter
from starlette.responses import JSONResponse
from starlette.status import HTTP_200_OK

from jobmon.server.web import routes

# Version tag shared by both routers' URL prefix and OpenAPI tags.
version = "v3"

# Create a router for version 3 of the API
api_v3_router = APIRouter(tags=[version], prefix=f"/{version}")

# Create a separate health router without authentication.
# NOTE(review): no authentication is attached in this module; presumably it
# is applied to api_v3_router where the routers are mounted -- confirm.
api_v3_health_router = APIRouter(tags=[f"{version}-health"], prefix=f"/{version}")
# Import each v3 sub-module and mount its router on the v3 router. Each
# sub-module is expected to expose an attribute named "<module>_router".
for r in ["fsm", "cli", "reaper"]:
    mod = import_module(f"jobmon.server.web.routes.{version}.{r}")
    router = getattr(mod, f"{r}_router")
    api_v3_router.include_router(router)


# Shared routes
@api_v3_router.get("/")
def is_alive() -> JSONResponse:
    """Test connectivity to the database.

    Returns:
        The response produced by ``routes.is_alive()``.
    """
    return routes.is_alive()
@api_v3_router.get("/time")
def get_pst_now() -> JSONResponse:
    """Get the current time in the Pacific.

    Returns:
        The response produced by ``routes.get_pst_now()``.
    """
    return routes.get_pst_now()
@api_v3_health_router.get("/health")
def health() -> JSONResponse:
    """Test connectivity to the app.

    Always unauthenticated for health checks; this is why the route is
    registered on ``api_v3_health_router`` rather than ``api_v3_router``.

    Returns:
        The response produced by ``routes.health()``.
    """
    return routes.health()
@api_v3_router.get("/test_bad")
def test_route() -> None:
    """Test route.

    Served at ``/test_bad``; simply delegates to ``routes.test_route()``.
    """
    # NOTE(review): annotated ``-> None`` yet the delegate's result is
    # returned; presumably the delegate raises or returns nothing -- confirm.
    return routes.test_route()
# Define an API version route
@api_v3_router.get("/api_version", status_code=HTTP_200_OK)
def api_version() -> JSONResponse:
    """Report the API version served by this router.

    Returns:
        A JSON response whose ``"status"`` key holds the module-level
        ``version`` string.
    """
    return JSONResponse(content={"status": version})