from importlib import import_module
from fastapi import APIRouter
from starlette.responses import JSONResponse
from starlette.status import HTTP_200_OK
from jobmon.server.web import routes
# Create a router for version 3 of the API
[docs]
api_v3_router = APIRouter(tags=[version], prefix=f"/{version}")
# Create a separate health router without authentication
[docs]
api_v3_health_router = APIRouter(tags=[f"{version}-health"], prefix=f"/{version}")
for r in ["fsm", "cli", "reaper"]:
[docs]
mod = import_module(f"jobmon.server.web.routes.{version}.{r}")
router = getattr(mod, f"{r}_router")
api_v3_router.include_router(router)
# Shared routes
@api_v3_router.get("/")
[docs]
def is_alive() -> JSONResponse:
"""Test connectivity to the database."""
return routes.is_alive()
@api_v3_router.get("/time")
[docs]
def get_pst_now() -> JSONResponse:
"""Get the current time in the Pacific."""
return routes.get_pst_now()
@api_v3_health_router.get("/health")
[docs]
def health() -> JSONResponse:
"""Test connectivity to the app. Always unauthenticated for health checks."""
return routes.health()
@api_v3_router.get("/test_bad")
[docs]
def test_route() -> None:
"""Test route."""
return routes.test_route()
# Define an API version route
@api_v3_router.get("/api_version", status_code=HTTP_200_OK)
[docs]
def api_version() -> JSONResponse:
"""Test connectivity to the database."""
return JSONResponse(content={"status": version})