Improve stream reliability with stall detection and buffer management

Add video stall detection that monitors currentTime and auto-reconnects
after 15s of frozen video. Add MSE SourceBuffer trimming to prevent
QuotaExceededError from killing streams silently. Handle appendBuffer
errors with safe wrapper and SourceBuffer error listener. Wait for ICE
gathering before sending WebRTC offers. Add go2rtc stream availability
endpoint. Improve backend proxy error logging.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
root
2026-03-06 10:00:13 -06:00
parent c39ec4e954
commit 898af6022b
3 changed files with 164 additions and 9 deletions

View File

@@ -11,6 +11,20 @@ logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/go2rtc")
@router.get("/streams")
async def get_streams():
"""Fetch available streams from go2rtc."""
config = get_config()
target = f"{config.go2rtc.url}/api/streams"
try:
async with httpx.AsyncClient() as client:
resp = await client.get(target, timeout=5.0)
return resp.json()
except Exception:
logger.error("Failed to fetch go2rtc streams from %s", target)
return {}
@router.post("/webrtc")
async def proxy_webrtc(request: Request, src: str):
"""Proxy WebRTC SDP exchange to go2rtc."""
@@ -66,5 +80,11 @@ async def proxy_mse_ws(ws: WebSocket, src: str):
forward_to_client(),
forward_to_upstream(),
)
except (WebSocketDisconnect, websockets.ConnectionClosed, Exception):
pass
except WebSocketDisconnect:
logger.debug("Client disconnected from MSE proxy for %s", src)
except websockets.ConnectionClosed:
logger.debug("Upstream go2rtc closed for %s", src)
except ConnectionRefusedError:
logger.error("Cannot reach go2rtc at %s for stream %s", target, src)
except Exception:
logger.exception("MSE proxy error for stream %s", src)

View File

