"""Helpers for Spotify's authorization-code OAuth flow and Web API reads."""

from typing import Callable, Optional
from urllib.parse import urlencode

import httpx

from app.core.config import settings

SPOTIFY_AUTH_URL = "https://accounts.spotify.com/authorize"
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
SPOTIFY_API_URL = "https://api.spotify.com/v1"

SCOPES = "user-read-private user-read-email playlist-read-private playlist-read-collaborative"

# Spotify allows max 100 IDs per request
_AUDIO_FEATURES_BATCH_SIZE = 100


def _bearer_headers(access_token: str) -> dict[str, str]:
    """Return the ``Authorization`` header carrying *access_token*."""
    return {"Authorization": f"Bearer {access_token}"}


async def _post_token_request(grant_fields: dict[str, str]) -> dict:
    """POST a grant to the token endpoint and return the decoded JSON body.

    The configured client ID and secret are appended to *grant_fields* as
    form fields.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with a 4xx/5xx status.
    """
    form = {
        **grant_fields,
        "client_id": settings.SPOTIFY_CLIENT_ID,
        "client_secret": settings.SPOTIFY_CLIENT_SECRET,
    }
    async with httpx.AsyncClient() as client:
        response = await client.post(SPOTIFY_TOKEN_URL, data=form)
        response.raise_for_status()
        return response.json()


async def _fetch_all_pages(
    access_token: str,
    first_url: str,
    shape_item: Callable[[dict], Optional[dict]],
) -> list[dict]:
    """Walk a paginated listing and return the shaped items from every page.

    Starting at *first_url*, each page's ``items`` are passed through
    *shape_item*; results that are ``None`` are dropped. The page's ``next``
    value is used as the following URL, and the walk stops once it is falsy.

    Raises:
        httpx.HTTPStatusError: If any page request answers with 4xx/5xx.
    """
    results: list[dict] = []
    headers = _bearer_headers(access_token)
    url = first_url
    async with httpx.AsyncClient() as client:
        while url:
            response = await client.get(url, headers=headers)
            response.raise_for_status()
            page = response.json()
            for item in page["items"]:
                shaped = shape_item(item)
                if shaped is not None:
                    results.append(shaped)
            url = page.get("next")
    return results


def _playlist_summary(item: dict) -> dict:
    """Reduce one raw playlist item to the fields this module exposes."""
    images = item.get("images")
    return {
        "id": item["id"],
        "name": item["name"],
        "track_count": item["tracks"]["total"],
        "image_url": images[0]["url"] if images else None,
    }


def _track_summary(item: dict) -> Optional[dict]:
    """Reduce one raw playlist entry to a flat track dict.

    Returns ``None`` when the entry has no ``track`` object or the track has
    no ``id``, so the caller can skip it.
    """
    track = item.get("track")
    if not track or not track.get("id"):
        return None

    # ``or {}`` rather than ``.get(key, {})``: the key may be present with an
    # explicit null, in which case the ``{}`` default would not be used and
    # the chained ``.get`` would raise AttributeError on None.
    album = track.get("album") or {}
    album_images = album.get("images")
    external_ids = track.get("external_ids") or {}

    return {
        "spotify_id": track["id"],
        "title": track["name"],
        "artist": ", ".join(artist["name"] for artist in track["artists"]),
        "album": album["name"] if album else None,
        "isrc": external_ids.get("isrc"),
        "preview_url": track.get("preview_url"),
        "image_url": album_images[0]["url"] if album_images else None,
    }


def get_spotify_auth_url(state: str) -> str:
    """Build the authorization URL the user is sent to in order to grant access.

    Args:
        state: Value passed through as the ``state`` query parameter.

    Returns:
        ``SPOTIFY_AUTH_URL`` with the client ID, redirect URI, ``SCOPES`` and
        *state* URL-encoded into the query string.
    """
    params = {
        "client_id": settings.SPOTIFY_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": settings.SPOTIFY_REDIRECT_URI,
        "scope": SCOPES,
        "state": state,
    }
    return f"{SPOTIFY_AUTH_URL}?{urlencode(params)}"


