Track estimated Anthropic API costs per request across all Claude API call sites (recommender, analyze, artist-dive, generate-playlist, crate, rabbit-hole, playlist-fix, timeline, compatibility). Log token usage and estimated cost to the app logger. Aggregate costs in admin stats endpoint and display total/today costs and token usage in the admin dashboard.
226 lines
7.5 KiB
Python
226 lines
7.5 KiB
Python
import json
|
|
import logging
|
|
|
|
import anthropic
|
|
|
|
# Logger registered under the name "app"; this module writes its
# "API_COST|..." token-usage/cost lines for Claude calls to it.
api_logger = logging.getLogger("app")
|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import get_db
|
|
from app.core.security import get_current_user
|
|
from app.models.user import User
|
|
from app.models.playlist import Playlist
|
|
from app.models.track import Track
|
|
from app.services.recommender import build_taste_profile
|
|
|
|
# Router for the endpoints declared in this module; every route registered on
# it is served under the "/profile" prefix and tagged "profile".
router = APIRouter(prefix="/profile", tags=["profile"])
|
|
|
|
|
|
class CompatibilityRequest(BaseModel):
    """Request body for the taste-compatibility check."""

    # Email address used to look up the other user to compare against.
    # It is lower-cased by the endpoint before being matched.
    friend_email: str
|
|
|
|
|
|
class CompatibilityResponse(BaseModel):
    """Result of comparing the requesting user's library with a friend's."""

    # Display name of the user that was compared against.
    friend_name: str
    # Weighted compatibility score, clamped to the range 0-100.
    compatibility_score: int
    # Genres present in both libraries (sorted; the endpoint sends at most 15).
    shared_genres: list[str]
    # Genres only the requesting user has (sorted; at most 10 are sent).
    unique_to_you: list[str]
    # Genres only the friend has (sorted; at most 10 are sent).
    unique_to_them: list[str]
    # Artists present in both libraries (sorted; at most 15 are sent).
    shared_artists: list[str]
    # Short AI-written description of the pair's musical relationship.
    insight: str
    # AI-suggested tracks; the prompt asks for "title", "artist" and "reason"
    # keys, but the model's output is not schema-checked beyond being dicts.
    recommendations: list[dict]
|
|
|
|
|
|
async def _get_user_tracks(db: AsyncSession, user_id: int) -> list[Track]:
    """Load every track from every playlist owned by a user.

    The tracks are fetched with a single JOIN query instead of one query per
    playlist, so the number of database round-trips no longer grows with the
    number of playlists the user has.

    Args:
        db: Async database session used to run the query.
        user_id: ID of the user whose playlists' tracks are loaded.

    Returns:
        A list of all ``Track`` rows whose playlist belongs to ``user_id``;
        empty when the user has no playlists or no tracks.
    """
    stmt = (
        select(Track)
        .join(Playlist, Track.playlist_id == Playlist.id)
        .where(Playlist.user_id == user_id)
        # Keep tracks of the same playlist adjacent, as the former
        # playlist-by-playlist loading did.
        .order_by(Track.playlist_id)
    )
    result = await db.execute(stmt)
    return list(result.scalars().all())
