diff --git a/browse/src/server.ts b/browse/src/server.ts index bc0b378cb..2f6c598f7 100644 --- a/browse/src/server.ts +++ b/browse/src/server.ts @@ -38,6 +38,7 @@ import { import { validateTempPath } from './path-security'; import { resolveConfig, ensureStateDir, readVersionHash, resolveChromiumProfile, cleanSingletonLocks } from './config'; import { emitActivity, subscribe, getActivityAfter, getActivityHistory, getSubscriberCount } from './activity'; +import { createSseEndpoint } from './sse-helpers'; import { initAuditLog, writeAuditEntry } from './audit'; import { inspectElement, modifyStyle, resetModifications, getModificationHistory, detachSession, type InspectorResult } from './cdp-inspector'; // Bun.spawn used instead of child_process.spawn (compiled bun binaries @@ -2432,62 +2433,19 @@ export function buildFetchHandler(cfg: ServerConfig): ServerHandle { }); } const afterId = parseInt(url.searchParams.get('after') || '0', 10); - const encoder = new TextEncoder(); - - const stream = new ReadableStream({ - start(controller) { - // SSE egress invariant: every JSON.stringify here ships page-content-derived - // fields (URLs, command args, errors) to the sidebar. Lone surrogates must - // be sanitized DURING stringify (via sanitizeReplacer) so they're cleaned - // before escape-encoding — post-stringify regex is ineffective because - // JSON.stringify has already converted \uD800 → "\\ud800". - // 1. Gap detection + replay + // Cleanup contract (abort + enqueue-fail + heartbeat-fail, all + // idempotent) lives in createSseEndpoint; sanitizeReplacer is + // applied to every JSON.stringify inside the helper, so + // page-content-derived fields (URLs, command args, errors) + // stay surrogate-safe per CLAUDE.md egress invariant. + return createSseEndpoint(req, { + initialReplay: (send) => { const { entries, gap, gapFrom, availableFrom } = getActivityAfter(afterId); - if (gap) { - controller.enqueue(encoder.encode(`event: gap\ndata: ${JSON.stringify({ gapFrom, availableFrom }, sanitizeReplacer)}\n\n`)); - } - for (const entry of entries) { - controller.enqueue(encoder.encode(`event: activity\ndata: ${JSON.stringify(entry, sanitizeReplacer)}\n\n`)); - } - - // 2. Subscribe for live events - const unsubscribe = subscribe((entry) => { - try { - controller.enqueue(encoder.encode(`event: activity\ndata: ${JSON.stringify(entry, sanitizeReplacer)}\n\n`)); - } catch (err: any) { - console.debug('[browse] Activity SSE stream error, unsubscribing:', err.message); - unsubscribe(); - } - }); - - // 3. Heartbeat every 15s - const heartbeat = setInterval(() => { - try { - controller.enqueue(encoder.encode(`: heartbeat\n\n`)); - } catch (err: any) { - console.debug('[browse] Activity SSE heartbeat failed:', err.message); - clearInterval(heartbeat); - unsubscribe(); - } - }, 15000); - - // 4. Cleanup on disconnect - req.signal.addEventListener('abort', () => { - clearInterval(heartbeat); - unsubscribe(); - try { controller.close(); } catch { - // Expected: stream already closed - } - }); - }, - }); - - return new Response(stream, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', + if (gap) send('gap', { gapFrom, availableFrom }); + for (const entry of entries) send('activity', entry); }, + subscribe, + liveEventName: 'activity', }); } @@ -2806,62 +2764,20 @@ export function buildFetchHandler(cfg: ServerConfig): ServerHandle { status: 401, headers: { 'Content-Type': 'application/json' }, }); } - const encoder = new TextEncoder(); - const stream = new ReadableStream({ - start(controller) { - // SSE egress invariant: inspectorData and CDP event payloads carry - // page-DOM strings (selectors, attribute values, console messages). - // sanitizeReplacer cleans lone surrogates DURING JSON.stringify so - // they're neutralized before escape-encoding (post-stringify regex - // is a no-op once \uD800 has become "\\ud800"). - // Send current state immediately - if (inspectorData) { - controller.enqueue(encoder.encode( - `event: state\ndata: ${JSON.stringify({ data: inspectorData, timestamp: inspectorTimestamp }, sanitizeReplacer)}\n\n` - )); - } - - // Subscribe for live events - const notify: InspectorSubscriber = (event) => { - try { - controller.enqueue(encoder.encode( - `event: inspector\ndata: ${JSON.stringify(event, sanitizeReplacer)}\n\n` - )); - } catch (err: any) { - console.debug('[browse] Inspector SSE stream error:', err.message); - inspectorSubscribers.delete(notify); - } - }; + // Cleanup contract (abort + enqueue-fail + heartbeat-fail, + // idempotent) lives in createSseEndpoint; sanitizeReplacer is + // applied to every JSON.stringify inside the helper. The + // inspector subscriber set stays here because it's also written + // to by emitInspectorEvent above. + return createSseEndpoint(req, { + initialReplay: inspectorData + ? (send) => send('state', { data: inspectorData, timestamp: inspectorTimestamp }) + : undefined, + subscribe: (notify) => { inspectorSubscribers.add(notify); - - // Heartbeat every 15s - const heartbeat = setInterval(() => { - try { - controller.enqueue(encoder.encode(`: heartbeat\n\n`)); - } catch (err: any) { - console.debug('[browse] Inspector SSE heartbeat failed:', err.message); - clearInterval(heartbeat); - inspectorSubscribers.delete(notify); - } - }, 15000); - - // Cleanup on disconnect - req.signal.addEventListener('abort', () => { - clearInterval(heartbeat); - inspectorSubscribers.delete(notify); - try { controller.close(); } catch (err: any) { - // Expected: stream already closed - } - }); - }, - }); - - return new Response(stream, { - headers: { - 'Content-Type': 'text/event-stream', - 'Cache-Control': 'no-cache', - 'Connection': 'keep-alive', + return () => inspectorSubscribers.delete(notify); }, + liveEventName: 'inspector', }); }