async def exchange_spotify_code(code: str) -> dict:
    """Exchange an authorization code for tokens.

    Args:
        code: The ``code`` value to send with the ``authorization_code`` grant.

    Returns:
        The token endpoint's JSON response, decoded.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with a 4xx/5xx status.
    """
    return await _post_token_request(
        {
            "grant_type": "authorization_code",
            "code": code,
            "redirect_uri": settings.SPOTIFY_REDIRECT_URI,
        }
    )


async def refresh_spotify_token(refresh_token: str) -> dict:
    """Request a new access token using a refresh token.

    Args:
        refresh_token: The refresh token to send with the ``refresh_token``
            grant.

    Returns:
        The token endpoint's JSON response, decoded.

    Raises:
        httpx.HTTPStatusError: If the endpoint answers with a 4xx/5xx status.
    """
    return await _post_token_request(
        {
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
    )


async def get_spotify_user(access_token: str) -> dict:
    """Fetch ``/me`` for the given access token.

    Returns:
        The decoded JSON response body.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"{SPOTIFY_API_URL}/me",
            headers=_bearer_headers(access_token),
        )
        response.raise_for_status()
        return response.json()


async def get_user_playlists(access_token: str) -> list[dict]:
    """List the playlists returned by ``/me/playlists``, following pagination.

    Returns:
        One dict per playlist with keys ``id``, ``name``, ``track_count`` and
        ``image_url`` (``None`` when the playlist has no images).

    Raises:
        httpx.HTTPStatusError: If any page request answers with 4xx/5xx.
    """
    return await _fetch_all_pages(
        access_token,
        f"{SPOTIFY_API_URL}/me/playlists?limit=50",
        _playlist_summary,
    )


async def get_playlist_tracks(access_token: str, playlist_id: str) -> list[dict]:
    """List the tracks of one playlist, following pagination.

    Entries without a ``track`` object, or whose track has no ``id``, are
    skipped.

    Args:
        access_token: Bearer token sent with every request.
        playlist_id: Interpolated as-is into the request path.

    Returns:
        One dict per track with keys ``spotify_id``, ``title``, ``artist``
        (artist names joined with ``", "``), ``album``, ``isrc``,
        ``preview_url`` and ``image_url``. ``album``, ``isrc`` and
        ``image_url`` are ``None`` when the source data lacks them.

    Raises:
        httpx.HTTPStatusError: If any page request answers with 4xx/5xx.
    """
    return await _fetch_all_pages(
        access_token,
        f"{SPOTIFY_API_URL}/playlists/{playlist_id}/tracks?limit=100",
        _track_summary,
    )


async def get_audio_features(access_token: str, track_ids: list[str]) -> list[dict]:
    """Fetch audio features for *track_ids* in batches.

    Args:
        access_token: Bearer token sent with every request.
        track_ids: IDs to look up; sent comma-joined, at most 100 per request.

    Returns:
        The concatenation of every batch's ``audio_features`` array, in
        request order. Entries are passed through unfiltered.
        NOTE(review): Spotify may return null entries for tracks it has no
        features for — confirm callers tolerate ``None`` elements.
        NOTE(review): Spotify has reportedly restricted this endpoint for
        newer apps — confirm it is still available to this client.

    Raises:
        httpx.HTTPStatusError: If any batch request answers with 4xx/5xx.
    """
    if not track_ids:
        # Nothing to look up; avoid opening a client for zero requests.
        return []

    features: list[dict] = []
    headers = _bearer_headers(access_token)
    async with httpx.AsyncClient() as client:
        for start in range(0, len(track_ids), _AUDIO_FEATURES_BATCH_SIZE):
            batch = track_ids[start : start + _AUDIO_FEATURES_BATCH_SIZE]
            response = await client.get(
                f"{SPOTIFY_API_URL}/audio-features",
                params={"ids": ",".join(batch)},
                headers=headers,
            )
            response.raise_for_status()
            data = response.json()
            # A missing or null "audio_features" contributes nothing.
            features.extend(data.get("audio_features") or [])
    return features