|
|
|
|
|
|
def _extract_genres(tracks: list[Track]) -> set[str]:
|
|
"""Get the set of genres from a user's tracks."""
|
|
genres = set()
|
|
for t in tracks:
|
|
if t.genres:
|
|
for g in t.genres:
|
|
genres.add(g)
|
|
return genres
|
|
|
|
|
|
def _extract_artists(tracks: list[Track]) -> set[str]:
|
|
"""Get the set of artists from a user's tracks."""
|
|
return {t.artist for t in tracks}
|
|
|
|
|
|
def _audio_feature_avg(tracks: list[Track], attr: str) -> float:
|
|
"""Calculate the average of an audio feature across tracks."""
|
|
vals = [getattr(t, attr) for t in tracks if getattr(t, attr) is not None]
|
|
return sum(vals) / len(vals) if vals else 0.0
|
|
|
|
|
|
def _calculate_compatibility(
    my_tracks: list[Track],
    their_tracks: list[Track],
) -> tuple[int, list[str], list[str], list[str], list[str]]:
    """Score how closely two users' libraries match.

    The score blends three components: genre overlap (40%), artist overlap
    (30%) and closeness of the average energy/valence/danceability (30%).
    Each overlap is the shared count divided by the size of the union,
    expressed as a percentage.

    Returns:
        ``(score, shared_genres, unique_to_you, unique_to_them,
        shared_artists)`` where ``score`` is an int clamped to 0-100 and the
        four lists are sorted.
    """

    def overlap_percent(shared_count: int, union_count: int) -> float:
        # An empty union means neither side has any data for this component.
        return shared_count / union_count * 100 if union_count else 0

    genres_mine = _extract_genres(my_tracks)
    genres_theirs = _extract_genres(their_tracks)
    artists_mine = _extract_artists(my_tracks)
    artists_theirs = _extract_artists(their_tracks)

    shared_genres = sorted(genres_mine & genres_theirs)
    unique_to_you = sorted(genres_mine - genres_theirs)
    unique_to_them = sorted(genres_theirs - genres_mine)
    shared_artists = sorted(artists_mine & artists_theirs)

    genre_score = overlap_percent(
        len(shared_genres), len(genres_mine | genres_theirs)
    )
    artist_score = overlap_percent(
        len(shared_artists), len(artists_mine | artists_theirs)
    )

    # NOTE(review): a side with no values for a feature averages to 0.0, so
    # two libraries that both lack audio features compare as identical here.
    gaps = [
        abs(
            _audio_feature_avg(my_tracks, feature)
            - _audio_feature_avg(their_tracks, feature)
        )
        for feature in ("energy", "valence", "danceability")
    ]
    mean_gap = sum(gaps) / len(gaps)
    feature_score = max(0, (1 - mean_gap) * 100)

    weighted = genre_score * 0.4 + artist_score * 0.3 + feature_score * 0.3
    score = min(100, max(0, int(weighted)))

    return score, shared_genres, unique_to_you, unique_to_them, shared_artists
|
|
|
|
|
|
def _parse_insight_payload(raw_text: str) -> tuple[str, list[dict]]:
    """Turn Claude's raw reply text into ``(insight, recommendations)``.

    Raises:
        ValueError: If the text is not valid JSON (``json.JSONDecodeError`` is
            a ``ValueError``) or does not have the expected object shape.
        IndexError: If a code fence is opened but has no following line.
    """
    text = raw_text.strip()
    if text.startswith("```"):
        # Drop the opening fence line (e.g. "```json") and the closing fence.
        text = text.split("\n", 1)[1].rsplit("```", 1)[0].strip()

    payload = json.loads(text)
    if not isinstance(payload, dict):
        raise ValueError("expected a JSON object")

    insight = payload.get("insight", "")
    recommendations = payload.get("recommendations", [])
    if not isinstance(insight, str) or not isinstance(recommendations, list):
        raise ValueError("unexpected field types in model reply")

    # The response model declares list[dict]; discard anything else so one
    # malformed entry cannot fail validation of the whole response.
    return insight, [rec for rec in recommendations if isinstance(rec, dict)]


