Maia/backend/app/services/auth_service.py

85 lines
3.2 KiB
Python

from sqlalchemy.orm import Session
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from app.models.user import User
from app.utils.security import (
hash_password,
verify_password,
create_access_token,
create_refresh_token,
decode_token,
)
from app.database import get_db
security = HTTPBearer()
def register_user(db: Session, email: str, password: str, name: str) -> User:
existing = db.query(User).filter(User.email == email).first()
if existing:
raise HTTPException(status_code=400, detail="Email already registered")
user = User(email=email, password_hash=hash_password(password), name=name, roles=[])
db.add(user)
db.commit()
db.refresh(user)
return user
def login_user(db: Session, email: str, password: str) -> dict:
user = db.query(User).filter(User.email == email).first()
if not user or not verify_password(password, user.password_hash):
raise HTTPException(status_code=401, detail="Invalid email or password")
access_token = create_access_token(str(user.id))
refresh_token = create_refresh_token(str(user.id))
return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}
def refresh_tokens(refresh_token: str) -> dict:
payload = decode_token(refresh_token)
if not payload or payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Invalid or expired refresh token")
user_id = payload.get("sub")
access_token = create_access_token(user_id)
new_refresh = create_refresh_token(user_id)
return {"access_token": access_token, "refresh_token": new_refresh, "token_type": "bearer"}
def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
token = credentials.credentials
payload = decode_token(token)
if not payload or payload.get("type") != "access":
raise HTTPException(status_code=401, detail="Invalid or expired access token")
user_id = payload.get("sub")
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=401, detail="User not found")
return user
def is_admin(user: User) -> bool:
"""Check if user has maia.admin role."""
return "maia.admin" in (user.roles or [])
def get_or_create_sso_user(db: Session, user_info: dict, roles: list = None) -> User:
"""Look up user by email (SSO login). Create or update if found."""
import uuid as _uuid
email = user_info.get("email", "")
name = user_info.get("name", "") or email or "SSO User"
roles = roles or []
user = db.query(User).filter(User.email == email).first()
if user:
user.name = name
# Merge KC roles with existing DB roles (don't downgrade if KC lacks a role)
existing = set(user.roles or [])
incoming = set(roles or [])
user.roles = sorted(existing | incoming)
db.commit()
db.refresh(user)
return user
placeholder_hash = hash_password(str(_uuid.uuid4()))
user = User(email=email, password_hash=placeholder_hash, name=name, roles=roles)
db.add(user)
db.commit()
db.refresh(user)
return user