# Maia/backend/app/tasks/processing.py

import logging
import asyncio
from celery import Celery
from app.config import settings
from app.database import SessionLocal
from app.models.meeting import Meeting, MeetingStatus
from app.models.analysis import Analysis, AnalysisType
from app.services.whisper_service import transcribe_audio
from app.services.ollama_service import run_analysis
logger = logging.getLogger(__name__)

# Celery application for the meeting pipeline.  The same Redis URL is used
# both as the message broker and as the result backend.
celery_app = Celery(
    "meeting_assistant",
    backend=settings.REDIS_URL,
    broker=settings.REDIS_URL,
)

celery_app.conf.update(
    # Serialization: JSON only, for task payloads as well as results.
    accept_content=["json"],
    task_serializer="json",
    result_serializer="json",
    # Clock handling: everything in UTC.
    enable_utc=True,
    timezone="UTC",
    # Per the Celery docs: ack messages only after the task has run, and let
    # each worker process reserve one message at a time.
    worker_prefetch_multiplier=1,
    task_acks_late=True,
)
def _cancel_pending_tasks(loop):
    """Cancel every task still pending on *loop* and wait until they finish."""
    pending = asyncio.all_tasks(loop)
    if not pending:
        return
    for task in pending:
        task.cancel()
    # return_exceptions=True so the CancelledErrors are collected, not raised.
    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))


def run_async(coro):
    """Run *coro* to completion on a fresh event loop and return its result.

    Celery task bodies are synchronous, so each async service call gets a
    private loop that is fully torn down afterwards:

    * tasks the coroutine left running are cancelled and awaited,
    * async generators are finalized,
    * the thread's current-loop slot is cleared, and
    * the loop is closed.

    Exceptions raised by *coro* propagate to the caller unchanged.
    """
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)
    finally:
        try:
            _cancel_pending_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Never leave a closed loop registered as this thread's current
            # loop: later get_event_loop() calls in the same worker thread
            # would otherwise hand back a loop that can no longer run.
            asyncio.set_event_loop(None)
            loop.close()
# Analyses produced for every meeting, in execution order.  Each entry must be
# a valid ``AnalysisType`` value.
_ANALYSIS_TYPES = ("minutes", "sentiment", "psychological", "communication", "summary")


def _mark_error(db, meeting) -> None:
    """Discard any failed transaction, then persist ``MeetingStatus.error``.

    The rollback comes first.  A session whose ``commit()`` raised refuses
    further work until it is rolled back, so without it the status update
    would itself fail and hide the original error.
    """
    db.rollback()
    if meeting is not None:
        meeting.status = MeetingStatus.error
        db.commit()


def _save_participants(db, meeting, transcription) -> None:
    """Create one ``MeetingParticipant`` row per new speaker label.

    Labels are read from ``transcription["segments"][i]["speaker"]``.
    NOTE(review): assumes ``transcription`` is a dict shaped like that; confirm
    against ``transcribe_audio``.

    A label starting with ``SPEAKER_`` is stored with ``identified_name=None``.
    Any other label is also stored as the identified name.  Labels that
    already have a row for this meeting are skipped.
    """
    # Imported here rather than at module level.  NOTE(review): presumably to
    # avoid an import cycle; confirm.
    from app.models.participant import MeetingParticipant

    segments = transcription.get("segments", [])
    # Sorted so rows are inserted in a deterministic order.
    labels = sorted({seg.get("speaker") for seg in segments if seg.get("speaker")})
    for label in labels:
        existing = (
            db.query(MeetingParticipant)
            .filter(
                MeetingParticipant.meeting_id == meeting.id,
                MeetingParticipant.speaker_label == label,
            )
            .first()
        )
        if existing is None:
            db.add(
                MeetingParticipant(
                    meeting_id=meeting.id,
                    speaker_label=label,
                    identified_name=None if label.startswith("SPEAKER_") else label,
                )
            )
    db.commit()