async def _generate_ai_insight(
    profile1: dict,
    profile2: dict,
    score: int,
    shared_genres: list[str],
    shared_artists: list[str],
) -> tuple[str, list[dict]]:
    """Ask Claude for an insight and shared recommendations for two users.

    Args:
        profile1: Taste profile of the first user (JSON-serialisable).
        profile2: Taste profile of the second user (JSON-serialisable).
        score: Pre-computed compatibility score, quoted in the prompt.
        shared_genres: Genres both users have, quoted in the prompt.
        shared_artists: Artists both users have, quoted in the prompt.

    Returns:
        ``(insight, recommendations)``. When the reply cannot be parsed into
        the expected shape, a generic insight and an empty list are returned.

    Errors raised by the Anthropic client for a failed request are not caught
    here and propagate to the caller.
    """
    prompt = f"""Two music lovers want to know their taste compatibility.

User 1 taste profile:
{json.dumps(profile1, indent=2)}

User 2 taste profile:
{json.dumps(profile2, indent=2)}

Their compatibility score is {score}%.
Shared genres: {", ".join(shared_genres) if shared_genres else "None"}
Shared artists: {", ".join(shared_artists) if shared_artists else "None"}

Respond with JSON:
{{
  "insight": "A fun 2-3 sentence description of their musical relationship",
  "recommendations": [
    {{"title": "...", "artist": "...", "reason": "Why both would love this"}}
  ]
}}
Return ONLY the JSON. Include exactly 5 recommendations."""

    # Use the async client so the network round-trip is awaited instead of
    # blocking the event loop, and close its connection pool when done.
    async with anthropic.AsyncAnthropic(
        api_key=settings.ANTHROPIC_API_KEY
    ) as client:
        message = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        )

    # Track API cost. Rates are USD per million tokens for Claude Haiku 4.5
    # ($1 input, $5 output) - keep in sync with the model used above.
    input_rate_usd = 1.00
    output_rate_usd = 5.00
    input_tokens = message.usage.input_tokens
    output_tokens = message.usage.output_tokens
    cost = (input_tokens * input_rate_usd / 1_000_000) + (
        output_tokens * output_rate_usd / 1_000_000
    )
    api_logger.info(f"API_COST|model=claude-haiku|input={input_tokens}|output={output_tokens}|cost=${cost:.4f}|user=system|endpoint=compatibility")

    try:
        return _parse_insight_payload(message.content[0].text)
    except (ValueError, IndexError, KeyError, AttributeError, TypeError):
        # Empty content, a non-text first block, or a reply that is not the
        # requested JSON: degrade to a generic insight rather than failing.
        return "These two listeners have an interesting musical connection!", []
|
|
|
|
|
|
@router.post("/compatibility", response_model=CompatibilityResponse)
async def check_compatibility(
    data: CompatibilityRequest,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Compare the authenticated user's taste with another user's.

    Raises:
        HTTPException: 400 when the email is the caller's own or when either
            user has no tracks; 404 when no user matches the email.
    """
    friend_email = data.friend_email.lower()
    if friend_email == user.email.lower():
        raise HTTPException(status_code=400, detail="You can't compare with yourself!")

    # NOTE(review): the lookup matches the lower-cased input against the
    # stored value as-is - presumably emails are stored lower-cased; confirm.
    lookup = await db.execute(select(User).where(User.email == friend_email))
    friend = lookup.scalar_one_or_none()
    if not friend:
        raise HTTPException(
            status_code=404,
            detail="No user found with that email. They need to have a Vynl account first!",
        )

    # Both libraries are loaded before either is checked for emptiness.
    my_tracks = await _get_user_tracks(db, user.id)
    their_tracks = await _get_user_tracks(db, friend.id)

    empty_library_errors = (
        (my_tracks, "You need to import some playlists first!"),
        (their_tracks, "Your friend hasn't imported any playlists yet!"),
    )
    for library, detail in empty_library_errors:
        if not library:
            raise HTTPException(status_code=400, detail=detail)

    breakdown = _calculate_compatibility(my_tracks, their_tracks)
    score, shared_genres, unique_to_you, unique_to_them, shared_artists = breakdown

    # Only the first ten shared genres/artists are quoted in the AI prompt.
    insight, recommendations = await _generate_ai_insight(
        build_taste_profile(my_tracks),
        build_taste_profile(their_tracks),
        score,
        shared_genres[:10],
        shared_artists[:10],
    )

    # Lists are trimmed so the response stays a manageable size.
    return CompatibilityResponse(
        friend_name=friend.name,
        compatibility_score=score,
        shared_genres=shared_genres[:15],
        unique_to_you=unique_to_you[:10],
        unique_to_them=unique_to_them[:10],
        shared_artists=shared_artists[:15],
        insight=insight,
        recommendations=recommendations,
    )
|