Files
camera-viewer/app/main.py

189 lines
6.3 KiB
Python

"""
Camera Viewer - Live camera grid with Frigate event-driven fullscreen
"""
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from typing import AsyncGenerator
import httpx
import aiomqtt
from fastapi import FastAPI, Request, Response
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from .config import settings
# HTTP client for proxying to go2rtc
http_client = httpx.AsyncClient(timeout=10.0)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Event queue for SSE clients
event_queues: list[asyncio.Queue] = []
async def mqtt_listener():
"""Subscribe to Frigate MQTT events and broadcast to SSE clients"""
reconnect_interval = 5
while True:
try:
async with aiomqtt.Client(
hostname=settings.MQTT_HOST,
port=settings.MQTT_PORT,
username=settings.MQTT_USER,
password=settings.MQTT_PASSWORD,
identifier="camera-viewer"
) as client:
logger.info(f"Connected to MQTT broker at {settings.MQTT_HOST}")
# Subscribe to Frigate events
await client.subscribe("frigate/events")
await client.subscribe("frigate/+/person") # Per-camera person topic
async for message in client.messages:
try:
payload = json.loads(message.payload.decode())
# Handle frigate/events topic
if message.topic.matches("frigate/events"):
event_type = payload.get("type")
after = payload.get("after", {})
camera = after.get("camera")
label = after.get("label")
# Check for person on alert camera
if (event_type in ["new", "update"] and
camera == settings.ALERT_CAMERA and
label == "person"):
event = {
"type": "person_detected",
"camera": camera,
"score": after.get("score", 0),
"event_id": after.get("id")
}
logger.info(f"Person detected on {camera}")
# Broadcast to all SSE clients
for queue in event_queues:
await queue.put(event)
except json.JSONDecodeError:
pass
except Exception as e:
logger.error(f"Error processing MQTT message: {e}")
except aiomqtt.MqttError as e:
logger.error(f"MQTT connection error: {e}. Reconnecting in {reconnect_interval}s...")
await asyncio.sleep(reconnect_interval)
except Exception as e:
logger.error(f"Unexpected error in MQTT listener: {e}")
await asyncio.sleep(reconnect_interval)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Start MQTT listener on startup"""
mqtt_task = asyncio.create_task(mqtt_listener())
logger.info("Camera Viewer started")
yield
mqtt_task.cancel()
try:
await mqtt_task
except asyncio.CancelledError:
pass
app = FastAPI(title="Camera Viewer", lifespan=lifespan)
app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
"""Main camera grid view"""
return templates.TemplateResponse("viewer.html", {
"request": request,
"cameras": settings.CAMERAS,
"go2rtc_host": settings.GO2RTC_HOST,
"alert_camera": settings.ALERT_CAMERA,
"auto_dismiss_seconds": settings.AUTO_DISMISS_SECONDS
})
@app.get("/events")
async def events() -> StreamingResponse:
"""SSE endpoint for Frigate events"""
queue: asyncio.Queue = asyncio.Queue()
event_queues.append(queue)
async def event_generator() -> AsyncGenerator[str, None]:
try:
# Send initial connection confirmation
yield f"data: {json.dumps({'type': 'connected'})}\n\n"
while True:
event = await queue.get()
yield f"data: {json.dumps(event)}\n\n"
except asyncio.CancelledError:
pass
finally:
event_queues.remove(queue)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
@app.post("/api/webrtc")
async def webrtc_proxy(request: Request, src: str):
"""Proxy WebRTC offers to go2rtc to avoid CORS issues"""
try:
body = await request.body()
response = await http_client.post(
f"http://{settings.GO2RTC_HOST}/api/webrtc?src={src}",
content=body,
headers={"Content-Type": "application/sdp"}
)
return Response(
content=response.content,
status_code=response.status_code,
media_type="application/sdp"
)
except Exception as e:
logger.error(f"WebRTC proxy error for {src}: {e}")
return Response(content=str(e), status_code=500)
@app.get("/api/streams")
async def streams_proxy():
"""Proxy streams list from go2rtc"""
try:
response = await http_client.get(f"http://{settings.GO2RTC_HOST}/api/streams")
return Response(
content=response.content,
status_code=response.status_code,
media_type="application/json"
)
except Exception as e:
logger.error(f"Streams proxy error: {e}")
return Response(content=str(e), status_code=500)
@app.get("/health")
async def health():
"""Health check endpoint"""
return {
"status": "ok",
"mqtt_clients": len(event_queues),
"cameras": len(settings.CAMERAS)
}