#!/usr/bin/env python3
"""Prometheus exporter for go2rtc and Frigate camera stream health.

The exporter periodically polls the go2rtc ``/api/streams`` endpoint and the
Frigate ``/api/stats`` endpoint and publishes the results as Prometheus
gauges on an HTTP port.

Configuration is read from a YAML file (``config.yaml`` next to this script,
or the path given as the first command-line argument).  Keys read by this
module:

* ``go2rtc_url`` / ``frigate_url`` -- base URLs of the two services (required)
* ``cameras`` -- list of camera names to report on (required)
* ``request_timeout`` -- HTTP timeout passed to ``requests`` (default 5)
* ``metrics_port`` -- port for the metrics HTTP server (default 9199)
* ``poll_interval`` -- seconds to wait between poll cycles (default 15)
"""

import logging
import signal
import sys
import time
from pathlib import Path

import requests
import yaml
from prometheus_client import Gauge, start_http_server

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("go2rtc-exporter")

# --- Metrics ---
go2rtc_stream_up = Gauge(
    "go2rtc_stream_up",
    "Whether a go2rtc stream has active producers (1=up, 0=down)",
    ["camera"],
)
go2rtc_stream_bytes_per_second = Gauge(
    "go2rtc_stream_bytes_per_second",
    "Bytes per second received by go2rtc producer",
    ["camera"],
)
go2rtc_stream_consumers = Gauge(
    "go2rtc_stream_consumers",
    "Number of consumers connected to the stream",
    ["camera"],
)
frigate_camera_fps = Gauge(
    "frigate_camera_fps",
    "Camera FPS reported by Frigate",
    ["camera"],
)
go2rtc_up = Gauge(
    "go2rtc_up",
    "Whether go2rtc API is reachable (1=up, 0=down)",
)
frigate_up = Gauge(
    "frigate_up",
    "Whether Frigate API is reachable (1=up, 0=down)",
)

# --- State ---
# Per-camera byte counter total and monotonic timestamp from the previous
# successful go2rtc poll; used to turn the cumulative counter into a rate.
prev_bytes: dict[str, int] = {}
prev_time: dict[str, float] = {}

# Longest single sleep between checks of the shutdown flag in main().
_SHUTDOWN_CHECK_SECONDS = 0.5


def load_config(path: str) -> dict:
    """Load the exporter configuration from the YAML file at *path*.

    Returns the parsed top-level mapping.

    Raises:
        OSError: if the file cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
        ValueError: if the document is empty or its top level is not a
            mapping (``yaml.safe_load`` returns ``None`` for an empty file,
            which would otherwise fail later with an opaque AttributeError).
    """
    with open(path, encoding="utf-8") as f:
        config = yaml.safe_load(f)
    if not isinstance(config, dict):
        raise ValueError(
            f"{path}: expected a YAML mapping at the top level, "
            f"got {type(config).__name__}"
        )
    return config


