61 lines
2.3 KiB
Python
61 lines
2.3 KiB
Python
from fastapi import APIRouter, Depends, HTTPException
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from pydantic import BaseModel
|
|
from sqlalchemy.orm import Session
|
|
from app.database import get_db
|
|
from app.schemas.auth import UserRegister, UserLogin, TokenRefresh, TokenResponse, UserResponse
|
|
from app.services.auth_service import (
|
|
register_user,
|
|
login_user,
|
|
refresh_tokens,
|
|
get_current_user,
|
|
get_or_create_sso_user,
|
|
)
|
|
from app.utils.keycloak_auth import validate_keycloak_token, has_role, extract_user_info
|
|
from app.utils.security import create_access_token, create_refresh_token
|
|
|
|
# Role names checked against a validated Keycloak token; keycloak_login
# forwards whichever of these the caller holds to get_or_create_sso_user.
MAIA_ROLES = [
    "maia.app",
    "maia.admin",
]

# Router that every endpoint in this module registers itself on.
router = APIRouter()

# HTTP Bearer security scheme instance.
# NOTE(review): not referenced by any endpoint visible in this section —
# confirm whether it is used elsewhere before removing.
_bearer = HTTPBearer()
|
|
|
|
@router.post("/register", response_model=UserResponse, status_code=201)
def register(data: UserRegister, db: Session = Depends(get_db)):
    # POST /register: hand the submitted email, password and name to
    # register_user and reply with its result (serialized as UserResponse,
    # HTTP 201 on success).
    created_user = register_user(db, data.email, data.password, data.name)
    return created_user
|
|
|
|
@router.post("/login", response_model=TokenResponse)
def login(data: UserLogin, db: Session = Depends(get_db)):
    # POST /login: pass the submitted email/password to login_user and reply
    # with whatever it returns, serialized as TokenResponse.
    issued_tokens = login_user(db, data.email, data.password)
    return issued_tokens
|
|
|
|
@router.post("/refresh", response_model=TokenResponse)
def refresh(data: TokenRefresh):
    # POST /refresh: forward the submitted refresh token to refresh_tokens
    # and reply with its result, serialized as TokenResponse. No database
    # session is requested by this endpoint.
    renewed_tokens = refresh_tokens(data.refresh_token)
    return renewed_tokens
|
|
|
|
@router.get("/me", response_model=UserResponse)
def me(current_user=Depends(get_current_user)):
    # GET /me: the get_current_user dependency resolves the caller; the
    # resolved object is echoed back, serialized as UserResponse.
    resolved_user = current_user
    return resolved_user
|
|
|
|
class KeycloakAuthRequest(BaseModel):
    # Request body accepted by the POST /keycloak endpoint.

    # Token string handed to validate_keycloak_token by keycloak_login.
    kc_token: str
|
|
|
|
@router.post("/keycloak", response_model=TokenResponse)
def keycloak_login(data: KeycloakAuthRequest, db: Session = Depends(get_db)):
    """Exchange a Keycloak RS256 access token for internal HS256 JWT tokens."""
    # Step 1: validate the submitted token; a falsy result is answered with 401.
    claims = validate_keycloak_token(data.kc_token)
    if not claims:
        raise HTTPException(status_code=401, detail="Token Keycloak invalido ou expirado")

    # Step 2: the "maia.app" role is mandatory; without it the answer is 403.
    if not has_role(claims, "maia.app"):
        raise HTTPException(status_code=403, detail="Acesso negado: role maia.app nao encontrada")

    profile = extract_user_info(claims)

    # Step 3: gather every role listed in MAIA_ROLES that the token carries,
    # preserving the order in which MAIA_ROLES declares them.
    granted_roles = []
    for role_name in MAIA_ROLES:
        if has_role(claims, role_name):
            granted_roles.append(role_name)

    # Step 4: look up or create the matching local user, passing the roles along.
    account = get_or_create_sso_user(db, profile, roles=granted_roles)

    # Step 5: issue the token pair, both keyed by the stringified user id.
    subject = str(account.id)
    return {
        "access_token": create_access_token(subject),
        "refresh_token": create_refresh_token(subject),
        "token_type": "bearer",
    }
|