Maia/backend/app/utils/keycloak_auth.py

51 lines
1.5 KiB
Python

import logging
from typing import Optional

import jwt
from jwt import PyJWKClient
# Keycloak realm that issues the tokens; passed as the expected issuer
# when tokens are decoded.
ISSUER = "https://auth.asaptelecom.com.br/realms/one"

# Client identifier of this application.
AUDIENCE = "maia-app"

# Endpoint publishing the realm's public signing keys (JWKS document).
JWKS_URI = "https://auth.asaptelecom.com.br/realms/one/protocol/openid-connect/certs"

# Shared JWKS client with key caching enabled, reused for every validation.
jwks_client = PyJWKClient(uri=JWKS_URI, cache_keys=True)
def validate_keycloak_token(
    token: str, audience: Optional[str] = None
) -> Optional[dict]:
    """Validate a Keycloak RS256 JWT and return its decoded claims.

    The signing key is resolved through the module-level ``jwks_client``
    from the token itself, then ``jwt.decode`` checks the signature
    (RS256 only) and the issuer against ``ISSUER``.

    Args:
        token: The encoded JWT.
        audience: Optional expected ``aud`` value. When omitted (the
            default) audience verification stays disabled, which matches
            the previous behaviour of this function.

    Returns:
        The decoded payload, or ``None`` if the token is empty or fails
        validation for any reason. This function never raises.
    """
    log = logging.getLogger(__name__)

    # Reject empty input up front instead of triggering a JWKS lookup.
    if not token:
        log.warning("[keycloak_auth] Token validation failed: empty token")
        return None

    try:
        signing_key = jwks_client.get_signing_key_from_jwt(token)
        # NOTE(review): audience is only checked when the caller passes
        # one; the module-level AUDIENCE constant is not applied here by
        # default - confirm whether it should be enforced.
        return jwt.decode(
            token,
            signing_key.key,
            algorithms=["RS256"],
            issuer=ISSUER,
            audience=audience,
            options={"verify_aud": audience is not None},
        )
    except jwt.PyJWTError as exc:
        # Expected failures: bad signature, wrong issuer, malformed token,
        # JWKS lookup errors, ...
        log.warning("[keycloak_auth] Token validation failed: %s", exc)
    except Exception:
        # Anything else is unexpected; keep the "return None" contract but
        # record the traceback so the failure is not silently lost.
        log.exception("[keycloak_auth] Token validation failed unexpectedly")
    return None
def _roles_in(access: object) -> list:
    """Return the ``roles`` list of one access section, or ``[]`` if malformed."""
    if not isinstance(access, dict):
        return []
    roles = access.get("roles")
    # Only accept real collections: a bare string would otherwise allow
    # substring matches such as ``"adm" in "admin"``.
    return list(roles) if isinstance(roles, (list, tuple, set, frozenset)) else []


def has_role(payload: dict, role: str) -> bool:
    """Check whether ``role`` is granted in the token payload.

    Looks at ``realm_access.roles`` first and then at the ``roles`` of
    every entry under ``resource_access``.

    Args:
        payload: Decoded token claims.
        role: Role name to look for.

    Returns:
        ``True`` if the role is listed in either place, ``False``
        otherwise. Missing, ``None`` or wrongly-typed sections are treated
        as "no roles" instead of raising.
    """
    if role in _roles_in(payload.get("realm_access")):
        return True

    resource_access = payload.get("resource_access")
    if not isinstance(resource_access, dict):
        return False
    return any(
        role in _roles_in(client_access)
        for client_access in resource_access.values()
    )
def extract_user_info(payload: dict) -> dict:
    """Extract basic user information from a decoded Keycloak token.

    Args:
        payload: Decoded token claims.

    Returns:
        A dict with the keys ``sub``, ``email`` and ``name``:

        * ``sub`` is the ``sub`` claim, or ``None`` if absent.
        * ``email`` is the ``email`` claim, or ``""`` if absent or null.
        * ``name`` is the ``name`` claim when non-empty; otherwise
          ``given_name`` and ``family_name`` joined by a space and
          stripped (``""`` if neither is set).
    """
    # ``or ""`` treats explicit nulls like missing claims, so a null
    # given/family name is never rendered as the literal text "None".
    given_name = payload.get("given_name") or ""
    family_name = payload.get("family_name") or ""
    display_name = payload.get("name") or f"{given_name} {family_name}".strip()

    return {
        "sub": payload.get("sub"),
        "email": payload.get("email") or "",
        "name": display_name,
    }