Backend (FastAPI): - User auth with email/password and Spotify OAuth - Spotify playlist import with audio feature extraction - AI recommendation engine using Claude API with taste profiling - Save/bookmark recommendations - Rate limiting for free tier (10 recs/day, 1 playlist) - PostgreSQL models with Alembic migrations - Redis-ready configuration Frontend (React 19 + TypeScript + Vite + Tailwind): - Landing page, auth flows (email + Spotify OAuth) - Dashboard with stats and quick discover - Playlist management and import from Spotify - Discover page with custom query support - Recommendation cards with explanations and save toggle - Taste profile visualization - Responsive layout with mobile navigation - PWA-ready configuration Infrastructure: - Docker Compose with PostgreSQL, Redis, backend, frontend - Environment-based configuration
130 lines
4.6 KiB
Python
130 lines
4.6 KiB
Python
import httpx
|
|
from urllib.parse import urlencode
|
|
|
|
from app.core.config import settings
|
|
|
|
SPOTIFY_AUTH_URL = "https://accounts.spotify.com/authorize"
|
|
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
|
|
SPOTIFY_API_URL = "https://api.spotify.com/v1"
|
|
|
|
SCOPES = "user-read-private user-read-email playlist-read-private playlist-read-collaborative"
|
|
|
|
|
|
def get_spotify_auth_url(state: str) -> str:
|
|
params = {
|
|
"client_id": settings.SPOTIFY_CLIENT_ID,
|
|
"response_type": "code",
|
|
"redirect_uri": settings.SPOTIFY_REDIRECT_URI,
|
|
"scope": SCOPES,
|
|
"state": state,
|
|
}
|
|
return f"{SPOTIFY_AUTH_URL}?{urlencode(params)}"
|
|
|
|
|
|
async def exchange_spotify_code(code: str) -> dict:
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.post(
|
|
SPOTIFY_TOKEN_URL,
|
|
data={
|
|
"grant_type": "authorization_code",
|
|
"code": code,
|
|
"redirect_uri": settings.SPOTIFY_REDIRECT_URI,
|
|
"client_id": settings.SPOTIFY_CLIENT_ID,
|
|
"client_secret": settings.SPOTIFY_CLIENT_SECRET,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
|
|
async def refresh_spotify_token(refresh_token: str) -> dict:
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.post(
|
|
SPOTIFY_TOKEN_URL,
|
|
data={
|
|
"grant_type": "refresh_token",
|
|
"refresh_token": refresh_token,
|
|
"client_id": settings.SPOTIFY_CLIENT_ID,
|
|
"client_secret": settings.SPOTIFY_CLIENT_SECRET,
|
|
},
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
|
|
async def get_spotify_user(access_token: str) -> dict:
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.get(
|
|
f"{SPOTIFY_API_URL}/me",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
|
|
async def get_user_playlists(access_token: str) -> list[dict]:
|
|
playlists = []
|
|
url = f"{SPOTIFY_API_URL}/me/playlists?limit=50"
|
|
async with httpx.AsyncClient() as client:
|
|
while url:
|
|
response = await client.get(
|
|
url, headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
for item in data["items"]:
|
|
playlists.append({
|
|
"id": item["id"],
|
|
"name": item["name"],
|
|
"track_count": item["tracks"]["total"],
|
|
"image_url": item["images"][0]["url"] if item.get("images") else None,
|
|
})
|
|
url = data.get("next")
|
|
return playlists
|
|
|
|
|
|
async def get_playlist_tracks(access_token: str, playlist_id: str) -> list[dict]:
|
|
tracks = []
|
|
url = f"{SPOTIFY_API_URL}/playlists/{playlist_id}/tracks?limit=100"
|
|
async with httpx.AsyncClient() as client:
|
|
while url:
|
|
response = await client.get(
|
|
url, headers={"Authorization": f"Bearer {access_token}"}
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
for item in data["items"]:
|
|
t = item.get("track")
|
|
if not t or not t.get("id"):
|
|
continue
|
|
tracks.append({
|
|
"spotify_id": t["id"],
|
|
"title": t["name"],
|
|
"artist": ", ".join(a["name"] for a in t["artists"]),
|
|
"album": t["album"]["name"] if t.get("album") else None,
|
|
"isrc": t.get("external_ids", {}).get("isrc"),
|
|
"preview_url": t.get("preview_url"),
|
|
"image_url": t["album"]["images"][0]["url"]
|
|
if t.get("album", {}).get("images")
|
|
else None,
|
|
})
|
|
url = data.get("next")
|
|
return tracks
|
|
|
|
|
|
async def get_audio_features(access_token: str, track_ids: list[str]) -> list[dict]:
|
|
features = []
|
|
async with httpx.AsyncClient() as client:
|
|
# Spotify allows max 100 IDs per request
|
|
for i in range(0, len(track_ids), 100):
|
|
batch = track_ids[i : i + 100]
|
|
response = await client.get(
|
|
f"{SPOTIFY_API_URL}/audio-features",
|
|
params={"ids": ",".join(batch)},
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
features.extend(data.get("audio_features") or [])
|
|
return features
|