From ac66c5d9be4cefbedd36a8f8cdbeab4431656a80 Mon Sep 17 00:00:00 2001 From: BusyBee3333 Date: Mon, 10 Nov 2025 16:11:36 -0500 Subject: [PATCH] Implement core backend services for beatmatchr --- backend/__init__.py | 6 + backend/app.py | 32 ++++ backend/config.py | 58 +++++++ backend/db.py | 37 +++++ backend/models.py | 83 ++++++++++ backend/routers/__init__.py | 5 + backend/routers/audio.py | 54 +++++++ backend/routers/lyrics.py | 59 +++++++ backend/routers/media.py | 108 +++++++++++++ backend/services/audio_analysis.py | 34 ++++ backend/services/lyrics_from_audio.py | 128 +++++++++++++++ backend/services/media_ingest.py | 220 ++++++++++++++++++++++++++ backend/services/rendering.py | 97 ++++++++++++ backend/services/storage.py | 48 ++++++ backend/workers/__init__.py | 5 + backend/workers/tasks.py | 114 +++++++++++++ 16 files changed, 1088 insertions(+) create mode 100644 backend/__init__.py create mode 100644 backend/app.py create mode 100644 backend/config.py create mode 100644 backend/db.py create mode 100644 backend/models.py create mode 100644 backend/routers/__init__.py create mode 100644 backend/routers/audio.py create mode 100644 backend/routers/lyrics.py create mode 100644 backend/routers/media.py create mode 100644 backend/services/audio_analysis.py create mode 100644 backend/services/lyrics_from_audio.py create mode 100644 backend/services/media_ingest.py create mode 100644 backend/services/rendering.py create mode 100644 backend/services/storage.py create mode 100644 backend/workers/__init__.py create mode 100644 backend/workers/tasks.py diff --git a/backend/__init__.py b/backend/__init__.py new file mode 100644 index 0000000..1df70b7 --- /dev/null +++ b/backend/__init__.py @@ -0,0 +1,6 @@ +"""Beatmatchr backend package.""" + +from .config import settings +from .db import db_session, init_db + +__all__ = ["settings", "db_session", "init_db"] diff --git a/backend/app.py b/backend/app.py new file mode 100644 index 0000000..668be63 --- /dev/null +++ 
b/backend/app.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import logging + +from fastapi import FastAPI + +from .db import init_db +from .routers import audio, lyrics, media + +logger = logging.getLogger(__name__) + + +def create_app() -> FastAPI: + app = FastAPI(title="Beatmatchr API", version="0.1.0") + + @app.on_event("startup") + def _startup() -> None: # pragma: no cover - FastAPI lifecycle + init_db() + logger.info("Database initialized") + + app.include_router(media.router, prefix="/api") + app.include_router(audio.router, prefix="/api") + app.include_router(lyrics.router, prefix="/api") + + @app.get("/health") + async def healthcheck() -> dict: + return {"status": "ok"} + + return app + + +app = create_app() diff --git a/backend/config.py b/backend/config.py new file mode 100644 index 0000000..b2773be --- /dev/null +++ b/backend/config.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import os +from functools import lru_cache +from pathlib import Path +from typing import Optional + +from pydantic import BaseSettings, Field + + +class Settings(BaseSettings): + """Application configuration loaded from environment variables.""" + + database_url: str = Field( + default="sqlite+aiosqlite:///./beatmatchr.db", + description="SQLAlchemy database URL", + ) + sync_database_url: Optional[str] = Field( + default="sqlite:///./beatmatchr.db", + description="Optional sync URL for background workers", + ) + storage_base_path: Path = Field( + default=Path(os.getenv("BEATMATCHR_STORAGE", "./storage")), + description="Base path for file storage when using local filesystem backend.", + ) + celery_broker_url: str = Field( + default=os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0"), + description="Broker URL for Celery workers.", + ) + celery_result_backend: str = Field( + default=os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0"), + description="Result backend URL for Celery workers.", + ) + transcription_api_url: Optional[str] = 
Field( + default=os.getenv("TRANSCRIPTION_API_URL"), + description="External transcription service endpoint.", + ) + transcription_api_key: Optional[str] = Field( + default=os.getenv("TRANSCRIPTION_API_KEY"), + description="API key for transcription service if required.", + ) + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + + +@lru_cache() +def get_settings() -> Settings: + """Return cached application settings instance.""" + + settings = Settings() + base_path = Path(settings.storage_base_path) + base_path.mkdir(parents=True, exist_ok=True) + return settings + + +settings = get_settings() diff --git a/backend/db.py b/backend/db.py new file mode 100644 index 0000000..fac36bc --- /dev/null +++ b/backend/db.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from contextlib import contextmanager +from typing import Iterator + +from sqlalchemy import create_engine +from sqlalchemy.orm import declarative_base, sessionmaker, Session + +from .config import settings + + +SYNC_DATABASE_URL = settings.sync_database_url or settings.database_url + +engine = create_engine(SYNC_DATABASE_URL, future=True) +SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=False) + +Base = declarative_base() + + +def init_db() -> None: + """Create all tables in the database.""" + + Base.metadata.create_all(bind=engine) + + +@contextmanager +def db_session() -> Iterator[Session]: + """Provide a transactional scope around a series of operations.""" + + session: Session = SessionLocal() + try: + yield session + except Exception: + session.rollback() + raise + finally: + session.close() diff --git a/backend/models.py b/backend/models.py new file mode 100644 index 0000000..9f2a5ea --- /dev/null +++ b/backend/models.py @@ -0,0 +1,83 @@ +from __future__ import annotations + +import uuid +from datetime import datetime + +from sqlalchemy import Column, DateTime, Float, ForeignKey, JSON, String, Text, Integer +from sqlalchemy.orm import 
relationship + +from .db import Base, db_session + + +class TimestampMixin: + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column( + DateTime, + default=datetime.utcnow, + onupdate=datetime.utcnow, + nullable=False, + ) + + +class Project(Base, TimestampMixin): + __tablename__ = "projects" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + name = Column(String, nullable=False) + + audio_tracks = relationship("AudioTrack", back_populates="project", cascade="all, delete-orphan") + source_clips = relationship("SourceClip", back_populates="project", cascade="all, delete-orphan") + lyrics = relationship("Lyrics", back_populates="project", uselist=False, cascade="all, delete-orphan") + + +class AudioTrack(Base, TimestampMixin): + __tablename__ = "audio_tracks" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + project_id = Column(String, ForeignKey("projects.id"), nullable=False, index=True) + storage_path = Column(Text, nullable=False) + local_path = Column(Text, nullable=True) + duration_seconds = Column(Float, nullable=True) + bpm = Column(Float, nullable=True) + beat_grid = Column(JSON, nullable=True) + + project = relationship("Project", back_populates="audio_tracks") + + +class SourceClip(Base, TimestampMixin): + __tablename__ = "source_clips" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + project_id = Column(String, ForeignKey("projects.id"), nullable=False, index=True) + origin = Column(String, nullable=False) + original_url = Column(Text, nullable=True) + storage_path = Column(Text, nullable=False) + thumbnail_path = Column(Text, nullable=True) + duration_seconds = Column(Float, nullable=True) + width = Column(Integer, nullable=True) + height = Column(Integer, nullable=True) + fps = Column(Float, nullable=True) + + project = relationship("Project", back_populates="source_clips") + + +class Lyrics(Base, TimestampMixin): + __tablename__ 
= "lyrics" + + id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4())) + project_id = Column(String, ForeignKey("projects.id"), nullable=False, unique=True, index=True) + source = Column(String, nullable=False) + raw_text = Column(Text, nullable=False) + timed_words = Column(JSON, nullable=True) + timed_lines = Column(JSON, nullable=True) + + project = relationship("Project", back_populates="lyrics") + + +__all__ = [ + "Project", + "AudioTrack", + "SourceClip", + "Lyrics", + "db_session", +] diff --git a/backend/routers/__init__.py b/backend/routers/__init__.py new file mode 100644 index 0000000..1b929cb --- /dev/null +++ b/backend/routers/__init__.py @@ -0,0 +1,5 @@ +"""API routers for the Beatmatchr service.""" + +from . import audio, lyrics, media + +__all__ = ["audio", "lyrics", "media"] diff --git a/backend/routers/audio.py b/backend/routers/audio.py new file mode 100644 index 0000000..0718c6f --- /dev/null +++ b/backend/routers/audio.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import uuid + +from fastapi import APIRouter, File, HTTPException, UploadFile, status + +from ..db import db_session +from ..models import AudioTrack, Project +from ..services import storage +from ..workers.tasks import task_analyze_audio, task_transcribe_lyrics + +router = APIRouter(prefix="/projects/{project_id}/audio", tags=["audio"]) + + +@router.post("", status_code=status.HTTP_201_CREATED) +async def upload_audio(project_id: str, file: UploadFile = File(...)) -> dict: + if not file.content_type or not file.content_type.startswith("audio/"): + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Uploaded file must be audio") + + audio_id = str(uuid.uuid4()) + extension = "." + file.filename.split(".")[-1] if file.filename and "." 
in file.filename else ".mp3"
+    storage_dest = f"audio/{project_id}/{audio_id}{extension}"
+    storage_path = ""
+
+    try:
+        with db_session() as session:
+            project = session.query(Project).filter_by(id=project_id).one_or_none()
+            if project is None:
+                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Project not found")
+
+            file.file.seek(0)
+            storage_path = storage.upload_file(file.file, storage_dest)
+
+            audio_track = AudioTrack(
+                id=audio_id,
+                project_id=project_id,
+                storage_path=storage_path,
+            )
+            session.add(audio_track)
+            session.commit()
+    finally:
+        file.file.close()
+
+    if not storage_path:
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to store audio file")
+
+    task_analyze_audio.delay(audio_track_id=audio_id)
+    task_transcribe_lyrics.delay(project_id=project_id, audio_track_id=audio_id)
+
+    return {
+        "audio_track_id": audio_id,
+        "project_id": project_id,
+        "storage_path": storage_path,
+    }
diff --git a/backend/routers/lyrics.py b/backend/routers/lyrics.py
new file mode 100644
index 0000000..199e78c
--- /dev/null
+++ b/backend/routers/lyrics.py
@@ -0,0 +1,59 @@
+from __future__ import annotations
+
+from fastapi import APIRouter, HTTPException, status
+
+from ..db import db_session
+from ..models import Lyrics, Project
+
+router = APIRouter(prefix="/projects/{project_id}/lyrics", tags=["lyrics"])
+
+
+@router.get("")
+def get_lyrics(project_id: str) -> dict:
+    with db_session() as session:
+        project = session.query(Project).filter_by(id=project_id).one_or_none()
+        if project is None:
+            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Project not found")
+
+        lyrics = session.query(Lyrics).filter_by(project_id=project_id).one_or_none()
+        if lyrics is None:
+            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Lyrics not found")
+
+        return {
+            "project_id": lyrics.project_id,
+            "source": lyrics.source,
+            "raw_text": lyrics.raw_text,
+            "timed_lines": lyrics.timed_lines or
[], + "timed_words": lyrics.timed_words or [], + "created_at": lyrics.created_at, + "updated_at": lyrics.updated_at, + } + + +@router.put("") +def update_lyrics(project_id: str, payload: dict) -> dict: + new_text = payload.get("raw_text") + if not isinstance(new_text, str) or not new_text.strip(): + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="raw_text must be provided") + + with db_session() as session: + project = session.query(Project).filter_by(id=project_id).one_or_none() + if project is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Project not found") + + lyrics = session.query(Lyrics).filter_by(project_id=project_id).one_or_none() + if lyrics is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Lyrics not found") + + lyrics.raw_text = new_text.strip() + session.commit() + + return { + "project_id": lyrics.project_id, + "source": lyrics.source, + "raw_text": lyrics.raw_text, + "timed_lines": lyrics.timed_lines or [], + "timed_words": lyrics.timed_words or [], + "created_at": lyrics.created_at, + "updated_at": lyrics.updated_at, + } diff --git a/backend/routers/media.py b/backend/routers/media.py new file mode 100644 index 0000000..75c16fb --- /dev/null +++ b/backend/routers/media.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +import uuid +from typing import List + +from fastapi import APIRouter, File, HTTPException, UploadFile, status +from sqlalchemy.orm import Session + +from ..db import db_session +from ..models import Project, SourceClip +from ..services import storage +from ..workers.tasks import task_ingest_url, task_process_uploaded_video + +router = APIRouter(prefix="/projects/{project_id}/source-clips", tags=["source-clips"]) + + +def get_project(session: Session, project_id: str) -> Project: + project = session.query(Project).filter_by(id=project_id).one_or_none() + if project is None: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
detail="Project not found") + return project + + +@router.post("/urls") +def enqueue_url_ingest(project_id: str, payload: dict) -> dict: + urls = payload.get("urls") or [] + if not isinstance(urls, list) or not urls: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="urls must be a non-empty list") + origin = payload.get("origin") or "url" + + for value in urls: + if not isinstance(value, str) or not value.strip(): + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="All URLs must be non-empty strings") + + with db_session() as session: + get_project(session, project_id) + + for url in urls: + task_ingest_url.delay(project_id=project_id, input_url=url, origin=origin) + + return {"status": "queued", "count": len(urls)} + + +@router.post("/upload", status_code=status.HTTP_201_CREATED) +async def upload_source_clip(project_id: str, file: UploadFile = File(...)) -> dict: + if not file.content_type or not file.content_type.startswith("video/"): + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Uploaded file must be a video") + + clip_id = str(uuid.uuid4()) + extension = "." + file.filename.split(".")[-1] if file.filename and "." 
in file.filename else ".mp4" + storage_dest = f"videos/{project_id}/{clip_id}{extension}" + storage_path = "" + + try: + with db_session() as session: + get_project(session, project_id) + + file.file.seek(0) + storage_path = storage.upload_file(file.file, storage_dest) + + clip = SourceClip( + id=clip_id, + project_id=project_id, + origin="upload", + original_url=None, + storage_path=storage_path, + ) + session.add(clip) + session.commit() + finally: + file.file.close() + + if not storage_path: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to store uploaded file") + + task_process_uploaded_video.delay(source_clip_id=clip_id) + + return { + "id": clip_id, + "project_id": project_id, + "storage_path": storage_path, + "origin": "upload", + "status": "processing", + } + + +@router.get("") +def list_source_clips(project_id: str) -> List[dict]: + with db_session() as session: + get_project(session, project_id) + clips = session.query(SourceClip).filter_by(project_id=project_id).order_by(SourceClip.created_at.asc()).all() + + return [ + { + "id": clip.id, + "origin": clip.origin, + "original_url": clip.original_url, + "storage_path": clip.storage_path, + "thumbnail_path": clip.thumbnail_path, + "duration_seconds": clip.duration_seconds, + "width": clip.width, + "height": clip.height, + "fps": clip.fps, + "created_at": clip.created_at, + "updated_at": clip.updated_at, + } + for clip in clips + ] diff --git a/backend/services/audio_analysis.py b/backend/services/audio_analysis.py new file mode 100644 index 0000000..3cdfd13 --- /dev/null +++ b/backend/services/audio_analysis.py @@ -0,0 +1,34 @@ +from __future__ import annotations + +import logging +from typing import Dict, List + +try: + import librosa +except ImportError: # pragma: no cover - optional dependency + librosa = None + +logger = logging.getLogger(__name__) + + +def analyze_audio(local_path: str) -> Dict[str, object]: + """Analyze the uploaded audio file for duration, 
BPM, and beat grid.""" + + if librosa is None: + logger.warning("librosa not available; returning stubbed audio analysis values") + return { + "duration_seconds": 0.0, + "bpm": 120.0, + "beat_grid": [i * 0.5 for i in range(16)], + } + + y, sr = librosa.load(local_path) + tempo, beats = librosa.beat.beat_track(y=y, sr=sr) + beat_times: List[float] = librosa.frames_to_time(beats, sr=sr).tolist() + duration = librosa.get_duration(y=y, sr=sr) + + return { + "duration_seconds": float(duration), + "bpm": float(tempo), + "beat_grid": beat_times, + } diff --git a/backend/services/lyrics_from_audio.py b/backend/services/lyrics_from_audio.py new file mode 100644 index 0000000..fae14e0 --- /dev/null +++ b/backend/services/lyrics_from_audio.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional + +import requests + +from ..config import settings + +logger = logging.getLogger(__name__) + + +@dataclass +class TranscriptionResult: + raw_text: str + words: List[Dict[str, Any]] + + +class TranscriptionClient: + """Client for an external transcription API that returns word timestamps.""" + + def __init__( + self, + api_url: Optional[str] = None, + api_key: Optional[str] = None, + timeout: int = 300, + ) -> None: + self.api_url = api_url or settings.transcription_api_url + self.api_key = api_key or settings.transcription_api_key + self.timeout = timeout + + def transcribe(self, audio_path: str) -> TranscriptionResult: + if not self.api_url: + raise RuntimeError( + "Transcription API URL is not configured. Set TRANSCRIPTION_API_URL to enable lyrics extraction." 
+ ) + + file_path = Path(audio_path) + if not file_path.exists(): + raise FileNotFoundError(f"Audio file not found: {audio_path}") + + headers = {} + if self.api_key: + headers["Authorization"] = f"Bearer {self.api_key}" + + with file_path.open("rb") as audio_file: + files = {"file": (file_path.name, audio_file, "application/octet-stream")} + data = {"timestamps": "word"} + response = requests.post( + self.api_url, + headers=headers, + data=data, + files=files, + timeout=self.timeout, + ) + response.raise_for_status() + + payload = response.json() + raw_text = payload.get("text") or payload.get("raw_text") + if raw_text is None: + raise ValueError("Transcription response missing 'text' field") + + words = payload.get("words") or [] + normalized_words: List[Dict[str, Any]] = [] + for word in words: + try: + normalized_words.append( + { + "start": float(word["start"]), + "end": float(word["end"]), + "word": str(word.get("word") or word.get("text") or "").strip(), + } + ) + except (KeyError, TypeError, ValueError) as exc: + logger.warning("Skipping malformed word entry %s: %s", word, exc) + return TranscriptionResult(raw_text=raw_text.strip(), words=normalized_words) + + +def words_to_lines(words: List[Dict[str, Any]], max_silence_gap: float = 0.7) -> List[Dict[str, Any]]: + """Group word-level timestamps into line-level segments.""" + + if not words: + return [] + + sorted_words = sorted(words, key=lambda w: w["start"]) + lines: List[Dict[str, Any]] = [] + current_line_words: List[Dict[str, Any]] = [sorted_words[0]] + + for previous_word, current_word in zip(sorted_words, sorted_words[1:]): + gap = float(current_word["start"]) - float(previous_word["end"]) + if gap > max_silence_gap: + lines.append( + { + "start": float(current_line_words[0]["start"]), + "end": float(current_line_words[-1]["end"]), + "text": " ".join(word["word"] for word in current_line_words).strip(), + } + ) + current_line_words = [current_word] + else: + current_line_words.append(current_word) + 
+ if current_line_words: + lines.append( + { + "start": float(current_line_words[0]["start"]), + "end": float(current_line_words[-1]["end"]), + "text": " ".join(word["word"] for word in current_line_words).strip(), + } + ) + + return lines + + +def transcribe_audio_to_lyrics(local_audio_path: str) -> Dict[str, Any]: + """Transcribe the uploaded audio file into lyrics with timing information.""" + + client = TranscriptionClient() + result = client.transcribe(local_audio_path) + lines = words_to_lines(result.words) + + return { + "raw_text": result.raw_text, + "words": result.words, + "lines": lines, + } diff --git a/backend/services/media_ingest.py b/backend/services/media_ingest.py new file mode 100644 index 0000000..2b585c9 --- /dev/null +++ b/backend/services/media_ingest.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +import json +import logging +import os +import subprocess +import tempfile +import uuid +from datetime import datetime +from pathlib import Path +from typing import Dict, List + +import requests +from PIL import Image + +from ..db import db_session +from ..models import Project, SourceClip +from . 
import storage
+
+logger = logging.getLogger(__name__)
+
+
+def resolve_media_urls_from_input(url: str) -> List[str]:
+    """Resolve direct media URLs for a given user-provided URL using yt-dlp."""
+
+    try:
+        process = subprocess.run(
+            [
+                "yt-dlp",
+                "--dump-json",
+                "--skip-download",
+                url,
+            ],
+            check=False,
+            capture_output=True,
+            text=True,
+        )
+    except FileNotFoundError:
+        logger.warning("yt-dlp not installed; returning provided URL directly")
+        return [url]
+
+    if process.returncode != 0:
+        logger.error("yt-dlp failed for %s: %s", url, process.stderr.strip())
+        return [url]
+
+    media_urls: List[str] = []
+    for line in process.stdout.strip().splitlines():
+        try:
+            payload = json.loads(line)
+        except json.JSONDecodeError:
+            continue
+        url_field = payload.get("url") or payload.get("webpage_url")
+        if url_field:
+            media_urls.append(url_field)
+    if not media_urls:
+        media_urls.append(url)
+    return media_urls
+
+
+def download_media_file(media_url: str) -> str:
+    """Download a media file to a temporary local path and return it."""
+
+    from urllib.parse import urlparse
+
+    response = requests.get(media_url, stream=True, timeout=60)
+    response.raise_for_status()
+
+    # Derive the extension from the URL *path* only: Path(media_url).suffix would
+    # include any query string (e.g. "a.mp4?sig=..." on signed URLs), producing a
+    # bogus suffix and an invalid temp filename.
+    suffix = Path(urlparse(media_url).path).suffix or ".mp4"
+    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file:
+        for chunk in response.iter_content(chunk_size=8192):
+            if chunk:
+                temp_file.write(chunk)
+        temp_path = temp_file.name
+    return temp_path
+
+
+def extract_video_metadata(local_path: str) -> Dict[str, float | int | None]:
+    """Extract video metadata using ffprobe."""
+
+    command = [
+        "ffprobe",
+        "-v",
+        "error",
+        "-select_streams",
+        "v:0",
+        "-show_entries",
+        "stream=width,height,r_frame_rate:format=duration",
+        "-of",
+        "json",
+        local_path,
+    ]
+    result = subprocess.run(command, capture_output=True, text=True, check=False)
+    if result.returncode != 0:
+        raise RuntimeError(f"ffprobe failed: {result.stderr}")
+
+    payload = json.loads(result.stdout)
+    stream = (payload.get("streams") or [{}])[0]
+    format_info =
payload.get("format") or {}
+
+    r_frame_rate = stream.get("r_frame_rate", "0/1")
+    try:
+        num, den = r_frame_rate.split("/")
+        fps = float(num) / float(den) if float(den) else None
+    except (ValueError, ZeroDivisionError):
+        fps = None
+
+    metadata = {
+        "duration_seconds": float(format_info.get("duration")) if format_info.get("duration") else None,
+        "width": stream.get("width"),
+        "height": stream.get("height"),
+        "fps": fps,
+    }
+    return metadata
+
+
+def generate_thumbnail(local_video_path: str, time_seconds: float = 0.5) -> bytes:
+    """Generate a thumbnail image for a video clip using ffmpeg."""
+
+    resized_path: str | None = None
+    with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_image:
+        temp_image_path = temp_image.name
+
+    try:
+        command = [
+            "ffmpeg", "-y",  # -y: NamedTemporaryFile pre-created the output; without it ffmpeg refuses to overwrite
+            "-ss",
+            str(time_seconds),
+            "-i",
+            local_video_path,
+            "-frames:v",
+            "1",
+            "-q:v",
+            "2",
+            temp_image_path,
+        ]
+        # text=True so stderr is a readable str in the error message (matches extract_video_metadata)
+        result = subprocess.run(command, capture_output=True, text=True, check=False)
+        if result.returncode != 0:
+            raise RuntimeError(f"ffmpeg thumbnail generation failed: {result.stderr}")
+
+        with Image.open(temp_image_path) as img:
+            width = 480
+            ratio = width / float(img.width)
+            resized = img.resize((width, int(img.height * ratio)))
+            with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as resized_file:
+                resized.save(resized_file.name, format="JPEG", quality=90)
+                resized_path = resized_file.name
+        if resized_path is None:
+            raise RuntimeError("Failed to create thumbnail image")
+        with open(resized_path, "rb") as thumbnail_file:
+            data = thumbnail_file.read()
+    finally:
+        if os.path.exists(temp_image_path):
+            os.remove(temp_image_path)
+        if resized_path and os.path.exists(resized_path):
+            os.remove(resized_path)
+    return data
+
+
+def ingest_single_media_url(project_id: str, input_url: str, origin: str = "url") -> List[Dict]:
+    """Ingest media from a URL and persist SourceClip entries."""
+
+    media_urls = resolve_media_urls_from_input(input_url)
+    created_clips:
List[Dict] = [] + + with db_session() as session: + project = session.query(Project).filter_by(id=project_id).one_or_none() + if project is None: + raise ValueError(f"Project {project_id} does not exist") + + for media_url in media_urls: + local_path = download_media_file(media_url) + try: + metadata = extract_video_metadata(local_path) + thumbnail_bytes = generate_thumbnail(local_path) + + clip_id = str(uuid.uuid4()) + extension = Path(local_path).suffix or ".mp4" + storage_dest = f"videos/{project_id}/{clip_id}{extension}" + thumb_dest = f"thumbnails/{project_id}/{clip_id}.jpg" + + with open(local_path, "rb") as infile: + storage_path = storage.upload_file(infile, storage_dest) + thumbnail_path = storage.upload_bytes(thumbnail_bytes, thumb_dest, content_type="image/jpeg") + + now = datetime.utcnow() + clip = SourceClip( + id=clip_id, + project_id=project_id, + origin=origin, + original_url=input_url, + storage_path=storage_path, + thumbnail_path=thumbnail_path, + duration_seconds=metadata.get("duration_seconds"), + width=metadata.get("width"), + height=metadata.get("height"), + fps=metadata.get("fps"), + created_at=now, + updated_at=now, + ) + session.add(clip) + session.flush() + + created_clips.append( + { + "id": clip.id, + "project_id": clip.project_id, + "storage_path": clip.storage_path, + "thumbnail_path": clip.thumbnail_path, + "duration_seconds": clip.duration_seconds, + "width": clip.width, + "height": clip.height, + "fps": clip.fps, + "origin": clip.origin, + "original_url": clip.original_url, + } + ) + finally: + if os.path.exists(local_path): + os.remove(local_path) + session.commit() + + return created_clips diff --git a/backend/services/rendering.py b/backend/services/rendering.py new file mode 100644 index 0000000..84cade6 --- /dev/null +++ b/backend/services/rendering.py @@ -0,0 +1,97 @@ +from __future__ import annotations + +import logging +from typing import Dict, List + +import moviepy.editor as mpe + +logger = logging.getLogger(__name__) + + 
+def _parse_resolution(resolution: str) -> tuple[int, int]: + try: + width_str, height_str = resolution.lower().split("x") + return int(width_str), int(height_str) + except (ValueError, AttributeError) as exc: + raise ValueError(f"Invalid resolution format: {resolution}") from exc + + +def render_video( + audio_path: str, + timeline: List[Dict], + lyrics_timed_lines: List[Dict], + output_path: str, + resolution: str = "1080x1920", + fps: int = 30, +) -> None: + """Render the final video composition with lyrics overlays.""" + + width, height = _parse_resolution(resolution) + video_segments: List[mpe.VideoClip] = [] + + try: + for segment in timeline: + clip_path = segment["clip_path"] + video_start = float(segment.get("video_start", 0.0)) + video_end = float(segment.get("video_end")) if segment.get("video_end") is not None else None + song_start = float(segment.get("song_start", 0.0)) + + clip = mpe.VideoFileClip(clip_path) + if video_end is not None: + clip = clip.subclip(video_start, video_end) + else: + clip = clip.subclip(video_start) + clip = clip.resize(newsize=(width, height)).set_start(song_start) + video_segments.append(clip) + + if not video_segments: + raise ValueError("Timeline is empty; cannot render video") + + base_video = mpe.concatenate_videoclips(video_segments, method="compose") + + text_clips: List[mpe.VideoClip] = [] + for line in lyrics_timed_lines: + text = line.get("text", "").strip() + if not text: + continue + start = float(line["start"]) + end = float(line["end"]) + duration = max(end - start, 0.1) + text_clip = ( + mpe.TextClip( + txt=text, + fontsize=60, + color="white", + stroke_color="black", + stroke_width=2, + method="caption", + size=(width - 200, None), + ) + .set_duration(duration) + .set_start(start) + .set_position(("center", height - 150)) + ) + text_clips.append(text_clip) + + composite = mpe.CompositeVideoClip([base_video, *text_clips], size=(width, height)) + audio_clip = mpe.AudioFileClip(audio_path) + composite = 
composite.set_audio(audio_clip) + composite.write_videofile( + output_path, + codec="libx264", + audio_codec="aac", + fps=fps, + preset="medium", + ) + finally: + for clip in video_segments: + clip.close() + if "base_video" in locals(): + base_video.close() # type: ignore[union-attr] + if "text_clips" in locals(): + for clip in text_clips: + clip.close() + if "composite" in locals(): + composite.close() # type: ignore[union-attr] + if "audio_clip" in locals(): + audio_clip.close() # type: ignore[union-attr] diff --git a/backend/services/storage.py b/backend/services/storage.py new file mode 100644 index 0000000..6dc9314 --- /dev/null +++ b/backend/services/storage.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +import shutil +import tempfile +from pathlib import Path +from typing import BinaryIO + +from ..config import settings + + +def _resolve_destination(dest_path: str) -> Path: + base_path = Path(settings.storage_base_path) + full_path = base_path / dest_path + full_path.parent.mkdir(parents=True, exist_ok=True) + return full_path + + +def upload_file(file_obj: BinaryIO, dest_path: str) -> str: + """Upload a file-like object to object storage.""" + + destination = _resolve_destination(dest_path) + with open(destination, "wb") as out_file: + shutil.copyfileobj(file_obj, out_file) + return dest_path + + +def upload_bytes(data: bytes, dest_path: str, content_type: str | None = None) -> str: + """Upload raw bytes to object storage.""" + + destination = _resolve_destination(dest_path) + with open(destination, "wb") as out_file: + out_file.write(data) + return dest_path + + +def download_to_temp(path: str) -> str: + """Download a file from storage to a temporary local file.""" + + source_path = Path(settings.storage_base_path) / path + if not source_path.exists(): + raise FileNotFoundError(f"Storage path does not exist: {path}") + + suffix = source_path.suffix + with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as temp_file: + with 
open(source_path, "rb") as in_file: + shutil.copyfileobj(in_file, temp_file) + temp_file_path = temp_file.name + return temp_file_path diff --git a/backend/workers/__init__.py b/backend/workers/__init__.py new file mode 100644 index 0000000..7527d63 --- /dev/null +++ b/backend/workers/__init__.py @@ -0,0 +1,5 @@ +"""Background workers and Celery tasks.""" + +from .tasks import celery_app + +__all__ = ["celery_app"] diff --git a/backend/workers/tasks.py b/backend/workers/tasks.py new file mode 100644 index 0000000..3eced44 --- /dev/null +++ b/backend/workers/tasks.py @@ -0,0 +1,114 @@ +from __future__ import annotations + +import os +import uuid +from datetime import datetime + +from celery import Celery + +from ..config import settings +from ..db import db_session +from ..models import AudioTrack, Lyrics, SourceClip +from ..services import audio_analysis, lyrics_from_audio, media_ingest, storage + +celery_app = Celery( + "beatmatchr", + broker=settings.celery_broker_url, + backend=settings.celery_result_backend, +) + + +@celery_app.task(name="media.ingest_url") +def task_ingest_url(project_id: str, input_url: str, origin: str = "url") -> None: + """Ingest media from a URL for the specified project.""" + + media_ingest.ingest_single_media_url(project_id=project_id, input_url=input_url, origin=origin) + + +@celery_app.task(name="media.process_uploaded_video") +def task_process_uploaded_video(source_clip_id: str) -> None: + """Extract metadata and thumbnails for an uploaded video clip.""" + + with db_session() as session: + clip = session.query(SourceClip).filter_by(id=source_clip_id).one() + local_video = storage.download_to_temp(clip.storage_path) + try: + meta = media_ingest.extract_video_metadata(local_video) + thumb_bytes = media_ingest.generate_thumbnail(local_video, time_seconds=0.5) + + thumb_dest = f"thumbnails/{clip.project_id}/{clip.id}.jpg" + thumb_path = storage.upload_bytes(thumb_bytes, thumb_dest, content_type="image/jpeg") + + clip.duration_seconds = 
meta.get("duration_seconds") + clip.width = meta.get("width") + clip.height = meta.get("height") + clip.fps = meta.get("fps") + clip.thumbnail_path = thumb_path + clip.updated_at = datetime.utcnow() + + session.commit() + finally: + if os.path.exists(local_video): + os.remove(local_video) + + +@celery_app.task(name="audio.analyze") +def task_analyze_audio(audio_track_id: str) -> None: + """Analyze audio track to compute BPM and beat grid.""" + + with db_session() as session: + audio = session.query(AudioTrack).filter_by(id=audio_track_id).one() + local_path = storage.download_to_temp(audio.storage_path) + try: + result = audio_analysis.analyze_audio(local_path) + + audio.duration_seconds = result.get("duration_seconds") + audio.bpm = result.get("bpm") + audio.beat_grid = result.get("beat_grid") + audio.updated_at = datetime.utcnow() + + session.commit() + finally: + if os.path.exists(local_path): + os.remove(local_path) + + +@celery_app.task(name="lyrics.transcribe") +def task_transcribe_lyrics(project_id: str, audio_track_id: str) -> None: + """Transcribe lyrics from the project's audio track.""" + + with db_session() as session: + audio = session.query(AudioTrack).filter_by(id=audio_track_id).one() + local_path = storage.download_to_temp(audio.storage_path) + try: + result = lyrics_from_audio.transcribe_audio_to_lyrics(local_path) + raw_text = result["raw_text"] + words = result.get("words", []) + lines = result.get("lines", []) + + existing = session.query(Lyrics).filter_by(project_id=project_id).one_or_none() + now = datetime.utcnow() + + if existing is None: + lyrics = Lyrics( + id=str(uuid.uuid4()), + project_id=project_id, + source="audio_transcription", + raw_text=raw_text, + timed_words=words, + timed_lines=lines, + created_at=now, + updated_at=now, + ) + session.add(lyrics) + else: + existing.source = "audio_transcription" + existing.raw_text = raw_text + existing.timed_words = words + existing.timed_lines = lines + existing.updated_at = now + + 
session.commit() + finally: + if os.path.exists(local_path): + os.remove(local_path)