2025-11-10 16:11:36 -05:00

115 lines
4.0 KiB
Python

from __future__ import annotations
import os
import uuid
from datetime import datetime
from celery import Celery
from ..config import settings
from ..db import db_session
from ..models import AudioTrack, Lyrics, SourceClip
from ..services import audio_analysis, lyrics_from_audio, media_ingest, storage
celery_app = Celery(
"beatmatchr",
broker=settings.celery_broker_url,
backend=settings.celery_result_backend,
)
@celery_app.task(name="media.ingest_url")
def task_ingest_url(project_id: str, input_url: str, origin: str = "url") -> None:
"""Ingest media from a URL for the specified project."""
media_ingest.ingest_single_media_url(project_id=project_id, input_url=input_url, origin=origin)
@celery_app.task(name="media.process_uploaded_video")
def task_process_uploaded_video(source_clip_id: str) -> None:
"""Extract metadata and thumbnails for an uploaded video clip."""
with db_session() as session:
clip = session.query(SourceClip).filter_by(id=source_clip_id).one()
local_video = storage.download_to_temp(clip.storage_path)
try:
meta = media_ingest.extract_video_metadata(local_video)
thumb_bytes = media_ingest.generate_thumbnail(local_video, time_seconds=0.5)
thumb_dest = f"thumbnails/{clip.project_id}/{clip.id}.jpg"
thumb_path = storage.upload_bytes(thumb_bytes, thumb_dest, content_type="image/jpeg")
clip.duration_seconds = meta.get("duration_seconds")
clip.width = meta.get("width")
clip.height = meta.get("height")
clip.fps = meta.get("fps")
clip.thumbnail_path = thumb_path
clip.updated_at = datetime.utcnow()
session.commit()
finally:
if os.path.exists(local_video):
os.remove(local_video)
@celery_app.task(name="audio.analyze")
def task_analyze_audio(audio_track_id: str) -> None:
"""Analyze audio track to compute BPM and beat grid."""
with db_session() as session:
audio = session.query(AudioTrack).filter_by(id=audio_track_id).one()
local_path = storage.download_to_temp(audio.storage_path)
try:
result = audio_analysis.analyze_audio(local_path)
audio.duration_seconds = result.get("duration_seconds")
audio.bpm = result.get("bpm")
audio.beat_grid = result.get("beat_grid")
audio.updated_at = datetime.utcnow()
session.commit()
finally:
if os.path.exists(local_path):
os.remove(local_path)
@celery_app.task(name="lyrics.transcribe")
def task_transcribe_lyrics(project_id: str, audio_track_id: str) -> None:
"""Transcribe lyrics from the project's audio track."""
with db_session() as session:
audio = session.query(AudioTrack).filter_by(id=audio_track_id).one()
local_path = storage.download_to_temp(audio.storage_path)
try:
result = lyrics_from_audio.transcribe_audio_to_lyrics(local_path)
raw_text = result["raw_text"]
words = result.get("words", [])
lines = result.get("lines", [])
existing = session.query(Lyrics).filter_by(project_id=project_id).one_or_none()
now = datetime.utcnow()
if existing is None:
lyrics = Lyrics(
id=str(uuid.uuid4()),
project_id=project_id,
source="audio_transcription",
raw_text=raw_text,
timed_words=words,
timed_lines=lines,
created_at=now,
updated_at=now,
)
session.add(lyrics)
else:
existing.source = "audio_transcription"
existing.raw_text = raw_text
existing.timed_words = words
existing.timed_lines = lines
existing.updated_at = now
session.commit()
finally:
if os.path.exists(local_path):
os.remove(local_path)