51 lines
1.5 KiB
Python
51 lines
1.5 KiB
Python
import jwt
|
|
from jwt import PyJWKClient
|
|
from typing import Optional
|
|
|
|
JWKS_URI = "https://auth.asaptelecom.com.br/realms/one/protocol/openid-connect/certs"
|
|
ISSUER = "https://auth.asaptelecom.com.br/realms/one"
|
|
AUDIENCE = "maia-app"
|
|
|
|
jwks_client = PyJWKClient(JWKS_URI, cache_keys=True)
|
|
|
|
|
|
def validate_keycloak_token(token: str) -> Optional[dict]:
|
|
"""
|
|
Validate a Keycloak RS256 JWT token.
|
|
Returns decoded payload or None if invalid.
|
|
"""
|
|
try:
|
|
signing_key = jwks_client.get_signing_key_from_jwt(token)
|
|
payload = jwt.decode(
|
|
token,
|
|
signing_key.key,
|
|
algorithms=["RS256"],
|
|
issuer=ISSUER,
|
|
options={"verify_aud": False},
|
|
)
|
|
return payload
|
|
except Exception as e:
|
|
print(f"[keycloak_auth] Token validation failed: {e}")
|
|
return None
|
|
|
|
|
|
def has_role(payload: dict, role: str) -> bool:
|
|
"""Check if role is present in realm_access or resource_access."""
|
|
realm_roles = payload.get("realm_access", {}).get("roles", [])
|
|
if role in realm_roles:
|
|
return True
|
|
for client_roles in payload.get("resource_access", {}).values():
|
|
if role in client_roles.get("roles", []):
|
|
return True
|
|
return False
|
|
|
|
|
|
def extract_user_info(payload: dict) -> dict:
|
|
"""Extract user info from KC token."""
|
|
return {
|
|
"sub": payload.get("sub"),
|
|
"email": payload.get("email", ""),
|
|
"name": (payload.get("name") or
|
|
f"{payload.get('given_name', '')} {payload.get('family_name', '')}".strip()),
|
|
}
|