from typing import Any, List, Mapping
import structlog
from fastapi import HTTPException
from starlette.requests import Request
from typing_extensions import TypedDict
from jobmon.core.configuration import JobmonConfig
from jobmon.core.exceptions import ConfigError
[docs]
# Shared module-level configuration object, created once when this module is
# imported; is_super_user() reads the OIDC admin group from it.
_CONFIG = JobmonConfig()
[docs]
# Module-level structlog logger, named after this module.
logger = structlog.get_logger(__name__)
[docs]
class User(TypedDict):
    """Typed dictionary describing the user attached to a request.

    The declared keys are exactly the ones this module passes when it
    constructs a ``User`` and the ones it reads back (``email``, ``groups``).
    Declaring them all lets a static type checker validate those accesses
    instead of only knowing about ``preferred_username``.

    ``TypedDict`` performs no runtime validation, so this declaration only
    affects static analysis.
    """

    # Identity fields.
    sub: str
    email: str
    preferred_username: str
    name: str
    updated_at: int
    given_name: str
    family_name: str
    # Group names; membership checks in this module test against this list.
    groups: List[str]
    # Remaining token/session fields, carried through unchanged.
    nonce: str
    at_hash: str
    sid: str
    aud: str
    exp: int
    iat: int
    iss: str
[docs]
def to_user_dict(data: Mapping[str, Any]) -> User:
    """Copy the user fields out of ``data`` into a ``User`` dictionary.

    Args:
        data: Mapping that holds the user fields.

    Returns:
        A new ``User`` containing the fifteen fields listed below.

    Every field is read with ``data[key]``, so a missing key is reported by
    the mapping itself (``KeyError`` for a plain ``dict``).
    """
    user: User = {
        "sub": data["sub"],
        "email": data["email"],
        "preferred_username": data["preferred_username"],
        "name": data["name"],
        "updated_at": data["updated_at"],
        "given_name": data["given_name"],
        "family_name": data["family_name"],
        "groups": data["groups"],
        "nonce": data["nonce"],
        "at_hash": data["at_hash"],
        "sid": data["sid"],
        "aud": data["aud"],
        "exp": data["exp"],
        "iat": data["iat"],
        "iss": data["iss"],
    }
    return user
[docs]
def get_user(request: Request) -> User:
    """Return the user stored in the request's session.

    Kept as a standalone function so tests can mock it.

    Args:
        request: Incoming request whose ``session`` is consulted.

    Returns:
        The session's ``"user"`` entry converted by ``to_user_dict``.

    Raises:
        HTTPException: status 403 with detail ``"Unauthorized"`` when the
            session has no (or an empty) ``"user"`` entry.
    """
    # for dev and production
    session_user = request.session.get("user")
    if session_user:
        # NOTE: the 'exp' field is not checked here, so an expired session
        # entry is still accepted.
        return to_user_dict(session_user)
    raise HTTPException(status_code=403, detail="Unauthorized")
[docs]
def get_request_username(request: Request) -> str:
    """Return the part of the requesting user's email before the first ``@``.

    Args:
        request: Incoming request; the user is resolved via ``get_user``.

    Returns:
        The text preceding the first ``@`` in the user's ``email`` field, or
        the whole value when it contains no ``@``.
    """
    address = get_user(request)["email"]
    local_part, _, _ = address.partition("@")
    return local_part
[docs]
def user_in_group(request: Request, group: str) -> bool:
    """Check whether the requesting user is a member of ``group``.

    Args:
        request: Incoming request; the user is resolved via ``get_user``.
        group: Group name to look for.

    Returns:
        ``True`` when ``group`` appears in the user's ``groups`` field,
        ``False`` otherwise.
    """
    memberships: List[str] = get_user(request)["groups"]
    # The user's full group list is logged at info level on every call.
    logger.info(f"{memberships}")
    return group in memberships
[docs]
def is_super_user(user: User) -> bool:
    """Tell whether ``user`` belongs to the configured admin group.

    The admin group name is read from the ``admin_group`` option of the
    ``oidc`` configuration section (OIDC__ADMIN_GROUP).

    Args:
        user: User whose ``groups`` field is inspected.

    Returns:
        ``True`` when the configured admin group is one of the user's groups.
    """
    return _CONFIG.get("oidc", "admin_group") in user["groups"]
[docs]
def is_auth_enabled() -> bool:
    """Report whether authentication is switched on.

    Builds a fresh ``JobmonConfig`` on each call and reads the boolean
    ``enabled`` option of the ``auth`` section.

    Returns:
        The configured value, or ``True`` when reading it raises
        ``ConfigError``.
    """
    settings = JobmonConfig()
    try:
        enabled = settings.get_boolean("auth", "enabled")
    except ConfigError:
        # Default to enabled for backwards compatibility
        enabled = True
    return enabled
[docs]
def create_anonymous_user() -> User:
    """Build the placeholder user used in unauthenticated mode.

    Returns:
        A new ``User`` with fixed "anonymous" identity fields, a single
        ``"anonymous"`` group, empty token strings and zeroed timestamps.
    """
    anonymous: User = {
        "sub": "anonymous",
        "email": "anonymous@localhost",
        "preferred_username": "anonymous",
        "name": "Anonymous User",
        "updated_at": 0,
        "given_name": "Anonymous",
        "family_name": "User",
        "groups": ["anonymous"],
        "nonce": "",
        "at_hash": "",
        "sid": "",
        "aud": "",
        "exp": 0,
        "iat": 0,
        "iss": "localhost",
    }
    return anonymous
[docs]
def get_user_or_anonymous(request: Request) -> User:
    """Resolve the user for ``request``, honouring the auth on/off switch.

    Args:
        request: Incoming request, consulted only when auth is enabled.

    Returns:
        The session user from ``get_user`` when ``is_auth_enabled()`` is
        true; otherwise the anonymous placeholder user.
    """
    if is_auth_enabled():
        return get_user(request)
    return create_anonymous_user()