import asyncio import json import logging from typing import Any import aiomqtt from config import get_config logger = logging.getLogger(__name__) # Connected WebSocket clients _ws_clients: set[Any] = set() _mqtt_task: asyncio.Task | None = None def register_ws_client(ws): _ws_clients.add(ws) def unregister_ws_client(ws): _ws_clients.discard(ws) async def broadcast(message: dict): global _ws_clients data = json.dumps(message) disconnected = set() for ws in _ws_clients: try: await ws.send_text(data) except Exception: disconnected.add(ws) if disconnected: _ws_clients -= disconnected async def mqtt_listener(): config = get_config() if not config.alerts.enabled: logger.info("Alerts disabled, MQTT listener not started") return mqtt = config.mqtt topic = f"{mqtt.topic_prefix}/events" while True: try: async with aiomqtt.Client( hostname=mqtt.host, port=mqtt.port, username=mqtt.username or None, password=mqtt.password or None, ) as client: logger.info(f"Connected to MQTT broker at {mqtt.host}:{mqtt.port}") await client.subscribe(topic) async for message in client.messages: try: payload = json.loads(message.payload) await handle_frigate_event(payload) except json.JSONDecodeError: pass except Exception as e: logger.error(f"Error handling MQTT message: {e}") except aiomqtt.MqttError as e: logger.warning(f"MQTT connection lost: {e}, reconnecting in 5s...") await asyncio.sleep(5) except Exception as e: logger.error(f"MQTT error: {e}, reconnecting in 10s...") await asyncio.sleep(10) async def handle_frigate_event(payload: dict): config = get_config() alert_config = config.alerts if not alert_config.enabled: return event_type = payload.get("type") after = payload.get("after", {}) label = after.get("label", "") camera = after.get("camera", "") has_clip = after.get("has_clip", False) has_snapshot = after.get("has_snapshot", False) # Only process new events with matching detection types if event_type != "new": return if label not in alert_config.detection_types: return if alert_config.cameras and camera not in alert_config.cameras: return event = { "type": "alert", "camera": camera, "label": label, "event_id": after.get("id", ""), "has_snapshot": has_snapshot, "has_clip": has_clip, "timestamp": after.get("start_time", 0), } logger.info(f"Person detected on {camera}") await broadcast(event) def start_mqtt(loop: asyncio.AbstractEventLoop | None = None): global _mqtt_task _mqtt_task = asyncio.create_task(mqtt_listener()) return _mqtt_task def get_mqtt_status() -> dict: return { "enabled": get_config().alerts.enabled, "connected": _mqtt_task is not None and not _mqtt_task.done(), "clients": len(_ws_clients), }