@@ -3,6 +3,9 @@ import { Go2RTCWebRTC, Go2RTCMSE } from '@/services/go2rtc';
const RECONNECT_DELAY = 3000;
const MAX_RECONNECT_DELAY = 30000;
const MSE_READY_TIMEOUT = 5000;
const STALL_CHECK_INTERVAL = 5000;
const STALL_THRESHOLD = 3; // consecutive stall checks before reconnecting
interface UseStreamOptions {
streamName: string;
@@ -24,6 +27,9 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream
const reconnectTimer = useRef<ReturnType<typeof setTimeout>>();
const reconnectDelay = useRef(RECONNECT_DELAY);
const mountedRef = useRef(true);
const stallDetector = useRef<ReturnType<typeof setInterval>>();
const lastPlayTime = useRef(0);
const stallCount = useRef(0);
const [isConnecting, setIsConnecting] = useState(true);
const [error, setError] = useState<string | null>(null);
const [retryCount, setRetryCount] = useState(0);
@@ -48,10 +54,17 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream
mountedRef.current = true;
reconnectDelay.current = RECONNECT_DELAY;
stallCount.current = 0;
lastPlayTime.current = 0;
let initTimer: ReturnType<typeof setTimeout>;
let readyCheck: ReturnType<typeof setTimeout>;
let mseTimeout: ReturnType<typeof setTimeout>;
const cleanup = () => {
if (stallDetector.current) {
clearInterval(stallDetector.current);
stallDetector.current = undefined;
}
mseRef.current?.disconnect();
mseRef.current = null;
webrtcRef.current?.disconnect();
@@ -65,6 +78,32 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream
scheduleReconnect();
};
const startStallDetection = () => {
if (stallDetector.current) clearInterval(stallDetector.current);
stallCount.current = 0;
lastPlayTime.current = videoRef.current?.currentTime ?? 0;
stallDetector.current = setInterval(() => {
if (!mountedRef.current) return;
const v = videoRef.current;
if (!v || v.paused) return;
const currentTime = v.currentTime;
if (currentTime === lastPlayTime.current && currentTime > 0) {
stallCount.current++;
if (stallCount.current >= STALL_THRESHOLD) {
console.warn(`Stream stalled for ${streamName}, reconnecting...`);
cleanup();
setError('Stream stalled');
scheduleReconnect();
}
} else {
stallCount.current = 0;
}
lastPlayTime.current = currentTime;
}, STALL_CHECK_INTERVAL);
};
const connectMSE = async () => {
try {
setIsConnecting(true);
@@ -77,20 +116,34 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream
if (!videoRef.current) return;
await mse.connect(videoRef.current, onDisconnect);
// Poll for video readiness
// Poll for video readiness with timeout fallback to WebRTC
const checkReady = () => {
if (!mountedRef.current) return;
const v = videoRef.current;
if (v && v.readyState >= 2) {
clearTimeout(mseTimeout);
setIsConnecting(false);
setError(null);
reconnectDelay.current = RECONNECT_DELAY;
startStallDetection();
} else {
readyCheck = setTimeout(checkReady, 300);
}
};
readyCheck = setTimeout(checkReady, 300);
// Timeout: if MSE doesn't produce video, fall back to WebRTC
mseTimeout = setTimeout(() => {
if (!mountedRef.current) return;
const v = videoRef.current;
if (!v || v.readyState < 2) {
console.warn(`MSE timeout for ${streamName}, trying WebRTC...`);
clearTimeout(readyCheck);
cleanup();
connectWebRTC();
}
}, MSE_READY_TIMEOUT);
} catch (err) {
if (!mountedRef.current) return;
console.warn(`MSE failed for ${streamName}, trying WebRTC...`, err);
@@ -114,6 +167,7 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream
setIsConnecting(false);
setError(null);
reconnectDelay.current = RECONNECT_DELAY;
startStallDetection();
}
},
onDisconnect,
@@ -137,6 +191,7 @@ export function useStream({ streamName, delayMs = 0, enabled = true }: UseStream
mountedRef.current = false;
clearTimeout(initTimer);
clearTimeout(readyCheck);
clearTimeout(mseTimeout);
clearTimeout(reconnectTimer.current);
cleanup();
};

View File

@@ -32,8 +32,6 @@ export class Go2RTCWebRTC {
}
};
this.pc.onicecandidate = () => {};
this.pc.onconnectionstatechange = () => {
const state = this.pc?.connectionState;
if (!this.disposed && (state === 'disconnected' || state === 'failed')) {
@@ -47,12 +45,15 @@ export class Go2RTCWebRTC {
const offer = await this.pc.createOffer();
await this.pc.setLocalDescription(offer);
// Wait for ICE gathering to complete (or timeout after 2s)
const localDesc = await this.waitForIceGathering(2000);
// Use backend proxy instead of direct go2rtc
const url = `/api/go2rtc/webrtc?src=${encodeURIComponent(this.streamName)}`;
const res = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/sdp' },
body: offer.sdp,
body: localDesc.sdp,
});
if (!res.ok) {
@@ -63,12 +64,39 @@ export class Go2RTCWebRTC {
await this.pc.setRemoteDescription({ type: 'answer', sdp: answerSdp });
}
private waitForIceGathering(timeoutMs: number): Promise<RTCSessionDescription> {
return new Promise((resolve) => {
if (!this.pc) {
resolve(this.pc!.localDescription!);
return;
}
if (this.pc.iceGatheringState === 'complete') {
resolve(this.pc.localDescription!);
return;
}
const timeout = setTimeout(() => {
if (this.pc?.localDescription) {
resolve(this.pc.localDescription);
}
}, timeoutMs);
this.pc.onicegatheringstatechange = () => {
if (this.pc?.iceGatheringState === 'complete') {
clearTimeout(timeout);
resolve(this.pc.localDescription!);
}
};
});
}
private cleanup(): void {
this.mediaStream?.getTracks().forEach((t) => t.stop());
this.mediaStream = null;
if (this.pc) {
this.pc.ontrack = null;
this.pc.onicecandidate = null;
this.pc.onicegatheringstatechange = null;
this.pc.onconnectionstatechange = null;
this.pc.close();
this.pc = null;
@@ -87,6 +115,9 @@ export class Go2RTCWebRTC {
}
}
const MAX_BUFFER_SECONDS = 30;
const BUFFER_TRIM_SECONDS = 15;
export class Go2RTCMSE {
private mediaSource: MediaSource | null = null;
private sourceBuffer: SourceBuffer | null = null;
@@ -96,6 +127,7 @@ export class Go2RTCMSE {
private queue: ArrayBuffer[] = [];
private onDisconnectCb: (() => void) | null = null;
private disposed = false;
private bufferTrimInterval: ReturnType<typeof setInterval> | null = null;
constructor(streamName: string, _go2rtcUrl?: string) {
this.streamName = streamName;
@@ -145,10 +177,13 @@ export class Go2RTCMSE {
if (this.sourceBuffer.updating) {
this.queue.push(event.data);
} else {
this.sourceBuffer.appendBuffer(event.data);
this.safeAppend(event.data);
}
}
};
// Periodically trim buffer to prevent unbounded growth
this.bufferTrimInterval = setInterval(() => this.trimBuffer(), 10000);
}
private initSourceBuffer(codec: string): void {
@@ -158,7 +193,13 @@ export class Go2RTCMSE {
this.sourceBuffer.mode = 'segments';
this.sourceBuffer.addEventListener('updateend', () => {
if (this.queue.length > 0 && this.sourceBuffer && !this.sourceBuffer.updating) {
this.sourceBuffer.appendBuffer(this.queue.shift()!);
this.safeAppend(this.queue.shift()!);
}
});
this.sourceBuffer.addEventListener('error', () => {
console.error(`SourceBuffer error for ${this.streamName}`);
if (!this.disposed) {
this.onDisconnectCb?.();
}
});
} catch (e) {
@@ -166,8 +207,47 @@ export class Go2RTCMSE {
}
}
private safeAppend(data: ArrayBuffer): void {
try {
this.sourceBuffer!.appendBuffer(data);
} catch (e) {
if (e instanceof DOMException && e.name === 'QuotaExceededError') {
console.warn(`Buffer quota exceeded for ${this.streamName}, trimming...`);
this.queue.unshift(data);
this.trimBuffer();
} else {
console.error(`appendBuffer error for ${this.streamName}:`, e);
if (!this.disposed) {
this.onDisconnectCb?.();
}
}
}
}
private trimBuffer(): void {
if (!this.sourceBuffer || this.sourceBuffer.updating || !this.videoElement) return;
const buffered = this.sourceBuffer.buffered;
if (buffered.length === 0) return;
const currentTime = this.videoElement.currentTime;
const bufferStart = buffered.start(0);
const bufferedAmount = currentTime - bufferStart;
if (bufferedAmount > MAX_BUFFER_SECONDS) {
try {
this.sourceBuffer.remove(bufferStart, currentTime - BUFFER_TRIM_SECONDS);
} catch {
// remove() can throw if already updating
}
}
}
disconnect(): void {
this.disposed = true;
if (this.bufferTrimInterval) {
clearInterval(this.bufferTrimInterval);
this.bufferTrimInterval = null;
}
this.ws?.close();
this.ws = null;
if (this.mediaSource?.readyState === 'open') {