from datetime import datetime import uuid from fastapi.responses import FileResponse import os import logging from typing import List, Optional from fastapi import APIRouter, Depends, UploadFile, File, HTTPException, Form, Query, Body from sqlalchemy.orm import Session, joinedload from app.database import get_db from app.models.meeting import Meeting, MeetingStatus from app.models.participant import MeetingParticipant from app.schemas.meeting import MeetingResponse, MeetingStatusResponse, ParticipantUpdate, ParticipantResponse from app.services.auth_service import get_current_user, is_admin from app.services.upload_service import save_audio_file from app.services.chunk_service import save_chunk, assemble_chunks, cleanup_chunks from app.tasks.processing import process_meeting import httpx from app.config import settings logger = logging.getLogger(__name__) router = APIRouter() AUDIO_STORAGE = "/opt/Backend/meeting-assistant-api/storage/audio" def _get_meeting_for_user(meeting_id: str, current_user, db: Session) -> Meeting: """Return meeting if user owns it or is admin. Raises 404 otherwise.""" q = db.query(Meeting).options(joinedload(Meeting.user)).filter(Meeting.id == meeting_id) if not is_admin(current_user): q = q.filter(Meeting.user_id == current_user.id) meeting = q.first() if not meeting: raise HTTPException(status_code=404, detail="Meeting not found") return meeting @router.post("/upload", response_model=MeetingResponse, status_code=201) async def upload_meeting( file: UploadFile = File(...), title: Optional[str] = Query(None), db: Session = Depends(get_db), current_user=Depends(get_current_user), ): meeting_id = str(uuid.uuid4()) audio_path = await save_audio_file(file, meeting_id) meeting = Meeting( id=meeting_id, title=title or file.filename or "Meeting", audio_path=audio_path, status=MeetingStatus.uploading, user_id=current_user.id, ) db.add(meeting) db.commit() db.refresh(meeting) process_meeting.delay(meeting_id) logger.info(f"Meeting {meeting_id} created and queued for processing") return meeting @router.post("/start", response_model=MeetingResponse, status_code=201) async def start_meeting( title: Optional[str] = Query(None), db: Session = Depends(get_db), current_user=Depends(get_current_user), ): meeting_id = str(uuid.uuid4()) from datetime import timezone, timedelta br_tz = timezone(timedelta(hours=-3)) now_br = datetime.now(br_tz) default_title = f"{current_user.name} {now_br.strftime('%d/%m/%Y, %H:%M')}" meeting = Meeting( id=meeting_id, title=title or default_title, status=MeetingStatus.receiving_chunks, user_id=current_user.id, ) db.add(meeting) db.commit() db.refresh(meeting) logger.info(f"Meeting {meeting_id} started (chunk mode)") return meeting @router.get("", response_model=List[MeetingResponse]) def list_meetings( skip: int = 0, limit: int = 50, db: Session = Depends(get_db), current_user=Depends(get_current_user), ): """List meetings. Admins see all meetings with owner info; users see only their own.""" q = db.query(Meeting).options(joinedload(Meeting.user)) if not is_admin(current_user): q = q.filter(Meeting.user_id == current_user.id) meetings = q.order_by(Meeting.created_at.desc()).offset(skip).limit(limit).all() return meetings @router.post("/{meeting_id}/chunks", status_code=200) async def upload_chunk( meeting_id: str, file: UploadFile = File(...), chunk_index: int = Form(...), db: Session = Depends(get_db), current_user=Depends(get_current_user), ): meeting = db.query(Meeting).filter( Meeting.id == meeting_id, Meeting.user_id == current_user.id ).first() if not meeting: raise HTTPException(status_code=404, detail="Meeting not found") if meeting.status not in (MeetingStatus.receiving_chunks, MeetingStatus.uploading): raise HTTPException(status_code=409, detail=f"Meeting not accepting chunks (status: {meeting.status})") ext = os.path.splitext(file.filename or "chunk.webm")[1].lower() or ".webm" data = await file.read() save_chunk(data, meeting_id, chunk_index, ext) logger.info(f"Chunk {chunk_index} received for meeting {meeting_id} ({len(data)} bytes)") return {"meeting_id": meeting_id, "chunk_index": chunk_index, "size": len(data)} @router.post("/{meeting_id}/finalize", response_model=MeetingResponse) async def finalize_meeting( meeting_id: str, title: Optional[str] = Query(None), db: Session = Depends(get_db), current_user=Depends(get_current_user), ): meeting = db.query(Meeting).filter( Meeting.id == meeting_id, Meeting.user_id == current_user.id ).first() if not meeting: raise HTTPException(status_code=404, detail="Meeting not found") if title and not meeting.title: meeting.title = title try: audio_path = assemble_chunks(meeting_id) meeting.audio_path = audio_path meeting.status = MeetingStatus.uploading db.commit() cleanup_chunks(meeting_id) process_meeting.delay(meeting_id) logger.info(f"Meeting {meeting_id} finalized and queued for processing") except Exception as e: meeting.status = MeetingStatus.error db.commit() logger.error(f"Failed to finalize meeting {meeting_id}: {e}") raise HTTPException(status_code=500, detail=f"Failed to assemble audio: {str(e)}") db.refresh(meeting) return meeting @router.get("/{meeting_id}", response_model=MeetingResponse) def get_meeting( meeting_id: str, db: Session = Depends(get_db), current_user=Depends(get_current_user), ): return _get_meeting_for_user(meeting_id, current_user, db) @router.get("/{meeting_id}/status", response_model=MeetingStatusResponse) def get_meeting_status( meeting_id: str, db: Session = Depends(get_db), current_user=Depends(get_current_user), ): return _get_meeting_for_user(meeting_id, current_user, db) @router.get("/{meeting_id}/audio") def get_meeting_audio( meeting_id: str, db: Session = Depends(get_db), current_user=Depends(get_current_user), ): """Stream the audio file for a meeting.""" meeting = _get_meeting_for_user(meeting_id, current_user, db) if not meeting.audio_path or not os.path.exists(meeting.audio_path): # Try common extensions for ext in [".webm", ".wav", ".m4a", ".mp3", ".ogg"]: candidate = os.path.join(AUDIO_STORAGE, f"{meeting_id}{ext}") if os.path.exists(candidate): meeting.audio_path = candidate break else: raise HTTPException(status_code=404, detail="Audio file not found") ext = os.path.splitext(meeting.audio_path)[1].lower() mime_map = {".webm": "audio/webm", ".wav": "audio/wav", ".mp3": "audio/mpeg", ".m4a": "audio/mp4", ".ogg": "audio/ogg"} media_type = mime_map.get(ext, "audio/webm") return FileResponse(meeting.audio_path, media_type=media_type) @router.get("/{meeting_id}/analyses") def get_meeting_analyses( meeting_id: str, db: Session = Depends(get_db), current_user=Depends(get_current_user), ): meeting = _get_meeting_for_user(meeting_id, current_user, db) return meeting.analyses @router.get("/{meeting_id}/analyses/{analysis_type}") def get_meeting_analysis_by_type( meeting_id: str, analysis_type: str, db: Session = Depends(get_db), current_user=Depends(get_current_user), ): meeting = _get_meeting_for_user(meeting_id, current_user, db) from app.models.analysis import AnalysisType, Analysis try: atype = AnalysisType(analysis_type) except ValueError: raise HTTPException(status_code=400, detail=f"Invalid analysis type '{analysis_type}'. Valid: minutes, sentiment, psychological, communication, summary") analysis = db.query(Analysis).filter( Analysis.meeting_id == meeting_id, Analysis.type == atype ).first() if not analysis: raise HTTPException(status_code=404, detail=f"Analysis '{analysis_type}' not yet available") return analysis @router.put("/{meeting_id}/participants/{speaker_label}", response_model=ParticipantResponse) def update_participant( meeting_id: str, speaker_label: str, data: ParticipantUpdate, db: Session = Depends(get_db), current_user=Depends(get_current_user), ): meeting = _get_meeting_for_user(meeting_id, current_user, db) participant = db.query(MeetingParticipant).filter( MeetingParticipant.meeting_id == meeting_id, MeetingParticipant.speaker_label == speaker_label, ).first() if not participant: participant = MeetingParticipant(meeting_id=meeting_id, speaker_label=speaker_label) db.add(participant) if data.identified_name is not None: participant.identified_name = data.identified_name if data.confidence_score is not None: participant.confidence_score = data.confidence_score db.commit() db.refresh(participant) return participant @router.post("/{meeting_id}/participants/{speaker_label}/enroll", response_model=ParticipantResponse) async def enroll_participant( meeting_id: str, speaker_label: str, name: str = Body(...), email: str = Body(None), db: Session = Depends(get_db), current_user=Depends(get_current_user), ): """Enroll a speaker voice profile and update their name in the transcript.""" import subprocess import json as _json import tempfile meeting = _get_meeting_for_user(meeting_id, current_user, db) if not meeting.transcription: raise HTTPException(status_code=400, detail="No transcription available") if not meeting.audio_path or not os.path.exists(meeting.audio_path): raise HTTPException(status_code=404, detail="Meeting audio file not found") segments = meeting.transcription.get("segments", []) speaker_segments = [s for s in segments if s.get("speaker") == speaker_label] if not speaker_segments: raise HTTPException(status_code=404, detail=f"Speaker '{speaker_label}' not found in transcript") valid_segs = [s for s in speaker_segments if (s.get("end", 0) - s.get("start", 0)) >= 0.5][:12] tmp_path = None profile_id = None try: if valid_segs: filter_parts = [] out_parts = [] for i, seg in enumerate(valid_segs): start = seg["start"] end = seg["end"] filter_parts.append(f"[0:a]atrim={start}:{end},asetpts=PTS-STARTPTS[a{i}]") out_parts.append(f"[a{i}]") n = len(filter_parts) filter_str = ";".join(filter_parts) + ";" + "".join(out_parts) + f"concat=n={n}:v=0:a=1[out]" with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: tmp_path = tmp.name result = subprocess.run( ["ffmpeg", "-y", "-i", meeting.audio_path, "-filter_complex", filter_str, "-map", "[out]", "-ar", "16000", "-ac", "1", tmp_path], capture_output=True, timeout=120, ) audio_source = tmp_path if result.returncode == 0 else meeting.audio_path else: audio_source = meeting.audio_path whisper_url = settings.WHISPER_SERVER_URL with open(audio_source, "rb") as audio_f: audio_bytes = audio_f.read() async with httpx.AsyncClient(timeout=120.0) as http: resp = await http.post( f"{whisper_url}/api/v2/enroll", files={"file": (f"{name.replace(' ', '_')}.wav", audio_bytes, "audio/wav")}, data={ "name": name, "email": email or "", "metadata": _json.dumps({"source_meeting": meeting_id, "original_label": speaker_label}), }, ) if resp.status_code in (200, 201): profile_data = resp.json() profile_id = profile_data.get("id") else: logger.warning(f"Enrollment API error {resp.status_code}: {resp.text[:200]}") except HTTPException: raise except Exception as e: logger.warning(f"Enrollment process error: {e}") finally: if tmp_path and os.path.exists(tmp_path): try: os.unlink(tmp_path) except Exception: pass participant = db.query(MeetingParticipant).filter( MeetingParticipant.meeting_id == meeting.id, MeetingParticipant.speaker_label == speaker_label, ).first() if not participant: participant = MeetingParticipant(meeting_id=meeting.id, speaker_label=speaker_label) db.add(participant) participant.identified_name = name if profile_id: try: from uuid import UUID as _UUID participant.voice_profile_id = _UUID(profile_id) except Exception: pass updated_segments = [ {**seg, "speaker": name} if seg.get("speaker") == speaker_label else seg for seg in segments ] meeting.transcription = {**meeting.transcription, "segments": updated_segments} from sqlalchemy.orm.attributes import flag_modified flag_modified(meeting, "transcription") db.commit() db.refresh(participant) logger.info(f"Speaker '{speaker_label}' identified as '{name}', profile_id={profile_id}") return participant @router.delete("/{meeting_id}", status_code=204) def delete_meeting( meeting_id: str, db: Session = Depends(get_db), current_user=Depends(get_current_user), ): """Delete a meeting. Only maia.admin users can delete.""" if not is_admin(current_user): raise HTTPException(status_code=403, detail="Apenas administradores podem excluir reunioes") meeting = db.query(Meeting).filter(Meeting.id == meeting_id).first() if not meeting: raise HTTPException(status_code=404, detail="Meeting not found") db.delete(meeting) db.commit()