Add Last.fm import support

Users can import their top tracks from Last.fm by entering their username.
No OAuth required - uses the public Last.fm API with user.getTopTracks and
user.getInfo endpoints. Includes a preview feature to verify the username
and see sample tracks before importing. Supports configurable time periods
(all time, 7 days, 1/3/6/12 months). Free tier playlist limit enforced.
This commit is contained in:
root
2026-03-30 22:49:13 -05:00
parent d0ab1755bb
commit 90945932ad
4 changed files with 251 additions and 0 deletions

View File

@@ -10,4 +10,5 @@ STRIPE_SECRET_KEY=sk_test_your-stripe-secret-key
STRIPE_PUBLISHABLE_KEY=pk_test_your-stripe-publishable-key STRIPE_PUBLISHABLE_KEY=pk_test_your-stripe-publishable-key
STRIPE_PRICE_ID=price_your-pro-plan-price-id STRIPE_PRICE_ID=price_your-pro-plan-price-id
STRIPE_WEBHOOK_SECRET=whsec_your-webhook-signing-secret STRIPE_WEBHOOK_SECRET=whsec_your-webhook-signing-secret
LASTFM_API_KEY=your-lastfm-api-key
FRONTEND_URL=https://deepcutsai.com FRONTEND_URL=https://deepcutsai.com

View File

