""" Camera Viewer - Live camera grid with Frigate event-driven fullscreen """ import asyncio import json import logging from contextlib import asynccontextmanager from typing import AsyncGenerator import aiomqtt from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from .config import settings logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Event queue for SSE clients event_queues: list[asyncio.Queue] = [] async def mqtt_listener(): """Subscribe to Frigate MQTT events and broadcast to SSE clients""" reconnect_interval = 5 while True: try: async with aiomqtt.Client( hostname=settings.MQTT_HOST, port=settings.MQTT_PORT, username=settings.MQTT_USER, password=settings.MQTT_PASSWORD, identifier="camera-viewer" ) as client: logger.info(f"Connected to MQTT broker at {settings.MQTT_HOST}") # Subscribe to Frigate events await client.subscribe("frigate/events") await client.subscribe("frigate/+/person") # Per-camera person topic async for message in client.messages: try: payload = json.loads(message.payload.decode()) # Handle frigate/events topic if message.topic.matches("frigate/events"): event_type = payload.get("type") after = payload.get("after", {}) camera = after.get("camera") label = after.get("label") # Check for person on alert camera if (event_type in ["new", "update"] and camera == settings.ALERT_CAMERA and label == "person"): event = { "type": "person_detected", "camera": camera, "score": after.get("score", 0), "event_id": after.get("id") } logger.info(f"Person detected on {camera}") # Broadcast to all SSE clients for queue in event_queues: await queue.put(event) except json.JSONDecodeError: pass except Exception as e: logger.error(f"Error processing MQTT message: {e}") except aiomqtt.MqttError as e: logger.error(f"MQTT connection error: {e}. Reconnecting in {reconnect_interval}s...") await asyncio.sleep(reconnect_interval) except Exception as e: logger.error(f"Unexpected error in MQTT listener: {e}") await asyncio.sleep(reconnect_interval) @asynccontextmanager async def lifespan(app: FastAPI): """Start MQTT listener on startup""" mqtt_task = asyncio.create_task(mqtt_listener()) logger.info("Camera Viewer started") yield mqtt_task.cancel() try: await mqtt_task except asyncio.CancelledError: pass app = FastAPI(title="Camera Viewer", lifespan=lifespan) app.mount("/static", StaticFiles(directory="app/static"), name="static") templates = Jinja2Templates(directory="app/templates") @app.get("/", response_class=HTMLResponse) async def index(request: Request): """Main camera grid view""" return templates.TemplateResponse("viewer.html", { "request": request, "cameras": settings.CAMERAS, "go2rtc_host": settings.GO2RTC_HOST, "alert_camera": settings.ALERT_CAMERA, "auto_dismiss_seconds": settings.AUTO_DISMISS_SECONDS }) @app.get("/events") async def events() -> StreamingResponse: """SSE endpoint for Frigate events""" queue: asyncio.Queue = asyncio.Queue() event_queues.append(queue) async def event_generator() -> AsyncGenerator[str, None]: try: # Send initial connection confirmation yield f"data: {json.dumps({'type': 'connected'})}\n\n" while True: event = await queue.get() yield f"data: {json.dumps(event)}\n\n" except asyncio.CancelledError: pass finally: event_queues.remove(queue) return StreamingResponse( event_generator(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no" } ) @app.get("/health") async def health(): """Health check endpoint""" return { "status": "ok", "mqtt_clients": len(event_queues), "cameras": len(settings.CAMERAS) }