Files
vynl/backend/app/api/endpoints/lastfm.py
root 90945932ad Add Last.fm import support
Users can import their top tracks from Last.fm by entering their username.
No OAuth required - uses the public Last.fm API with user.getTopTracks and
user.getInfo endpoints. Includes a preview feature to verify the username
and see sample tracks before importing. Supports configurable time periods
(all time, 7 days, 1/3/6/12 months). Free tier playlist limit enforced.
2026-03-30 22:49:13 -05:00

148 lines
4.4 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User
from app.models.playlist import Playlist
from app.models.track import Track
from app.services.lastfm import get_user_info, get_top_tracks
from app.services.recommender import build_taste_profile
from app.schemas.playlist import PlaylistDetailResponse
router = APIRouter(prefix="/lastfm", tags=["lastfm"])
class ImportLastfmRequest(BaseModel):
username: str
period: str = "overall"
class LastfmPreviewTrack(BaseModel):
title: str
artist: str
playcount: int
image_url: str | None = None
class LastfmPreviewResponse(BaseModel):
display_name: str
track_count: int
sample_tracks: list[LastfmPreviewTrack]
@router.get("/preview", response_model=LastfmPreviewResponse)
async def preview_lastfm(
username: str = Query(..., min_length=1),
user: User = Depends(get_current_user),
):
"""Preview a Last.fm user's top tracks without importing."""
if not settings.LASTFM_API_KEY:
raise HTTPException(status_code=500, detail="Last.fm API key not configured")
info = await get_user_info(username.strip())
if not info:
raise HTTPException(status_code=404, detail="Last.fm user not found")
try:
tracks = await get_top_tracks(username.strip(), period="overall", limit=50)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
sample = tracks[:5]
return LastfmPreviewResponse(
display_name=info["display_name"],
track_count=len(tracks),
sample_tracks=[LastfmPreviewTrack(**t) for t in sample],
)
@router.post("/import", response_model=PlaylistDetailResponse)
async def import_lastfm(
data: ImportLastfmRequest,
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Import top tracks from a Last.fm user as a playlist."""
if not settings.LASTFM_API_KEY:
raise HTTPException(status_code=500, detail="Last.fm API key not configured")
username = data.username.strip()
if not username:
raise HTTPException(status_code=400, detail="Username is required")
# Free tier limit
if not user.is_pro:
result = await db.execute(
select(Playlist).where(Playlist.user_id == user.id)
)
existing = list(result.scalars().all())
if len(existing) >= settings.FREE_MAX_PLAYLISTS:
raise HTTPException(
status_code=403,
detail="Free tier limited to 1 playlist. Upgrade to Pro for unlimited.",
)
# Verify user exists
info = await get_user_info(username)
if not info:
raise HTTPException(status_code=404, detail="Last.fm user not found")
# Fetch top tracks
try:
raw_tracks = await get_top_tracks(username, period=data.period, limit=50)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not raw_tracks:
raise HTTPException(
status_code=400,
detail="No top tracks found for this user and time period.",
)
# Build playlist name from period
period_labels = {
"overall": "All Time",
"7day": "Last 7 Days",
"1month": "Last Month",
"3month": "Last 3 Months",
"6month": "Last 6 Months",
"12month": "Last Year",
}
period_label = period_labels.get(data.period, "All Time")
playlist_name = f"{info['display_name']}'s Top Tracks ({period_label})"
# Create playlist
playlist = Playlist(
user_id=user.id,
name=playlist_name,
platform_source="lastfm",
external_id=f"lastfm:{username}:{data.period}",
track_count=len(raw_tracks),
)
db.add(playlist)
await db.flush()
# Create tracks (no audio features from Last.fm)
tracks = []
for rt in raw_tracks:
track = Track(
playlist_id=playlist.id,
title=rt["title"],
artist=rt["artist"],
image_url=rt.get("image_url"),
)
db.add(track)
tracks.append(track)
await db.flush()
# Build taste profile
playlist.taste_profile = build_taste_profile(tracks)
playlist.tracks = tracks
return playlist