Files
vynl/backend/app/api/endpoints/admin.py

152 lines
5.0 KiB
Python

import logging
import io
from datetime import datetime, timezone, timedelta
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.security import get_current_user
from app.models.user import User
from app.models.playlist import Playlist
from app.models.track import Track
from app.models.recommendation import Recommendation
# Router that groups every admin endpoint under the /admin URL prefix.
router = APIRouter(tags=["admin"], prefix="/admin")

# Accounts allowed to use the admin endpoints; compared against the
# authenticated user's email address.
ADMIN_EMAILS = ["chris.ryan@deepcutsai.com"]

# In-memory log buffer for admin viewing.  It holds one dict per captured log
# record and is capped at LOG_BUFFER_SIZE entries (oldest dropped first).
_log_buffer: list[dict] = []
LOG_BUFFER_SIZE = 500
class AdminLogHandler(logging.Handler):
    """Capture log records into an in-memory buffer for the admin UI.

    Every buffered record is stored in the module-level ``_log_buffer`` as a
    dict with the keys ``timestamp`` (ISO-8601 string in UTC), ``level``,
    ``logger`` and ``message``.  The buffer is trimmed from the front so it
    never holds more than ``LOG_BUFFER_SIZE`` entries.
    """

    # Name of the attribute set on a record once it has been buffered.  The
    # same handler instance is registered on several loggers, so a record that
    # propagates up the logger hierarchy is offered to it more than once.
    _SEEN_FLAG = "_admin_log_buffered"

    def emit(self, record):
        """Append ``record`` to the buffer unless it was already captured.

        Formatting errors are routed to ``handleError`` (the standard
        ``logging.Handler`` convention) instead of propagating into the code
        that issued the log call.
        """
        if getattr(record, self._SEEN_FLAG, False):
            return
        try:
            entry = {
                "timestamp": datetime.fromtimestamp(
                    record.created, tz=timezone.utc
                ).isoformat(),
                "level": record.levelname,
                "logger": record.name,
                "message": self.format(record),
            }
        except Exception:
            self.handleError(record)
            return
        setattr(record, self._SEEN_FLAG, True)
        _log_buffer.append(entry)
        # Drop the oldest entries in one step once the cap is exceeded.
        overflow = len(_log_buffer) - LOG_BUFFER_SIZE
        if overflow > 0:
            del _log_buffer[:overflow]
# Build the shared capture handler: INFO and above, storing only the message
# text of each record.
_handler = AdminLogHandler()
_handler.setFormatter(logging.Formatter("%(message)s"))
_handler.setLevel(logging.INFO)

# Register the handler on the root logger (name None) and on the named
# loggers below.  A logger that also propagates to the root logger offers each
# of its records to this handler at both levels.
for _logger_name in (None, "uvicorn.access", "uvicorn.error", "app"):
    logging.getLogger(_logger_name).addHandler(_handler)
del _logger_name

