route /activity/stream + /inspector/events through createSseEndpoint

Both endpoints collapse from ~45 lines of in-line ReadableStream wiring
to ~8 lines of helper config. Behavior preserved bit-for-bit by the
new sse-helpers tests:
  - initial replay (activity gap + history, inspector state snapshot)
  - live event subscription
  - 15s heartbeat
  - SSE framing
  - sanitizeReplacer applied to every JSON.stringify

The leak fix is the cleanup contract: pre-refactor, both endpoints ran
cleanup only on req.signal.abort. If TCP died without firing abort
(Chromium MV3 SW suspend, intermediate proxy half-close), the
subscriber closure stayed in the Set forever capturing the
ReadableStreamDefaultController + queued payloads. Post-refactor, an
enqueue-failure or heartbeat-failure on a dead consumer triggers the
same idempotent cleanup as abort would.

Net: -83 / +15 in server.ts.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Garry Tan 2026-05-27 07:25:34 -07:00
parent 6e6d7c2efc
commit 336cb32434
No known key found for this signature in database
GPG Key ID: C1F69E85C74EFE1D
1 changed files with 24 additions and 108 deletions

View File

@ -38,6 +38,7 @@ import {
import { validateTempPath } from './path-security';
import { resolveConfig, ensureStateDir, readVersionHash, resolveChromiumProfile, cleanSingletonLocks } from './config';
import { emitActivity, subscribe, getActivityAfter, getActivityHistory, getSubscriberCount } from './activity';
import { createSseEndpoint } from './sse-helpers';
import { initAuditLog, writeAuditEntry } from './audit';
import { inspectElement, modifyStyle, resetModifications, getModificationHistory, detachSession, type InspectorResult } from './cdp-inspector';
// Bun.spawn used instead of child_process.spawn (compiled bun binaries
@ -2432,62 +2433,19 @@ export function buildFetchHandler(cfg: ServerConfig): ServerHandle {
});
}
const afterId = parseInt(url.searchParams.get('after') || '0', 10);
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
// SSE egress invariant: every JSON.stringify here ships page-content-derived
// fields (URLs, command args, errors) to the sidebar. Lone surrogates must
// be sanitized DURING stringify (via sanitizeReplacer) so they're cleaned
// before escape-encoding — post-stringify regex is ineffective because
// JSON.stringify has already converted \uD800 → "\\ud800".
// 1. Gap detection + replay
// Cleanup contract (abort + enqueue-fail + heartbeat-fail, all
// idempotent) lives in createSseEndpoint; sanitizeReplacer is
// applied to every JSON.stringify inside the helper, so
// page-content-derived fields (URLs, command args, errors)
// stay surrogate-safe per CLAUDE.md egress invariant.
return createSseEndpoint(req, {
initialReplay: (send) => {
const { entries, gap, gapFrom, availableFrom } = getActivityAfter(afterId);
if (gap) {
controller.enqueue(encoder.encode(`event: gap\ndata: ${JSON.stringify({ gapFrom, availableFrom }, sanitizeReplacer)}\n\n`));
}
for (const entry of entries) {
controller.enqueue(encoder.encode(`event: activity\ndata: ${JSON.stringify(entry, sanitizeReplacer)}\n\n`));
}
// 2. Subscribe for live events
const unsubscribe = subscribe((entry) => {
try {
controller.enqueue(encoder.encode(`event: activity\ndata: ${JSON.stringify(entry, sanitizeReplacer)}\n\n`));
} catch (err: any) {
console.debug('[browse] Activity SSE stream error, unsubscribing:', err.message);
unsubscribe();
}
});
// 3. Heartbeat every 15s
const heartbeat = setInterval(() => {
try {
controller.enqueue(encoder.encode(`: heartbeat\n\n`));
} catch (err: any) {
console.debug('[browse] Activity SSE heartbeat failed:', err.message);
clearInterval(heartbeat);
unsubscribe();
}
}, 15000);
// 4. Cleanup on disconnect
req.signal.addEventListener('abort', () => {
clearInterval(heartbeat);
unsubscribe();
try { controller.close(); } catch {
// Expected: stream already closed
}
});
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
if (gap) send('gap', { gapFrom, availableFrom });
for (const entry of entries) send('activity', entry);
},
subscribe,
liveEventName: 'activity',
});
}
@ -2806,62 +2764,20 @@ export function buildFetchHandler(cfg: ServerConfig): ServerHandle {
status: 401, headers: { 'Content-Type': 'application/json' },
});
}
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
// SSE egress invariant: inspectorData and CDP event payloads carry
// page-DOM strings (selectors, attribute values, console messages).
// sanitizeReplacer cleans lone surrogates DURING JSON.stringify so
// they're neutralized before escape-encoding (post-stringify regex
// is a no-op once \uD800 has become "\\ud800").
// Send current state immediately
if (inspectorData) {
controller.enqueue(encoder.encode(
`event: state\ndata: ${JSON.stringify({ data: inspectorData, timestamp: inspectorTimestamp }, sanitizeReplacer)}\n\n`
));
}
// Subscribe for live events
const notify: InspectorSubscriber = (event) => {
try {
controller.enqueue(encoder.encode(
`event: inspector\ndata: ${JSON.stringify(event, sanitizeReplacer)}\n\n`
));
} catch (err: any) {
console.debug('[browse] Inspector SSE stream error:', err.message);
inspectorSubscribers.delete(notify);
}
};
// Cleanup contract (abort + enqueue-fail + heartbeat-fail,
// idempotent) lives in createSseEndpoint; sanitizeReplacer is
// applied to every JSON.stringify inside the helper. The
// inspector subscriber set stays here because it's also written
// to by emitInspectorEvent above.
return createSseEndpoint(req, {
initialReplay: inspectorData
? (send) => send('state', { data: inspectorData, timestamp: inspectorTimestamp })
: undefined,
subscribe: (notify) => {
inspectorSubscribers.add(notify);
// Heartbeat every 15s
const heartbeat = setInterval(() => {
try {
controller.enqueue(encoder.encode(`: heartbeat\n\n`));
} catch (err: any) {
console.debug('[browse] Inspector SSE heartbeat failed:', err.message);
clearInterval(heartbeat);
inspectorSubscribers.delete(notify);
}
}, 15000);
// Cleanup on disconnect
req.signal.addEventListener('abort', () => {
clearInterval(heartbeat);
inspectorSubscribers.delete(notify);
try { controller.close(); } catch (err: any) {
// Expected: stream already closed
}
});
},
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
return () => inspectorSubscribers.delete(notify);
},
liveEventName: 'inspector',
});
}