"""Celery worker tasks for the meeting processing pipeline."""

import logging
import asyncio

from celery import Celery

from app.config import settings
from app.database import SessionLocal
from app.models.meeting import Meeting, MeetingStatus
from app.models.analysis import Analysis, AnalysisType
from app.services.whisper_service import transcribe_audio
from app.services.ollama_service import run_analysis

logger = logging.getLogger(__name__)

celery_app = Celery(
    "meeting_assistant",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_acks_late=True,
    worker_prefetch_multiplier=1,
)

# Analyses run for every meeting, in this order.
_ANALYSIS_TYPES = ("minutes", "sentiment", "psychological", "communication", "summary")


def run_async(coro):
    """Run *coro* to completion on a fresh event loop and return its result.

    A new loop is created for every call, installed as the current loop
    while the coroutine runs, and closed afterwards.  Any exception raised
    by the coroutine propagates to the caller.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        try:
            loop.close()
        finally:
            # Do not leave a closed loop registered as the current loop.
            asyncio.set_event_loop(None)


def _save_participants(db, meeting, transcription):
    """Best-effort: add one MeetingParticipant row per new speaker label.

    Failures are logged as a warning and never propagate; the session is
    rolled back so later commits on the same session still work.
    """
    try:
        # Kept function-local as in the original code.
        # NOTE(review): presumably avoids an import cycle - confirm.
        from app.models.participant import MeetingParticipant

        speaker_labels = {
            seg.get("speaker")
            for seg in transcription.get("segments", [])
            if seg.get("speaker")
        }
        for label in speaker_labels:
            existing = (
                db.query(MeetingParticipant)
                .filter(
                    MeetingParticipant.meeting_id == meeting.id,
                    MeetingParticipant.speaker_label == label,
                )
                .first()
            )
            if not existing:
                db.add(
                    MeetingParticipant(
                        meeting_id=meeting.id,
                        speaker_label=label,
                        # Labels starting with "SPEAKER_" get no identified
                        # name; any other label is stored as the name.
                        identified_name=None if label.startswith("SPEAKER_") else label,
                    )
                )
        db.commit()
    except Exception as e:
        logger.warning("Failed to save participants: %s", e)
        # A failed flush/commit leaves the session unusable until rollback.
        db.rollback()


def _store_analysis(db, meeting, analysis_type, content):
    """Insert or update the Analysis row of *analysis_type* for *meeting*."""
    kind = AnalysisType(analysis_type)
    existing = (
        db.query(Analysis)
        .filter(Analysis.meeting_id == meeting.id, Analysis.type == kind)
        .first()
    )
    if existing:
        existing.content = content
        existing.model_used = settings.OLLAMA_MODEL
    else:
        db.add(
            Analysis(
                meeting_id=meeting.id,
                type=kind,
                content=content,
                model_used=settings.OLLAMA_MODEL,
            )
        )
    db.commit()


@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_meeting(self, meeting_id: str):
    """Run the full processing pipeline for one meeting.

    Steps:
        1. Mark the meeting as processing.
        2. Transcribe ``meeting.audio_path`` and store the transcription,
           then record the speakers found in its segments (best-effort).
        3. Run each analysis in ``_ANALYSIS_TYPES`` and upsert its result.
        4. Mark the meeting as completed.

    Args:
        meeting_id: Primary key of the Meeting row to process.

    Returns:
        None.  Returns early (without retry) when the meeting does not
        exist or when transcription fails; in the latter case the meeting
        status is set to error.

    Raises:
        The exception produced by ``self.retry`` for any other unexpected
        error, after attempting to set the meeting status to error.
    """
    db = SessionLocal()
    meeting = None
    try:
        meeting = db.query(Meeting).filter(Meeting.id == meeting_id).first()
        if not meeting:
            logger.error("Meeting %s not found in DB", meeting_id)
            return

        # Step 1 - Mark processing
        meeting.status = MeetingStatus.processing
        db.commit()
        logger.info("Processing meeting %s", meeting_id)

        # Step 2 - Transcription
        meeting.status = MeetingStatus.transcribing
        db.commit()
        try:
            transcription = run_async(transcribe_audio(meeting.audio_path))
            meeting.transcription = transcription
            db.commit()
        except Exception as e:
            logger.exception("Transcription failed for %s: %s", meeting_id, e)
            # Reset the session first: if the commit above failed, the
            # session rejects further work until it is rolled back.
            db.rollback()
            meeting.status = MeetingStatus.error
            db.commit()
            return
        logger.info("Transcription done for meeting %s", meeting_id)

        _save_participants(db, meeting, transcription)

        # Step 3 - AI Analyses
        meeting.status = MeetingStatus.analyzing
        db.commit()
        for analysis_type in _ANALYSIS_TYPES:
            try:
                content = run_async(run_analysis(analysis_type, transcription))
                _store_analysis(db, meeting, analysis_type, content)
                logger.info(
                    "Analysis '%s' done for meeting %s", analysis_type, meeting_id
                )
            except Exception as e:
                # One failed analysis is logged and skipped; the remaining
                # analyses still run and the meeting is still completed.
                logger.exception(
                    "Analysis '%s' failed for %s: %s", analysis_type, meeting_id, e
                )
                db.rollback()

        # Step 4 - Mark completed
        meeting.status = MeetingStatus.completed
        db.commit()
        logger.info("Meeting %s processing COMPLETED", meeting_id)

    except Exception as e:
        logger.exception(
            "Unexpected error processing meeting %s: %s", meeting_id, e
        )
        if meeting is not None:
            try:
                db.rollback()
                meeting.status = MeetingStatus.error
                db.commit()
            except Exception:
                # Failing to record the error status must not prevent the
                # retry below or hide the original exception.
                logger.exception(
                    "Could not mark meeting %s as error", meeting_id
                )
        raise self.retry(exc=e, countdown=60)
    finally:
        db.close()