from fastapi import APIRouter, Depends, HTTPException, Query from fastapi.responses import PlainTextResponse from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from app.core.config import settings from app.core.database import get_db from app.core.security import get_current_user from app.models.user import User from app.models.playlist import Playlist from app.models.track import Track from app.schemas.playlist import ( PlaylistResponse, PlaylistDetailResponse, SpotifyPlaylistItem, ImportSpotifyRequest, ) from app.services.spotify import get_user_playlists, get_playlist_tracks, get_audio_features from app.services.recommender import build_taste_profile router = APIRouter(prefix="/playlists", tags=["playlists"]) @router.get("", response_model=list[PlaylistResponse]) async def list_playlists( user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(Playlist).where(Playlist.user_id == user.id).order_by(Playlist.imported_at.desc()) ) return result.scalars().all() @router.get("/{playlist_id}", response_model=PlaylistDetailResponse) async def get_playlist( playlist_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(Playlist) .options(selectinload(Playlist.tracks)) .where(Playlist.id == playlist_id, Playlist.user_id == user.id) ) playlist = result.scalar_one_or_none() if not playlist: raise HTTPException(status_code=404, detail="Playlist not found") return playlist @router.get("/{playlist_id}/export") async def export_playlist( playlist_id: int, format: str = Query("text", pattern="^(text|csv)$"), user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(Playlist).options(selectinload(Playlist.tracks)) .where(Playlist.id == playlist_id, Playlist.user_id == user.id) ) playlist = result.scalar_one_or_none() if not playlist: raise HTTPException(status_code=404, detail="Playlist not found") if format == "csv": lines = ["Title,Artist,Album"] for t in playlist.tracks: # Escape commas in CSV title = f'"{t.title}"' if ',' in t.title else t.title artist = f'"{t.artist}"' if ',' in t.artist else t.artist album = f'"{t.album}"' if t.album and ',' in t.album else (t.album or '') lines.append(f"{title},{artist},{album}") return PlainTextResponse("\n".join(lines), media_type="text/csv", headers={"Content-Disposition": f'attachment; filename="{playlist.name}.csv"'}) else: lines = [f"{playlist.name}", "=" * len(playlist.name), ""] for i, t in enumerate(playlist.tracks, 1): lines.append(f"{i}. {t.artist} - {t.title}") return PlainTextResponse("\n".join(lines), media_type="text/plain", headers={"Content-Disposition": f'attachment; filename="{playlist.name}.txt"'}) @router.delete("/{playlist_id}") async def delete_playlist( playlist_id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): result = await db.execute( select(Playlist).where(Playlist.id == playlist_id, Playlist.user_id == user.id) ) playlist = result.scalar_one_or_none() if not playlist: raise HTTPException(status_code=404, detail="Playlist not found") await db.delete(playlist) return {"ok": True} @router.get("/spotify/available", response_model=list[SpotifyPlaylistItem]) async def list_spotify_playlists(user: User = Depends(get_current_user)): if not user.spotify_access_token: raise HTTPException(status_code=400, detail="Spotify not connected") playlists = await get_user_playlists(user.spotify_access_token) return playlists @router.post("/spotify/import", response_model=PlaylistDetailResponse) async def import_spotify_playlist( data: ImportSpotifyRequest, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db), ): if not user.spotify_access_token: raise HTTPException(status_code=400, detail="Spotify not connected") # Free tier limit if not user.is_pro: result = await db.execute( select(Playlist).where(Playlist.user_id == user.id) ) existing = list(result.scalars().all()) if len(existing) >= settings.FREE_MAX_PLAYLISTS: raise HTTPException(status_code=403, detail="Free tier limited to 1 playlist. Upgrade to Pro for unlimited.") # Fetch tracks from Spotify raw_tracks = await get_playlist_tracks(user.spotify_access_token, data.playlist_id) # Get playlist name from Spotify playlists spotify_playlists = await get_user_playlists(user.spotify_access_token) playlist_name = data.playlist_id for sp in spotify_playlists: if sp["id"] == data.playlist_id: playlist_name = sp["name"] break # Create playlist playlist = Playlist( user_id=user.id, name=playlist_name, platform_source="spotify", external_id=data.playlist_id, track_count=len(raw_tracks), ) db.add(playlist) await db.flush() # Create tracks tracks = [] for rt in raw_tracks: track = Track( playlist_id=playlist.id, title=rt["title"], artist=rt["artist"], album=rt.get("album"), spotify_id=rt.get("spotify_id"), isrc=rt.get("isrc"), preview_url=rt.get("preview_url"), image_url=rt.get("image_url"), ) db.add(track) tracks.append(track) await db.flush() # Fetch audio features spotify_ids = [t.spotify_id for t in tracks if t.spotify_id] if spotify_ids: features = await get_audio_features(user.spotify_access_token, spotify_ids) features_map = {f["id"]: f for f in features if f} for track in tracks: if track.spotify_id and track.spotify_id in features_map: f = features_map[track.spotify_id] track.tempo = f.get("tempo") track.energy = f.get("energy") track.danceability = f.get("danceability") track.valence = f.get("valence") track.acousticness = f.get("acousticness") track.instrumentalness = f.get("instrumentalness") # Build taste profile playlist.taste_profile = build_taste_profile(tracks) playlist.tracks = tracks return playlist