"""
Camera Viewer - Live camera grid with Frigate event-driven fullscreen
"""
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Optional

import httpx
import aiomqtt
from fastapi import FastAPI, Request, Response
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from .config import settings

# HTTP client for proxying to go2rtc (closed again in ``lifespan`` on shutdown)
http_client = httpx.AsyncClient(timeout=10.0)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# One queue per connected SSE client; the MQTT listener fans events out to all
event_queues: list[asyncio.Queue] = []


def _build_person_event(payload: Any, alert_camera: Any) -> Optional[dict[str, Any]]:
    """Translate a ``frigate/events`` payload into a ``person_detected`` event.

    Args:
        payload: The decoded JSON body of the MQTT message.
        alert_camera: Camera name that should trigger an alert.

    Returns:
        The event dict to send to SSE clients, or ``None`` when the payload is
        not a "new"/"update" person event for ``alert_camera``.
    """
    if not isinstance(payload, dict):
        return None

    # "after" may be missing or null; treat both as "no data" instead of
    # raising AttributeError on ``None.get``.
    after = payload.get("after") or {}
    if not isinstance(after, dict):
        return None

    if payload.get("type") not in ("new", "update"):
        return None

    camera = after.get("camera")
    if camera != alert_camera or after.get("label") != "person":
        return None

    return {
        "type": "person_detected",
        "camera": camera,
        "score": after.get("score", 0),
        "event_id": after.get("id"),
    }


async def _broadcast(event: dict[str, Any]) -> None:
    """Put ``event`` on the queue of every currently registered SSE client."""
    # Iterate over a snapshot so clients (dis)connecting during the fan-out
    # cannot change the list underneath the loop.
    for queue in tuple(event_queues):
        await queue.put(event)


async def _handle_mqtt_message(message: Any) -> None:
    """Process one MQTT message, broadcasting a person alert if it is one."""
    # Only frigate/events is handled; messages from the per-camera person
    # topic are currently ignored, so skip them before decoding anything.
    if not message.topic.matches("frigate/events"):
        return

    try:
        payload = json.loads(message.payload.decode())
    except json.JSONDecodeError:
        logger.debug("Ignoring non-JSON payload on %s", message.topic)
        return

    event = _build_person_event(payload, settings.ALERT_CAMERA)
    if event is None:
        return

    logger.info(f"Person detected on {event['camera']}")
    await _broadcast(event)


async def mqtt_listener() -> None:
    """Subscribe to Frigate MQTT events and broadcast to SSE clients.

    Runs forever: on any connection or unexpected error it logs, waits
    ``reconnect_interval`` seconds and reconnects. It only stops when the
    task is cancelled.
    """
    reconnect_interval = 5

    while True:
        try:
            async with aiomqtt.Client(
                hostname=settings.MQTT_HOST,
                port=settings.MQTT_PORT,
                username=settings.MQTT_USER,
                password=settings.MQTT_PASSWORD,
                identifier="camera-viewer",
            ) as client:
                logger.info(f"Connected to MQTT broker at {settings.MQTT_HOST}")

                # Subscribe to Frigate events
                await client.subscribe("frigate/events")
                await client.subscribe("frigate/+/person")  # Per-camera person topic

                async for message in client.messages:
                    try:
                        await _handle_mqtt_message(message)
                    except Exception as e:
                        # A single bad message must not tear down the connection.
                        logger.error(f"Error processing MQTT message: {e}")

        except aiomqtt.MqttError as e:
            logger.error(
                f"MQTT connection error: {e}. Reconnecting in {reconnect_interval}s..."
            )
            await asyncio.sleep(reconnect_interval)
        except Exception as e:
            logger.error(f"Unexpected error in MQTT listener: {e}")
            await asyncio.sleep(reconnect_interval)


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    """Start the MQTT listener on startup and release resources on shutdown."""
    mqtt_task = asyncio.create_task(mqtt_listener())
    logger.info("Camera Viewer started")
    try:
        yield
    finally:
        mqtt_task.cancel()
        try:
            await mqtt_task
        except asyncio.CancelledError:
            pass
        # Release the pooled connections held by the shared go2rtc client.
        await http_client.aclose()


app = FastAPI(title="Camera Viewer", lifespan=lifespan)
app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Main camera grid view"""
    return templates.TemplateResponse("viewer.html", {
        "request": request,
        "cameras": settings.CAMERAS,
        "go2rtc_host": settings.GO2RTC_HOST,
        "alert_camera": settings.ALERT_CAMERA,
        "auto_dismiss_seconds": settings.AUTO_DISMISS_SECONDS,
    })


@app.get("/events")
async def events() -> StreamingResponse:
    """SSE endpoint for Frigate events.

    Each connection gets its own queue, registered in ``event_queues`` for as
    long as the stream is being consumed.
    """

    async def event_generator() -> AsyncGenerator[str, None]:
        # Register inside the generator so the ``finally`` below always pairs
        # with the registration; a response that is never streamed therefore
        # cannot leave a stale queue behind.
        queue: asyncio.Queue = asyncio.Queue()
        event_queues.append(queue)
        try:
            # Send initial connection confirmation
            yield f"data: {json.dumps({'type': 'connected'})}\n\n"

            while True:
                event = await queue.get()
                yield f"data: {json.dumps(event)}\n\n"
        finally:
            # Runs on client disconnect / cancellation; the cancellation itself
            # is deliberately left to propagate to the server.
            event_queues.remove(queue)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


@app.post("/api/webrtc")
async def webrtc_proxy(request: Request, src: str) -> Response:
    """Proxy WebRTC offers to go2rtc to avoid CORS issues.

    Args:
        request: Incoming request whose body is the SDP offer.
        src: Name of the go2rtc stream to connect to.

    Returns:
        go2rtc's SDP answer with its status code, or a 500 response carrying
        the error text when the upstream call fails.
    """
    try:
        body = await request.body()
        response = await http_client.post(
            f"http://{settings.GO2RTC_HOST}/api/webrtc",
            # Let httpx encode the value so characters such as "&", "#" or
            # spaces in ``src`` cannot alter the upstream query string.
            params={"src": src},
            content=body,
            headers={"Content-Type": "application/sdp"},
        )
        return Response(
            content=response.content,
            status_code=response.status_code,
            media_type="application/sdp",
        )
    except Exception as e:
        logger.error(f"WebRTC proxy error for {src}: {e}")
        return Response(content=str(e), status_code=500)


@app.get("/api/streams")
async def streams_proxy() -> Response:
    """Proxy streams list from go2rtc"""
    try:
        response = await http_client.get(f"http://{settings.GO2RTC_HOST}/api/streams")
        return Response(
            content=response.content,
            status_code=response.status_code,
            media_type="application/json",
        )
    except Exception as e:
        logger.error(f"Streams proxy error: {e}")
        return Response(content=str(e), status_code=500)


@app.get("/health")
async def health() -> dict[str, Any]:
    """Health check endpoint"""
    return {
        "status": "ok",
        # Key name kept for compatibility; the value counts connected SSE clients.
        "mqtt_clients": len(event_queues),
        "cameras": len(settings.CAMERAS),
    }