From 898af6022b6040ae10fa7b2ab7dc89422db41fce Mon Sep 17 00:00:00 2001 From: root Date: Fri, 6 Mar 2026 10:00:13 -0600 Subject: [PATCH] Improve stream reliability with stall detection and buffer management Add video stall detection that monitors currentTime and auto-reconnects after 15s of frozen video. Add MSE SourceBuffer trimming to prevent QuotaExceededError from killing streams silently. Handle appendBuffer errors with safe wrapper and SourceBuffer error listener. Wait for ICE gathering before sending WebRTC offers. Add go2rtc stream availability endpoint. Improve backend proxy error logging. Co-Authored-By: Claude Opus 4.6 --- backend/routes/go2rtc_proxy.py | 24 ++++++++- frontend/src/hooks/useStream.ts | 57 +++++++++++++++++++- frontend/src/services/go2rtc.ts | 92 ++++++++++++++++++++++++++++++--- 3 files changed, 164 insertions(+), 9 deletions(-) diff --git a/backend/routes/go2rtc_proxy.py b/backend/routes/go2rtc_proxy.py index 4d7860a..3932ab8 100644 --- a/backend/routes/go2rtc_proxy.py +++ b/backend/routes/go2rtc_proxy.py @@ -11,6 +11,20 @@ logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/go2rtc") +@router.get("/streams") +async def get_streams(): + """Fetch available streams from go2rtc.""" + config = get_config() + target = f"{config.go2rtc.url}/api/streams" + try: + async with httpx.AsyncClient() as client: + resp = await client.get(target, timeout=5.0) + return resp.json() + except Exception: + logger.error("Failed to fetch go2rtc streams from %s", target) + return {} + + @router.post("/webrtc") async def proxy_webrtc(request: Request, src: str): """Proxy WebRTC SDP exchange to go2rtc.""" @@ -66,5 +80,11 @@ async def proxy_mse_ws(ws: WebSocket, src: str): forward_to_client(), forward_to_upstream(), ) - except (WebSocketDisconnect, websockets.ConnectionClosed, Exception): - pass + except WebSocketDisconnect: + logger.debug("Client disconnected from MSE proxy for %s", src) + except websockets.ConnectionClosed: + logger.debug("Upstream go2rtc closed for %s", src) + except ConnectionRefusedError: + logger.error("Cannot reach go2rtc at %s for stream %s", target, src) + except Exception: + logger.exception("MSE proxy error for stream %s", src) diff --git a/frontend/src/hooks/useStream.ts b/frontend/src/hooks/useStream.ts index 45ec1f2..624fd03 100644 --- a/frontend/src/hooks/useStream.ts +++ b/frontend/src/hooks/useStream.ts @@ -3,6 +3,9 @@ import { Go2RTCWebRTC, Go2RTCMSE } from '@/services/go2rtc'; const RECONNECT_DELAY = 3000; const MAX_RECONNECT_DELAY = 30000; +const MSE_READY_TIMEOUT = 5000; +const STALL_CHECK_INTERVAL = 5000; +const STALL_THRESHOLD = 3; // consecutive stall checks before reconnecting interface UseStreamOptions { streamName: string; @@ -24,6 +27,9 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream const reconnectTimer = useRef>(); const reconnectDelay = useRef(RECONNECT_DELAY); const mountedRef = useRef(true); + const stallDetector = useRef>(); + const lastPlayTime = useRef(0); + const stallCount = useRef(0); const [isConnecting, setIsConnecting] = useState(true); const [error, setError] = useState(null); const [retryCount, setRetryCount] = useState(0); @@ -48,10 +54,17 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream mountedRef.current = true; reconnectDelay.current = RECONNECT_DELAY; + stallCount.current = 0; + lastPlayTime.current = 0; let initTimer: ReturnType; let readyCheck: ReturnType; + let mseTimeout: ReturnType; const cleanup = () => { + if (stallDetector.current) { + clearInterval(stallDetector.current); + stallDetector.current = undefined; + } mseRef.current?.disconnect(); mseRef.current = null; webrtcRef.current?.disconnect(); @@ -65,6 +78,32 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream scheduleReconnect(); }; + const startStallDetection = () => { + if (stallDetector.current) clearInterval(stallDetector.current); + stallCount.current = 0; + lastPlayTime.current = videoRef.current?.currentTime ?? 0; + + stallDetector.current = setInterval(() => { + if (!mountedRef.current) return; + const v = videoRef.current; + if (!v || v.paused) return; + + const currentTime = v.currentTime; + if (currentTime === lastPlayTime.current && currentTime > 0) { + stallCount.current++; + if (stallCount.current >= STALL_THRESHOLD) { + console.warn(`Stream stalled for ${streamName}, reconnecting...`); + cleanup(); + setError('Stream stalled'); + scheduleReconnect(); + } + } else { + stallCount.current = 0; + } + lastPlayTime.current = currentTime; + }, STALL_CHECK_INTERVAL); + }; + const connectMSE = async () => { try { setIsConnecting(true); @@ -77,20 +116,34 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream if (!videoRef.current) return; await mse.connect(videoRef.current, onDisconnect); - // Poll for video readiness + // Poll for video readiness with timeout fallback to WebRTC const checkReady = () => { if (!mountedRef.current) return; const v = videoRef.current; if (v && v.readyState >= 2) { + clearTimeout(mseTimeout); setIsConnecting(false); setError(null); reconnectDelay.current = RECONNECT_DELAY; + startStallDetection(); } else { readyCheck = setTimeout(checkReady, 300); } }; readyCheck = setTimeout(checkReady, 300); + // Timeout: if MSE doesn't produce video, fall back to WebRTC + mseTimeout = setTimeout(() => { + if (!mountedRef.current) return; + const v = videoRef.current; + if (!v || v.readyState < 2) { + console.warn(`MSE timeout for ${streamName}, trying WebRTC...`); + clearTimeout(readyCheck); + cleanup(); + connectWebRTC(); + } + }, MSE_READY_TIMEOUT); + } catch (err) { if (!mountedRef.current) return; console.warn(`MSE failed for ${streamName}, trying WebRTC...`, err); @@ -114,6 +167,7 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream setIsConnecting(false); setError(null); reconnectDelay.current = RECONNECT_DELAY; + startStallDetection(); } }, onDisconnect, @@ -137,6 +191,7 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream mountedRef.current = false; clearTimeout(initTimer); clearTimeout(readyCheck); + clearTimeout(mseTimeout); clearTimeout(reconnectTimer.current); cleanup(); }; diff --git a/frontend/src/services/go2rtc.ts b/frontend/src/services/go2rtc.ts index 24d4ef8..c25e727 100644 --- a/frontend/src/services/go2rtc.ts +++ b/frontend/src/services/go2rtc.ts @@ -32,8 +32,6 @@ export class Go2RTCWebRTC { } }; - this.pc.onicecandidate = () => {}; - this.pc.onconnectionstatechange = () => { const state = this.pc?.connectionState; if (!this.disposed && (state === 'disconnected' || state === 'failed')) { @@ -47,12 +45,15 @@ export class Go2RTCWebRTC { const offer = await this.pc.createOffer(); await this.pc.setLocalDescription(offer); + // Wait for ICE gathering to complete (or timeout after 2s) + const localDesc = await this.waitForIceGathering(2000); + // Use backend proxy instead of direct go2rtc const url = `/api/go2rtc/webrtc?src=${encodeURIComponent(this.streamName)}`; const res = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/sdp' }, - body: offer.sdp, + body: localDesc.sdp, }); if (!res.ok) { @@ -63,12 +64,39 @@ export class Go2RTCWebRTC { await this.pc.setRemoteDescription({ type: 'answer', sdp: answerSdp }); } + private waitForIceGathering(timeoutMs: number): Promise { + return new Promise((resolve) => { + if (!this.pc) { + resolve(this.pc!.localDescription!); + return; + } + + if (this.pc.iceGatheringState === 'complete') { + resolve(this.pc.localDescription!); + return; + } + + const timeout = setTimeout(() => { + if (this.pc?.localDescription) { + resolve(this.pc.localDescription); + } + }, timeoutMs); + + this.pc.onicegatheringstatechange = () => { + if (this.pc?.iceGatheringState === 'complete') { + clearTimeout(timeout); + resolve(this.pc.localDescription!); + } + }; + }); + } + private cleanup(): void { this.mediaStream?.getTracks().forEach((t) => t.stop()); this.mediaStream = null; if (this.pc) { this.pc.ontrack = null; - this.pc.onicecandidate = null; + this.pc.onicegatheringstatechange = null; this.pc.onconnectionstatechange = null; this.pc.close(); this.pc = null; @@ -87,6 +115,9 @@ export class Go2RTCWebRTC { } } +const MAX_BUFFER_SECONDS = 30; +const BUFFER_TRIM_SECONDS = 15; + export class Go2RTCMSE { private mediaSource: MediaSource | null = null; private sourceBuffer: SourceBuffer | null = null; @@ -96,6 +127,7 @@ export class Go2RTCMSE { private queue: ArrayBuffer[] = []; private onDisconnectCb: (() => void) | null = null; private disposed = false; + private bufferTrimInterval: ReturnType | null = null; constructor(streamName: string, _go2rtcUrl?: string) { this.streamName = streamName; @@ -145,10 +177,13 @@ export class Go2RTCMSE { if (this.sourceBuffer.updating) { this.queue.push(event.data); } else { - this.sourceBuffer.appendBuffer(event.data); + this.safeAppend(event.data); } } }; + + // Periodically trim buffer to prevent unbounded growth + this.bufferTrimInterval = setInterval(() => this.trimBuffer(), 10000); } private initSourceBuffer(codec: string): void { @@ -158,7 +193,13 @@ export class Go2RTCMSE { this.sourceBuffer.mode = 'segments'; this.sourceBuffer.addEventListener('updateend', () => { if (this.queue.length > 0 && this.sourceBuffer && !this.sourceBuffer.updating) { - this.sourceBuffer.appendBuffer(this.queue.shift()!); + this.safeAppend(this.queue.shift()!); + } + }); + this.sourceBuffer.addEventListener('error', () => { + console.error(`SourceBuffer error for ${this.streamName}`); + if (!this.disposed) { + this.onDisconnectCb?.(); } }); } catch (e) { @@ -166,8 +207,47 @@ export class Go2RTCMSE { } } + private safeAppend(data: ArrayBuffer): void { + try { + this.sourceBuffer!.appendBuffer(data); + } catch (e) { + if (e instanceof DOMException && e.name === 'QuotaExceededError') { + console.warn(`Buffer quota exceeded for ${this.streamName}, trimming...`); + this.queue.unshift(data); + this.trimBuffer(); + } else { + console.error(`appendBuffer error for ${this.streamName}:`, e); + if (!this.disposed) { + this.onDisconnectCb?.(); + } + } + } + } + + private trimBuffer(): void { + if (!this.sourceBuffer || this.sourceBuffer.updating || !this.videoElement) return; + const buffered = this.sourceBuffer.buffered; + if (buffered.length === 0) return; + + const currentTime = this.videoElement.currentTime; + const bufferStart = buffered.start(0); + const bufferedAmount = currentTime - bufferStart; + + if (bufferedAmount > MAX_BUFFER_SECONDS) { + try { + this.sourceBuffer.remove(bufferStart, currentTime - BUFFER_TRIM_SECONDS); + } catch { + // remove() can throw if already updating + } + } + } + disconnect(): void { this.disposed = true; + if (this.bufferTrimInterval) { + clearInterval(this.bufferTrimInterval); + this.bufferTrimInterval = null; + } this.ws?.close(); this.ws = null; if (this.mediaSource?.readyState === 'open') {