# Application logger for explicit logging; passes INFO and above.
app_logger = logging.getLogger("app")
app_logger.setLevel(logging.INFO)
@router.get("/stats")
async def get_stats(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return aggregate usage statistics for the admin dashboard.

    The response contains user, playlist and recommendation totals plus a
    per-user list with each user's recommendation count.

    Raises:
        HTTPException: 403 when the caller's email is not in ``ADMIN_EMAILS``.
    """
    if user.email not in ADMIN_EMAILS:
        raise HTTPException(status_code=403, detail="Admin only")

    async def count_rows(column, *conditions):
        """Run ``SELECT count(column)`` with optional WHERE conditions.

        A falsy scalar result (e.g. ``None``) is reported as 0.
        """
        statement = select(func.count(column))
        if conditions:
            statement = statement.where(*conditions)
        return (await db.execute(statement)).scalar() or 0

    # Reference points for the time-window counts, all in UTC.
    current_time = datetime.now(timezone.utc)
    midnight_utc = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
    seven_days_back = current_time - timedelta(days=7)
    thirty_days_back = current_time - timedelta(days=30)

    # User stats.  "== True" builds a SQL expression here, so it must not be
    # rewritten as a plain Python truth test.
    user_count = await count_rows(User.id)
    pro_count = await count_rows(User.id, User.is_pro == True)  # noqa: E712

    # Playlist stats
    playlist_count = await count_rows(Playlist.id)
    track_count = await count_rows(Track.id)

    # Recommendation stats
    rec_count = await count_rows(Recommendation.id)
    rec_count_today = await count_rows(
        Recommendation.id, Recommendation.created_at >= midnight_utc
    )
    rec_count_week = await count_rows(
        Recommendation.id, Recommendation.created_at >= seven_days_back
    )
    rec_count_month = await count_rows(
        Recommendation.id, Recommendation.created_at >= thirty_days_back
    )
    rec_count_saved = await count_rows(
        Recommendation.id, Recommendation.saved == True  # noqa: E712
    )
    rec_count_disliked = await count_rows(
        Recommendation.id, Recommendation.disliked == True  # noqa: E712
    )

    # Per-user breakdown.  The outer join keeps users that have no
    # recommendations; count() then reports 0 for them.
    per_user_query = (
        select(
            User.id,
            User.name,
            User.email,
            User.is_pro,
            User.created_at,
            func.count(Recommendation.id).label("rec_count"),
        )
        .outerjoin(Recommendation, Recommendation.user_id == User.id)
        .group_by(User.id)
        .order_by(User.id)
    )
    per_user_rows = await db.execute(per_user_query)

    user_breakdown = []
    for record in per_user_rows.all():
        joined_at = record.created_at.isoformat() if record.created_at else None
        user_breakdown.append(
            {
                "id": record.id,
                "name": record.name,
                "email": record.email,
                "is_pro": record.is_pro,
                "created_at": joined_at,
                "recommendation_count": record.rec_count,
            }
        )

    users_section = {
        "total": user_count,
        "pro": pro_count,
        "free": user_count - pro_count,
    }
    playlists_section = {
        "total": playlist_count,
        "total_tracks": track_count,
    }
    recommendations_section = {
        "total": rec_count,
        "today": rec_count_today,
        "this_week": rec_count_week,
        "this_month": rec_count_month,
        "saved": rec_count_saved,
        "disliked": rec_count_disliked,
    }
    return {
        "users": users_section,
        "playlists": playlists_section,
        "recommendations": recommendations_section,
        "user_breakdown": user_breakdown,
    }
def _select_logs(entries, level, limit):
    """Return up to ``limit`` of the newest ``entries`` matching ``level``, newest first.

    ``entries`` is a list of log dicts ordered oldest to newest.  ``level`` is
    matched case-insensitively against each entry's ``"level"`` value; the
    value ``"ALL"`` (in any case) disables filtering.  A non-positive
    ``limit`` yields an empty list.
    """
    wanted = level.upper()
    if wanted != "ALL":
        # Filter before limiting so the caller gets up to ``limit`` matching
        # entries rather than only the matches among the newest ``limit``.
        entries = [entry for entry in entries if entry["level"] == wanted]
    if limit <= 0:
        # ``entries[-0:]`` would be the whole list and a negative limit would
        # slice from the front, so handle these explicitly.
        return []
    newest = entries[-limit:]
    newest.reverse()
    return newest


@router.get("/logs")
async def get_logs(
    user: User = Depends(get_current_user),
    level: str = Query("ALL", description="Filter by level: ALL, ERROR, WARNING, INFO"),
    limit: int = Query(100, description="Number of log entries"),
):
    """Return recently captured log entries, newest first.

    The response has two keys: ``logs`` (the selected entries) and ``total``
    (the number of entries currently held in the buffer, before filtering).

    Raises:
        HTTPException: 403 when the caller's email is not in ``ADMIN_EMAILS``.
    """
    if user.email not in ADMIN_EMAILS:
        raise HTTPException(status_code=403, detail="Admin only")
    # Work on a copy so the result is built from one consistent snapshot.
    snapshot = list(_log_buffer)
    return {"logs": _select_logs(snapshot, level, limit), "total": len(_log_buffer)}