def _to_float(value, default: float = 0.0) -> float:
    """Return *value* as a float, or *default* if it cannot be converted."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _mark_stream_down(cam: str) -> None:
    """Zero every go2rtc per-camera gauge for *cam*."""
    go2rtc_stream_up.labels(camera=cam).set(0)
    go2rtc_stream_bytes_per_second.labels(camera=cam).set(0)
    go2rtc_stream_consumers.labels(camera=cam).set(0)


def poll_go2rtc(config: dict) -> None:
    """Poll go2rtc once and update the ``go2rtc_*`` gauges.

    On any request/parse failure, or if the response is not a JSON object,
    ``go2rtc_up`` is set to 0 and every configured camera is reported as
    down.  Otherwise ``go2rtc_up`` is set to 1 and, per configured camera,
    the up flag, consumer count and receive rate are updated.  The rate is
    derived from the change in the summed producer ``bytes_recv`` values
    since the previous successful poll of that camera.
    """
    # Strip a trailing slash so a base URL of "http://host/" does not
    # produce "http://host//api/streams".
    url = f"{str(config['go2rtc_url']).rstrip('/')}/api/streams"
    timeout = config.get("request_timeout", 5)
    cameras = config["cameras"]

    try:
        resp = requests.get(url, timeout=timeout)
        resp.raise_for_status()
        streams = resp.json()
        if not isinstance(streams, dict):
            raise ValueError(
                f"unexpected payload type {type(streams).__name__}"
            )
    except Exception as e:
        # Deliberately broad: the exporter must keep running whatever the
        # HTTP stack or JSON decoder raises.
        log.warning("go2rtc unreachable: %s", e)
        go2rtc_up.set(0)
        for cam in cameras:
            _mark_stream_down(cam)
        return

    go2rtc_up.set(1)
    now = time.monotonic()

    for cam in cameras:
        stream = streams.get(cam)
        if not isinstance(stream, dict):
            # Missing (or malformed) entry: report down but keep the stored
            # counter state untouched, as it is only updated on real data.
            _mark_stream_down(cam)
            continue

        producers = stream.get("producers") or []
        consumers = stream.get("consumers") or []

        # A stream is "up" if it has at least one producer
        go2rtc_stream_up.labels(camera=cam).set(1 if producers else 0)
        go2rtc_stream_consumers.labels(camera=cam).set(len(consumers))

        # Calculate bytes/sec from delta.  ``or 0`` also covers an explicit
        # null counter; non-dict producer entries are ignored.
        total_bytes = sum(
            p.get("bytes_recv") or 0 for p in producers if isinstance(p, dict)
        )

        bps = 0.0  # first poll, or no time elapsed -- no delta yet
        if cam in prev_bytes and cam in prev_time:
            dt = now - prev_time[cam]
            if dt > 0:
                dbytes = total_bytes - prev_bytes[cam]
                # Handle counter reset (go2rtc restart)
                if dbytes < 0:
                    dbytes = total_bytes
                bps = round(dbytes / dt, 1)
        go2rtc_stream_bytes_per_second.labels(camera=cam).set(bps)

        prev_bytes[cam] = total_bytes
        prev_time[cam] = now


def poll_frigate(config: dict) -> None:
    """Poll Frigate once and update ``frigate_up`` and ``frigate_camera_fps``.

    On any request/parse failure, or if the response is not a JSON object,
    ``frigate_up`` is set to 0 and every configured camera reports 0 FPS.
    Otherwise each configured camera's gauge is set from the ``camera_fps``
    field of its entry under ``cameras``; a missing entry or a missing,
    null or non-numeric value is reported as 0.
    """
    url = f"{str(config['frigate_url']).rstrip('/')}/api/stats"
    timeout = config.get("request_timeout", 5)
    cameras = config["cameras"]

    try:
        resp = requests.get(url, timeout=timeout)
        resp.raise_for_status()
        stats = resp.json()
        if not isinstance(stats, dict):
            raise ValueError(f"unexpected payload type {type(stats).__name__}")
    except Exception as e:
        # Deliberately broad: see poll_go2rtc.
        log.warning("Frigate unreachable: %s", e)
        frigate_up.set(0)
        for cam in cameras:
            frigate_camera_fps.labels(camera=cam).set(0)
        return

    frigate_up.set(1)

    cam_stats = stats.get("cameras")
    if not isinstance(cam_stats, dict):
        cam_stats = {}

    for cam in cameras:
        cs = cam_stats.get(cam)
        # Gauge.set() converts its argument with float(), so a null or
        # non-numeric value must be replaced before it gets there.
        fps = _to_float(cs.get("camera_fps", 0)) if isinstance(cs, dict) else 0
        frigate_camera_fps.labels(camera=cam).set(fps)


def main() -> None:
    """Run the exporter until SIGTERM or SIGINT is received.

    Loads the configuration, starts the metrics HTTP server, then polls
    go2rtc and Frigate every ``poll_interval`` seconds.
    """
    if len(sys.argv) > 1:
        config_path = Path(sys.argv[1])
    else:
        config_path = Path(__file__).parent / "config.yaml"

    config = load_config(str(config_path))
    port = config.get("metrics_port", 9199)
    interval = config.get("poll_interval", 15)

    log.info("Starting go2rtc exporter on port %d (poll every %ds)", port, interval)
    log.info("Monitoring cameras: %s", ", ".join(config["cameras"]))

    start_http_server(port)

    shutdown = False

    def handle_signal(signum, frame):
        nonlocal shutdown
        log.info("Received signal %d, shutting down", signum)
        shutdown = True

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    while not shutdown:
        # Each poller gets its own guard so an unexpected failure in one
        # does not skip the other for this cycle.
        for poll in (poll_go2rtc, poll_frigate):
            try:
                poll(config)
            except Exception:
                log.exception("Unexpected error during poll")

        # time.sleep() resumes after a signal handler returns, so one long
        # sleep would delay shutdown by up to a whole interval.  Sleep in
        # short slices and re-check the flag between them instead.
        deadline = time.monotonic() + interval
        while not shutdown:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(remaining, _SHUTDOWN_CHECK_SECONDS))

    log.info("Exporter stopped")


if __name__ == "__main__":
    main()