137 lines
4.8 KiB
Python
137 lines
4.8 KiB
Python
import logging
|
|
import asyncio
|
|
from celery import Celery
|
|
from app.config import settings
|
|
from app.database import SessionLocal
|
|
from app.models.meeting import Meeting, MeetingStatus
|
|
from app.models.analysis import Analysis, AnalysisType
|
|
from app.services.whisper_service import transcribe_audio
|
|
from app.services.ollama_service import run_analysis
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Celery application; the same Redis URL from settings is used for both the
# message broker and the result backend.
celery_app = Celery("meeting_assistant", broker=settings.REDIS_URL, backend=settings.REDIS_URL)

# Worker/task configuration, applied in a single update call.
celery_app.conf.update(
    {
        # JSON is the only serialization format produced or accepted.
        "task_serializer": "json",
        "accept_content": ["json"],
        "result_serializer": "json",
        # Timestamps are handled in UTC.
        "timezone": "UTC",
        "enable_utc": True,
        # Late acknowledgement with a prefetch multiplier of 1
        # (see the Celery configuration reference for exact semantics).
        "task_acks_late": True,
        "worker_prefetch_multiplier": 1,
    }
)
|
|
|
|
|
|
def run_async(coro):
    """Run an awaitable to completion on a private event loop and return its result.

    A brand-new event loop is created for each call, installed as the current
    loop while the awaitable runs, and then uninstalled and closed, so no
    closed loop is left behind as the thread's current event loop.

    Args:
        coro: A coroutine (or any awaitable accepted by
            ``loop.run_until_complete``) to execute.

    Returns:
        The value the awaitable evaluates to.

    Raises:
        Exception: Whatever the awaitable raises is propagated unchanged.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(coro)
    finally:
        # Clear the current-loop slot before closing; otherwise later calls to
        # asyncio.get_event_loop() in this thread would receive a closed loop.
        asyncio.set_event_loop(None)
        loop.close()
|
|
|
|
|
|
# Analyses produced for every meeting, in execution order.
_ANALYSIS_TYPES = ("minutes", "sentiment", "psychological", "communication", "summary")


def _save_participants(db, meeting, transcription):
    """Add a MeetingParticipant row for each speaker label not yet stored for the meeting."""
    # Kept as a function-local import, as in the original code.
    from app.models.participant import MeetingParticipant

    speaker_labels = {
        seg.get("speaker")
        for seg in transcription.get("segments", [])
        if seg.get("speaker")
    }
    for label in speaker_labels:
        existing = db.query(MeetingParticipant).filter(
            MeetingParticipant.meeting_id == meeting.id,
            MeetingParticipant.speaker_label == label,
        ).first()
        if not existing:
            db.add(MeetingParticipant(
                meeting_id=meeting.id,
                speaker_label=label,
                # Labels starting with "SPEAKER_" get no identified name;
                # any other label is stored as the name itself.
                identified_name=None if label.startswith("SPEAKER_") else label,
            ))
    db.commit()


def _store_analysis(db, meeting, meeting_id, analysis_type, content):
    """Insert or overwrite the Analysis row of the given type for the meeting and commit."""
    kind = AnalysisType(analysis_type)
    existing = (
        db.query(Analysis)
        .filter(Analysis.meeting_id == meeting_id, Analysis.type == kind)
        .first()
    )
    if existing:
        existing.content = content
        existing.model_used = settings.OLLAMA_MODEL
    else:
        db.add(Analysis(
            meeting_id=meeting.id,
            type=kind,
            content=content,
            model_used=settings.OLLAMA_MODEL,
        ))
    db.commit()


@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def process_meeting(self, meeting_id: str):
    """
    Full meeting processing pipeline:
    1. Save audio (already done at upload)
    2. Transcribe via Whisper
    3. Analyze via Ollama (minutes, sentiment, psychological, communication, summary)
    4. Update status to completed

    Args:
        meeting_id: Primary key of the Meeting row to process.

    Returns:
        None. Progress is reported through ``Meeting.status`` and the log.

    Raises:
        celery.exceptions.Retry: via ``self.retry`` when an unexpected error
            occurs outside the handled transcription/analysis steps.

    Failure handling:
        * Transcription failure marks the meeting as ``error`` and returns
          without retrying.
        * Failure to save participants, or failure of an individual analysis,
          is logged and skipped; the pipeline continues.
        * The session is rolled back in every handler so a failed flush cannot
          poison the commits that follow.
    """
    db = SessionLocal()
    meeting = None
    try:
        meeting = db.query(Meeting).filter(Meeting.id == meeting_id).first()
        if not meeting:
            logger.error(f"Meeting {meeting_id} not found in DB")
            return

        # Step 1 - Mark processing
        meeting.status = MeetingStatus.processing
        db.commit()
        logger.info(f"Processing meeting {meeting_id}")

        # Step 2 - Transcription
        meeting.status = MeetingStatus.transcribing
        db.commit()

        try:
            transcription = run_async(transcribe_audio(meeting.audio_path))
            meeting.transcription = transcription
            db.commit()
            logger.info(f"Transcription done for meeting {meeting_id}")
        except Exception as e:
            logger.exception(f"Transcription failed for {meeting_id}: {e}")
            # Discard any half-applied state before recording the error status.
            db.rollback()
            meeting.status = MeetingStatus.error
            db.commit()
            return

        # Save participants from transcription (best effort).
        try:
            _save_participants(db, meeting, transcription)
        except Exception as e:
            logger.warning(f"Failed to save participants: {e}")
            # Drop the pending participant rows so later commits are unaffected.
            db.rollback()

        # Step 3 - AI Analyses
        meeting.status = MeetingStatus.analyzing
        db.commit()

        for analysis_type in _ANALYSIS_TYPES:
            try:
                content = run_async(run_analysis(analysis_type, transcription))
                _store_analysis(db, meeting, meeting_id, analysis_type, content)
                logger.info(f"Analysis '{analysis_type}' done for meeting {meeting_id}")
            except Exception as e:
                # One failed analysis must not block the remaining ones.
                logger.exception(f"Analysis '{analysis_type}' failed for {meeting_id}: {e}")
                db.rollback()

        # Step 4 - Mark completed
        meeting.status = MeetingStatus.completed
        db.commit()
        logger.info(f"Meeting {meeting_id} processing COMPLETED")

    except Exception as e:
        logger.exception(f"Unexpected error processing meeting {meeting_id}: {e}")
        if meeting is not None:
            # Recording the error status is best effort: a failure here must
            # not prevent the retry below from being scheduled.
            try:
                db.rollback()
                meeting.status = MeetingStatus.error
                db.commit()
            except Exception:
                logger.exception(f"Could not mark meeting {meeting_id} as failed")
        raise self.retry(exc=e, countdown=60)
    finally:
        db.close()
|