@@ -0,0 +1,147 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User
from app.models.playlist import Playlist
from app.models.track import Track
from app.services.lastfm import get_user_info, get_top_tracks
from app.services.recommender import build_taste_profile
from app.schemas.playlist import PlaylistDetailResponse
router = APIRouter(prefix="/lastfm", tags=["lastfm"])
class ImportLastfmRequest(BaseModel):
username: str
period: str = "overall"
class LastfmPreviewTrack(BaseModel):
title: str
artist: str
playcount: int
image_url: str | None = None
class LastfmPreviewResponse(BaseModel):
display_name: str
track_count: int
sample_tracks: list[LastfmPreviewTrack]
@router.get("/preview", response_model=LastfmPreviewResponse)
async def preview_lastfm(
username: str = Query(..., min_length=1),
user: User = Depends(get_current_user),
):
"""Preview a Last.fm user's top tracks without importing."""
if not settings.LASTFM_API_KEY:
raise HTTPException(status_code=500, detail="Last.fm API key not configured")
info = await get_user_info(username.strip())
if not info:
raise HTTPException(status_code=404, detail="Last.fm user not found")
try:
tracks = await get_top_tracks(username.strip(), period="overall", limit=50)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
sample = tracks[:5]
return LastfmPreviewResponse(
display_name=info["display_name"],
track_count=len(tracks),
sample_tracks=[LastfmPreviewTrack(**t) for t in sample],
)
@router.post("/import", response_model=PlaylistDetailResponse)
async def import_lastfm(
data: ImportLastfmRequest,
user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""Import top tracks from a Last.fm user as a playlist."""
if not settings.LASTFM_API_KEY:
raise HTTPException(status_code=500, detail="Last.fm API key not configured")
username = data.username.strip()
if not username:
raise HTTPException(status_code=400, detail="Username is required")
# Free tier limit
if not user.is_pro:
result = await db.execute(
select(Playlist).where(Playlist.user_id == user.id)
)
existing = list(result.scalars().all())
if len(existing) >= settings.FREE_MAX_PLAYLISTS:
raise HTTPException(
status_code=403,
detail="Free tier limited to 1 playlist. Upgrade to Pro for unlimited.",
)
# Verify user exists
info = await get_user_info(username)
if not info:
raise HTTPException(status_code=404, detail="Last.fm user not found")
# Fetch top tracks
try:
raw_tracks = await get_top_tracks(username, period=data.period, limit=50)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not raw_tracks:
raise HTTPException(
status_code=400,
detail="No top tracks found for this user and time period.",
)
# Build playlist name from period
period_labels = {
"overall": "All Time",
"7day": "Last 7 Days",
"1month": "Last Month",
"3month": "Last 3 Months",
"6month": "Last 6 Months",
"12month": "Last Year",
}
period_label = period_labels.get(data.period, "All Time")
playlist_name = f"{info['display_name']}'s Top Tracks ({period_label})"
# Create playlist
playlist = Playlist(
user_id=user.id,
name=playlist_name,
platform_source="lastfm",
external_id=f"lastfm:{username}:{data.period}",
track_count=len(raw_tracks),
)
db.add(playlist)
await db.flush()
# Create tracks (no audio features from Last.fm)
tracks = []
for rt in raw_tracks:
track = Track(
playlist_id=playlist.id,
title=rt["title"],
artist=rt["artist"],
image_url=rt.get("image_url"),
)
db.add(track)
tracks.append(track)
await db.flush()
# Build taste profile
playlist.taste_profile = build_taste_profile(tracks)
playlist.tracks = tracks
return playlist

View File

@@ -28,6 +28,9 @@ class Settings(BaseSettings):
STRIPE_PRICE_ID: str = "" STRIPE_PRICE_ID: str = ""
STRIPE_WEBHOOK_SECRET: str = "" STRIPE_WEBHOOK_SECRET: str = ""
# Last.fm
LASTFM_API_KEY: str = ""
# Frontend # Frontend
FRONTEND_URL: str = "http://localhost:5173" FRONTEND_URL: str = "http://localhost:5173"

View File

@@ -0,0 +1,100 @@
import httpx
from app.core.config import settings
LASTFM_API_URL = "http://ws.audioscrobbler.com/2.0/"
async def get_user_info(username: str) -> dict | None:
"""Verify a Last.fm username exists and return basic info."""
params = {
"method": "user.getInfo",
"user": username,
"api_key": settings.LASTFM_API_KEY,
"format": "json",
}
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(LASTFM_API_URL, params=params)
if resp.status_code != 200:
return None
data = resp.json()
if "error" in data:
return None
user = data.get("user", {})
image_url = None
images = user.get("image", [])
for img in images:
if img.get("size") == "large" and img.get("#text"):
image_url = img["#text"]
return {
"name": user.get("name", username),
"display_name": user.get("realname") or user.get("name", username),
"playcount": int(user.get("playcount", 0)),
"image_url": image_url,
"url": user.get("url", ""),
}
async def get_top_tracks(
username: str, period: str = "overall", limit: int = 50
) -> list[dict]:
"""Fetch a user's top tracks from Last.fm.
Args:
username: Last.fm username.
period: Time period - overall, 7day, 1month, 3month, 6month, 12month.
limit: Max number of tracks to return (max 1000).
Returns:
List of dicts with: title, artist, playcount, image_url.
"""
valid_periods = {"overall", "7day", "1month", "3month", "6month", "12month"}
if period not in valid_periods:
period = "overall"
params = {
"method": "user.getTopTracks",
"user": username,
"period": period,
"limit": min(limit, 1000),
"api_key": settings.LASTFM_API_KEY,
"format": "json",
}
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(LASTFM_API_URL, params=params)
if resp.status_code != 200:
raise ValueError(f"Last.fm API returned status {resp.status_code}")
data = resp.json()
if "error" in data:
raise ValueError(data.get("message", "Last.fm API error"))
raw_tracks = data.get("toptracks", {}).get("track", [])
tracks = []
for t in raw_tracks:
image_url = None
images = t.get("image", [])
for img in images:
if img.get("size") == "large" and img.get("#text"):
image_url = img["#text"]
artist_name = ""
artist = t.get("artist")
if isinstance(artist, dict):
artist_name = artist.get("name", "")
elif isinstance(artist, str):
artist_name = artist
tracks.append({
"title": t.get("name", "Unknown"),
"artist": artist_name or "Unknown",
"playcount": int(t.get("playcount", 0)),
"image_url": image_url,
})
return tracks