def _run_analyses(db, meeting, meeting_id, transcription) -> int:
    """Run every analysis in ``_ANALYSIS_TYPES`` and upsert its ``Analysis`` row.

    Each analysis is best-effort.  On failure it is logged with a traceback,
    its transaction is rolled back so the session stays usable, and the loop
    moves on to the next type.

    Returns:
        The number of analyses that failed.
    """
    failures = 0
    for analysis_type in _ANALYSIS_TYPES:
        try:
            kind = AnalysisType(analysis_type)
            content = run_async(run_analysis(analysis_type, transcription))
            row = (
                db.query(Analysis)
                .filter(Analysis.meeting_id == meeting.id, Analysis.type == kind)
                .first()
            )
            if row is None:
                db.add(
                    Analysis(
                        meeting_id=meeting.id,
                        type=kind,
                        content=content,
                        model_used=settings.OLLAMA_MODEL,
                    )
                )
            else:
                row.content = content
                row.model_used = settings.OLLAMA_MODEL
            db.commit()
            logger.info("Analysis '%s' done for meeting %s", analysis_type, meeting_id)
        except Exception as exc:
            failures += 1
            db.rollback()
            logger.exception("Analysis '%s' failed for %s: %s", analysis_type, meeting_id, exc)
    return failures


@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_meeting(self, meeting_id: str):
    """Run the full processing pipeline for one meeting.

    Steps:
      1. Save audio (already done at upload).
      2. Transcribe via Whisper (``transcribe_audio``).
      3. Analyze via Ollama (``run_analysis``): minutes, sentiment,
         psychological, communication, summary.
      4. Update status to completed.

    ``meeting.status`` moves through processing, transcribing, analyzing and
    completed, or is set to ``error`` on failure.

    Failure handling:
      * A transcription failure is terminal.  Status becomes ``error`` and the
        task returns without retrying.
      * Participant extraction and individual analyses are best-effort.  A
        failure is logged and rolled back, and processing continues.  The
        meeting is still marked completed.
      * Any other exception marks the meeting ``error``.  The task is then
        re-queued via ``self.retry`` (``max_retries=3``, 60 s countdown).

    Args:
        meeting_id: Primary key of the ``Meeting`` row to process.
    """
    db = SessionLocal()
    meeting = None
    try:
        meeting = db.query(Meeting).filter(Meeting.id == meeting_id).first()
        if meeting is None:
            logger.error("Meeting %s not found in DB", meeting_id)
            return

        # Step 1 - Mark processing
        meeting.status = MeetingStatus.processing
        db.commit()
        logger.info("Processing meeting %s", meeting_id)

        # Step 2 - Transcription
        meeting.status = MeetingStatus.transcribing
        db.commit()
        try:
            transcription = run_async(transcribe_audio(meeting.audio_path))
            meeting.transcription = transcription
            db.commit()
        except Exception as exc:
            logger.exception("Transcription failed for %s: %s", meeting_id, exc)
            _mark_error(db, meeting)
            return
        logger.info("Transcription done for meeting %s", meeting_id)

        # Participants are best-effort.  Roll back so that a failed commit
        # here does not break the status updates that follow.
        try:
            _save_participants(db, meeting, transcription)
        except Exception as exc:
            db.rollback()
            logger.warning("Failed to save participants: %s", exc)

        # Step 3 - AI Analyses
        meeting.status = MeetingStatus.analyzing
        db.commit()
        failed = _run_analyses(db, meeting, meeting_id, transcription)
        if failed:
            logger.warning(
                "%d of %d analyses failed for meeting %s",
                failed,
                len(_ANALYSIS_TYPES),
                meeting_id,
            )

        # Step 4 - Mark completed
        meeting.status = MeetingStatus.completed
        db.commit()
        logger.info("Meeting %s processing COMPLETED", meeting_id)
    except Exception as exc:
        logger.exception("Unexpected error processing meeting %s: %s", meeting_id, exc)
        try:
            _mark_error(db, meeting)
        except Exception:
            # A failed status write must not prevent the retry below.
            logger.exception("Could not record error status for meeting %s", meeting_id)
        raise self.retry(exc=exc, countdown=60)
    finally:
        db.close()