diff --git a/backend/routes/go2rtc_proxy.py b/backend/routes/go2rtc_proxy.py index 4d7860a..3932ab8 100644 --- a/backend/routes/go2rtc_proxy.py +++ b/backend/routes/go2rtc_proxy.py @@ -11,6 +11,20 @@ logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/go2rtc") +@router.get("/streams") +async def get_streams(): + """Fetch available streams from go2rtc.""" + config = get_config() + target = f"{config.go2rtc.url}/api/streams" + try: + async with httpx.AsyncClient() as client: + resp = await client.get(target, timeout=5.0) + return resp.json() + except Exception: + logger.error("Failed to fetch go2rtc streams from %s", target) + return {} + + @router.post("/webrtc") async def proxy_webrtc(request: Request, src: str): """Proxy WebRTC SDP exchange to go2rtc.""" @@ -66,5 +80,11 @@ async def proxy_mse_ws(ws: WebSocket, src: str): forward_to_client(), forward_to_upstream(), ) - except (WebSocketDisconnect, websockets.ConnectionClosed, Exception): - pass + except WebSocketDisconnect: + logger.debug("Client disconnected from MSE proxy for %s", src) + except websockets.ConnectionClosed: + logger.debug("Upstream go2rtc closed for %s", src) + except ConnectionRefusedError: + logger.error("Cannot reach go2rtc at %s for stream %s", target, src) + except Exception: + logger.exception("MSE proxy error for stream %s", src) diff --git a/frontend/src/hooks/useStream.ts b/frontend/src/hooks/useStream.ts index 45ec1f2..624fd03 100644 --- a/frontend/src/hooks/useStream.ts +++ b/frontend/src/hooks/useStream.ts @@ -3,6 +3,9 @@ import { Go2RTCWebRTC, Go2RTCMSE } from '@/services/go2rtc'; const RECONNECT_DELAY = 3000; const MAX_RECONNECT_DELAY = 30000; +const MSE_READY_TIMEOUT = 5000; +const STALL_CHECK_INTERVAL = 5000; +const STALL_THRESHOLD = 3; // consecutive stall checks before reconnecting interface UseStreamOptions { streamName: string; @@ -24,6 +27,9 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream const reconnectTimer = useRef>(); const reconnectDelay = useRef(RECONNECT_DELAY); const mountedRef = useRef(true); + const stallDetector = useRef>(); + const lastPlayTime = useRef(0); + const stallCount = useRef(0); const [isConnecting, setIsConnecting] = useState(true); const [error, setError] = useState(null); const [retryCount, setRetryCount] = useState(0); @@ -48,10 +54,17 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream mountedRef.current = true; reconnectDelay.current = RECONNECT_DELAY; + stallCount.current = 0; + lastPlayTime.current = 0; let initTimer: ReturnType; let readyCheck: ReturnType; + let mseTimeout: ReturnType; const cleanup = () => { + if (stallDetector.current) { + clearInterval(stallDetector.current); + stallDetector.current = undefined; + } mseRef.current?.disconnect(); mseRef.current = null; webrtcRef.current?.disconnect(); @@ -65,6 +78,32 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream scheduleReconnect(); }; + const startStallDetection = () => { + if (stallDetector.current) clearInterval(stallDetector.current); + stallCount.current = 0; + lastPlayTime.current = videoRef.current?.currentTime ?? 0; + + stallDetector.current = setInterval(() => { + if (!mountedRef.current) return; + const v = videoRef.current; + if (!v || v.paused) return; + + const currentTime = v.currentTime; + if (currentTime === lastPlayTime.current && currentTime > 0) { + stallCount.current++; + if (stallCount.current >= STALL_THRESHOLD) { + console.warn(`Stream stalled for ${streamName}, reconnecting...`); + cleanup(); + setError('Stream stalled'); + scheduleReconnect(); + } + } else { + stallCount.current = 0; + } + lastPlayTime.current = currentTime; + }, STALL_CHECK_INTERVAL); + }; + const connectMSE = async () => { try { setIsConnecting(true); @@ -77,20 +116,34 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream if (!videoRef.current) return; await mse.connect(videoRef.current, onDisconnect); - // Poll for video readiness + // Poll for video readiness with timeout fallback to WebRTC const checkReady = () => { if (!mountedRef.current) return; const v = videoRef.current; if (v && v.readyState >= 2) { + clearTimeout(mseTimeout); setIsConnecting(false); setError(null); reconnectDelay.current = RECONNECT_DELAY; + startStallDetection(); } else { readyCheck = setTimeout(checkReady, 300); } }; readyCheck = setTimeout(checkReady, 300); + // Timeout: if MSE doesn't produce video, fall back to WebRTC + mseTimeout = setTimeout(() => { + if (!mountedRef.current) return; + const v = videoRef.current; + if (!v || v.readyState < 2) { + console.warn(`MSE timeout for ${streamName}, trying WebRTC...`); + clearTimeout(readyCheck); + cleanup(); + connectWebRTC(); + } + }, MSE_READY_TIMEOUT); + } catch (err) { if (!mountedRef.current) return; console.warn(`MSE failed for ${streamName}, trying WebRTC...`, err); @@ -114,6 +167,7 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream setIsConnecting(false); setError(null); reconnectDelay.current = RECONNECT_DELAY; + startStallDetection(); } }, onDisconnect, @@ -137,6 +191,7 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream mountedRef.current = false; clearTimeout(initTimer); clearTimeout(readyCheck); + clearTimeout(mseTimeout); clearTimeout(reconnectTimer.current); cleanup(); }; diff --git a/frontend/src/services/go2rtc.ts b/frontend/src/services/go2rtc.ts index 24d4ef8..c25e727 100644 --- a/frontend/src/services/go2rtc.ts +++ b/frontend/src/services/go2rtc.ts @@ -32,8 +32,6 @@ export class Go2RTCWebRTC { } }; - this.pc.onicecandidate = () => {}; - this.pc.onconnectionstatechange = () => { const state = this.pc?.connectionState; if (!this.disposed && (state === 'disconnected' || state === 'failed')) { @@ -47,12 +45,15 @@ export class Go2RTCWebRTC { const offer = await this.pc.createOffer(); await this.pc.setLocalDescription(offer); + // Wait for ICE gathering to complete (or timeout after 2s) + const localDesc = await this.waitForIceGathering(2000); + // Use backend proxy instead of direct go2rtc const url = `/api/go2rtc/webrtc?src=${encodeURIComponent(this.streamName)}`; const res = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/sdp' }, - body: offer.sdp, + body: localDesc.sdp, }); if (!res.ok) { @@ -63,12 +64,39 @@ export class Go2RTCWebRTC { await this.pc.setRemoteDescription({ type: 'answer', sdp: answerSdp }); } + private waitForIceGathering(timeoutMs: number): Promise { + return new Promise((resolve) => { + if (!this.pc) { + resolve(this.pc!.localDescription!); + return; + } + + if (this.pc.iceGatheringState === 'complete') { + resolve(this.pc.localDescription!); + return; + } + + const timeout = setTimeout(() => { + if (this.pc?.localDescription) { + resolve(this.pc.localDescription); + } + }, timeoutMs); + + this.pc.onicegatheringstatechange = () => { + if (this.pc?.iceGatheringState === 'complete') { + clearTimeout(timeout); + resolve(this.pc.localDescription!); + } + }; + }); + } + private cleanup(): void { this.mediaStream?.getTracks().forEach((t) => t.stop()); this.mediaStream = null; if (this.pc) { this.pc.ontrack = null; - this.pc.onicecandidate = null; + this.pc.onicegatheringstatechange = null; this.pc.onconnectionstatechange = null; this.pc.close(); this.pc = null; @@ -87,6 +115,9 @@ export class Go2RTCWebRTC { } } +const MAX_BUFFER_SECONDS = 30; +const BUFFER_TRIM_SECONDS = 15; + export class Go2RTCMSE { private mediaSource: MediaSource | null = null; private sourceBuffer: SourceBuffer | null = null; @@ -96,6 +127,7 @@ export class Go2RTCMSE { private queue: ArrayBuffer[] = []; private onDisconnectCb: (() => void) | null = null; private disposed = false; + private bufferTrimInterval: ReturnType | null = null; constructor(streamName: string, _go2rtcUrl?: string) { this.streamName = streamName; @@ -145,10 +177,13 @@ export class Go2RTCMSE { if (this.sourceBuffer.updating) { this.queue.push(event.data); } else { - this.sourceBuffer.appendBuffer(event.data); + this.safeAppend(event.data); } } }; + + // Periodically trim buffer to prevent unbounded growth + this.bufferTrimInterval = setInterval(() => this.trimBuffer(), 10000); } private initSourceBuffer(codec: string): void { @@ -158,7 +193,13 @@ export class Go2RTCMSE { this.sourceBuffer.mode = 'segments'; this.sourceBuffer.addEventListener('updateend', () => { if (this.queue.length > 0 && this.sourceBuffer && !this.sourceBuffer.updating) { - this.sourceBuffer.appendBuffer(this.queue.shift()!); + this.safeAppend(this.queue.shift()!); + } + }); + this.sourceBuffer.addEventListener('error', () => { + console.error(`SourceBuffer error for ${this.streamName}`); + if (!this.disposed) { + this.onDisconnectCb?.(); } }); } catch (e) { @@ -166,8 +207,47 @@ export class Go2RTCMSE { } } + private safeAppend(data: ArrayBuffer): void { + try { + this.sourceBuffer!.appendBuffer(data); + } catch (e) { + if (e instanceof DOMException && e.name === 'QuotaExceededError') { + console.warn(`Buffer quota exceeded for ${this.streamName}, trimming...`); + this.queue.unshift(data); + this.trimBuffer(); + } else { + console.error(`appendBuffer error for ${this.streamName}:`, e); + if (!this.disposed) { + this.onDisconnectCb?.(); + } + } + } + } + + private trimBuffer(): void { + if (!this.sourceBuffer || this.sourceBuffer.updating || !this.videoElement) return; + const buffered = this.sourceBuffer.buffered; + if (buffered.length === 0) return; + + const currentTime = this.videoElement.currentTime; + const bufferStart = buffered.start(0); + const bufferedAmount = currentTime - bufferStart; + + if (bufferedAmount > MAX_BUFFER_SECONDS) { + try { + this.sourceBuffer.remove(bufferStart, currentTime - BUFFER_TRIM_SECONDS); + } catch { + // remove() can throw if already updating + } + } + } + disconnect(): void { this.disposed = true; + if (this.bufferTrimInterval) { + clearInterval(this.bufferTrimInterval); + this.bufferTrimInterval = null; + } this.ws?.close(); this.ws = null; if (this.mediaSource?.readyState === 'open') {