import asyncio import logging import httpx import websockets from fastapi import APIRouter, Request, WebSocket, WebSocketDisconnect, Response from config import get_config logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/go2rtc") @router.get("/streams") async def get_streams(): """Fetch available streams from go2rtc.""" config = get_config() target = f"{config.go2rtc.url}/api/streams" try: async with httpx.AsyncClient() as client: resp = await client.get(target, timeout=5.0) return resp.json() except Exception: logger.error("Failed to fetch go2rtc streams from %s", target) return {} @router.post("/webrtc") async def proxy_webrtc(request: Request, src: str): """Proxy WebRTC SDP exchange to go2rtc.""" config = get_config() target = f"{config.go2rtc.url}/api/webrtc?src={src}" body = await request.body() async with httpx.AsyncClient() as client: resp = await client.post( target, content=body, headers={"Content-Type": "application/sdp"}, timeout=10.0, ) return Response( content=resp.content, status_code=resp.status_code, media_type="application/sdp", ) @router.websocket("/ws") async def proxy_mse_ws(ws: WebSocket, src: str): """Proxy MSE WebSocket to go2rtc.""" config = get_config() go2rtc_ws_url = config.go2rtc.url.replace("http", "ws") target = f"{go2rtc_ws_url}/api/ws?src={src}" await ws.accept() try: async with websockets.connect( target, ping_interval=None, ping_timeout=None, close_timeout=5, ) as upstream: async def forward_to_client(): async for msg in upstream: if isinstance(msg, bytes): await ws.send_bytes(msg) else: await ws.send_text(msg) async def forward_to_upstream(): while True: data = await ws.receive_text() await upstream.send(data) await asyncio.gather( forward_to_client(), forward_to_upstream(), ) except WebSocketDisconnect: logger.debug("Client disconnected from MSE proxy for %s", src) except websockets.ConnectionClosed: logger.debug("Upstream go2rtc closed for %s", src) except ConnectionRefusedError: logger.error("Cannot reach go2rtc at %s for stream %s", target, src) except Exception: logger.exception("MSE proxy error for stream %s", src)