From 6e6d7c2efc560138bdf4a9dce9ccde8a51361133 Mon Sep 17 00:00:00 2001 From: Garry Tan Date: Wed, 27 May 2026 07:24:51 -0700 Subject: [PATCH] extract createSseEndpoint helper with cleanup contract browse/src/sse-helpers.ts owns the SSE cleanup invariant: cleanup runs on abort, enqueue failure, AND heartbeat failure, exactly once, regardless of which edge fires first. Pre-helper, /activity/stream and /inspector/events ran cleanup only on the req.signal.abort edge. If the underlying TCP died without firing abort (Chromium MV3 service-worker suspend, intermediate proxy half-close), the subscriber closure stayed in the Set capturing the ReadableStreamDefaultController plus any payloads queued behind it. Over a multi-day sidebar session this compounded into multi-MB of retained controllers per dead connection. Caller surface: initialReplay (optional, for gap replay or state snapshots), subscribe (live-event source), liveEventName (SSE event name for live wrap), heartbeatMs. send() helper handles JSON encoding with sanitizeReplacer + lone-surrogate stripping. Unit tests pin all three cleanup edges + idempotency + replay ordering + surrogate sanitization. Endpoint refactors land in the next commit. Co-Authored-By: Claude Opus 4.7 (1M context) --- browse/src/sse-helpers.ts | 154 +++++++++++++++++++++++++ browse/test/sse-helpers.test.ts | 194 ++++++++++++++++++++++++++++++++ 2 files changed, 348 insertions(+) create mode 100644 browse/src/sse-helpers.ts create mode 100644 browse/test/sse-helpers.test.ts diff --git a/browse/src/sse-helpers.ts b/browse/src/sse-helpers.ts new file mode 100644 index 000000000..ed4954112 --- /dev/null +++ b/browse/src/sse-helpers.ts @@ -0,0 +1,154 @@ +// SSE endpoint helper — shared cleanup contract for stream endpoints. +// +// Pre-helper, /activity/stream and /inspector/events implemented the same +// pattern in parallel and both leaked subscribers when enqueue failed +// without a corresponding abort signal (e.g. Chromium MV3 service-worker +// suspend dropped the TCP without an abort edge). The subscriber closure +// stayed in the Set, capturing the ReadableStreamDefaultController plus +// any payloads queued behind it. Over a multi-day sidebar session this +// compounded into multi-MB of retained controllers per dead connection. +// +// Centralizing the cleanup contract here means any future SSE endpoint +// inherits the invariant — cleanup runs on abort, enqueue failure, AND +// heartbeat failure, exactly once, regardless of which edge fires first. + +import { stripLoneSurrogates } from './sanitize'; + +/** + * JSON.stringify replacer that strips lone UTF-16 surrogates from string + * values before they get escape-encoded. Pair with stringify when the + * consumer will JSON.parse the payload back into JS strings (SSE clients + * do this). Required at every SSE egress that ships page-content-derived + * fields — see CLAUDE.md "Unicode sanitization at server egress". + */ +function sanitizeReplacer(_key: string, value: unknown): unknown { + return typeof value === 'string' ? stripLoneSurrogates(value) : value; +} + +/** Send an SSE event. Handles JSON encoding + lone-surrogate sanitization. */ +export type SseSender = (event: string, data: unknown) => void; + +export interface SseEndpointConfig { + /** + * Optional. Runs once after the stream opens, before subscribing for live + * events. Use for initial event replay (activity gap detection, history + * burst) or a current-state snapshot (inspector). The `send` helper + * handles JSON encoding with sanitizeReplacer and SSE framing; pass + * any event name and any payload object. + */ + initialReplay?: (send: SseSender) => void; + + /** + * Subscribe to the live event source. Receives a `notify` callback; + * returns an unsubscribe function. The callback routes through the + * helper's safeEnqueue + cleanup-on-throw, so a dead consumer ends up + * removed from the subscriber set on the very next event (instead of + * waiting for an abort that may never fire). + */ + subscribe: (notify: (entry: T) => void) => () => void; + + /** + * SSE event name for live events. `data: \n\n` + * is wrapped automatically. /activity/stream uses 'activity'; + * /inspector/events uses 'inspector'. + */ + liveEventName: string; + + /** Heartbeat interval in ms. Default: 15000. */ + heartbeatMs?: number; +} + +/** + * Build a streaming Response that owns the cleanup contract: + * - safeEnqueue catches enqueue throws → cleanup + * - 15s heartbeat catches dead peers; failure → cleanup + * - req.signal abort → cleanup + * - cleanup is idempotent (clearInterval + unsubscribe + try close) + */ +export function createSseEndpoint( + req: Request, + config: SseEndpointConfig, +): Response { + const heartbeatMs = config.heartbeatMs ?? 15000; + const encoder = new TextEncoder(); + + const stream = new ReadableStream({ + start(controller) { + let cleanedUp = false; + let heartbeat: ReturnType | null = null; + let unsubscribe: (() => void) | null = null; + + const cleanup = (): void => { + if (cleanedUp) return; + cleanedUp = true; + if (heartbeat !== null) { + clearInterval(heartbeat); + heartbeat = null; + } + if (unsubscribe !== null) { + unsubscribe(); + unsubscribe = null; + } + try { + controller.close(); + } catch { + // Expected: stream already closed by the consumer. + } + }; + + const send: SseSender = (event, data) => { + if (cleanedUp) return; + try { + controller.enqueue( + encoder.encode( + `event: ${event}\ndata: ${JSON.stringify(data, sanitizeReplacer)}\n\n`, + ), + ); + } catch { + // Consumer disconnected mid-write. Tear down so this subscriber + // doesn't sit in the set forever. + cleanup(); + } + }; + + // Initial replay (caller-provided). + if (config.initialReplay) { + try { + config.initialReplay(send); + } catch { + cleanup(); + return; + } + if (cleanedUp) return; + } + + // Subscribe for live events. + unsubscribe = config.subscribe((entry) => { + send(config.liveEventName, entry); + }); + + // Heartbeat keeps NAT boxes and proxies from dropping idle SSE, + // and serves as a liveness probe: an enqueue failure here is the + // cheapest way to learn the consumer is gone without waiting for + // an abort signal that may never arrive. + heartbeat = setInterval(() => { + if (cleanedUp) return; + try { + controller.enqueue(encoder.encode(`: heartbeat\n\n`)); + } catch { + cleanup(); + } + }, heartbeatMs); + + req.signal.addEventListener('abort', cleanup); + }, + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + }, + }); +} diff --git a/browse/test/sse-helpers.test.ts b/browse/test/sse-helpers.test.ts new file mode 100644 index 000000000..bf3c42965 --- /dev/null +++ b/browse/test/sse-helpers.test.ts @@ -0,0 +1,194 @@ +import { describe, test, expect } from 'bun:test'; +import { createSseEndpoint } from '../src/sse-helpers'; + +// Unit tests for the SSE cleanup contract introduced by D6 EXTRACT_HELPER. +// +// The pre-helper bug: /activity/stream and /inspector/events ran cleanup +// only on the `req.signal.abort` edge. If the underlying TCP died without +// firing abort (Chromium MV3 service-worker suspend, intermediate proxy +// half-close), the subscriber closure stayed in the Set capturing the +// ReadableStreamDefaultController and any payloads queued behind it. +// +// These tests pin the three cleanup edges: +// 1. abort signal → cleanup +// 2. enqueue throws (consumer gone) → cleanup +// 3. heartbeat enqueue throws → cleanup +// And the idempotency invariant: cleanup running twice is a no-op. + +function makeRequest(): { req: Request; abort: () => void } { + const controller = new AbortController(); + // Minimal Request — we only use req.signal here. URL is irrelevant. + const req = new Request('http://localhost/test', { signal: controller.signal }); + return { req, abort: () => controller.abort() }; +} + +/** Pull SSE bytes from a Response stream, return decoded text. */ +async function readAll(res: Response, ms: number): Promise { + if (!res.body) return ''; + const reader = res.body.getReader(); + const decoder = new TextDecoder(); + let out = ''; + const deadline = Date.now() + ms; + while (Date.now() < deadline) { + try { + const { value, done } = await Promise.race([ + reader.read(), + new Promise<{ value: undefined; done: true }>((resolve) => + setTimeout(() => resolve({ value: undefined, done: true }), deadline - Date.now()), + ), + ]); + if (done) break; + if (value) out += decoder.decode(value, { stream: true }); + } catch { + break; + } + } + try { reader.cancel().catch(() => {}); } catch {} + return out; +} + +describe('createSseEndpoint cleanup contract', () => { + test('1. abort signal triggers unsubscribe', async () => { + let unsubscribed = 0; + const { req, abort } = makeRequest(); + const res = createSseEndpoint(req, { + subscribe: () => () => { + unsubscribed++; + }, + liveEventName: 'test', + heartbeatMs: 60_000, // long enough that we don't see heartbeats in this test + }); + // Start the stream by reading once, then abort. + const reader = res.body!.getReader(); + // Yield to let start() run. + await Promise.resolve(); + await Promise.resolve(); + abort(); + // Let the abort listener fire. + await new Promise((r) => setTimeout(r, 10)); + expect(unsubscribed).toBe(1); + reader.cancel().catch(() => {}); + }); + + test('2. enqueue throw triggers unsubscribe + heartbeat clear', async () => { + let unsubscribed = 0; + let notify: ((entry: { msg: string }) => void) | null = null; + const { req } = makeRequest(); + const res = createSseEndpoint<{ msg: string }>(req, { + subscribe: (n) => { + notify = n; + return () => { + unsubscribed++; + }; + }, + liveEventName: 'test', + heartbeatMs: 60_000, + }); + // Cancel the reader so subsequent enqueues throw. + const reader = res.body!.getReader(); + await Promise.resolve(); + await Promise.resolve(); + expect(notify).not.toBeNull(); + await reader.cancel(); // closes the consumer side + // Now fire a live event — enqueue should throw → cleanup → unsubscribe. + notify!({ msg: 'will fail to enqueue' }); + await new Promise((r) => setTimeout(r, 10)); + expect(unsubscribed).toBe(1); + }); + + test('3. cleanup is idempotent (abort then enqueue-fail)', async () => { + let unsubscribed = 0; + let notify: ((entry: { msg: string }) => void) | null = null; + const { req, abort } = makeRequest(); + const res = createSseEndpoint<{ msg: string }>(req, { + subscribe: (n) => { + notify = n; + return () => { + unsubscribed++; + }; + }, + liveEventName: 'test', + heartbeatMs: 60_000, + }); + const reader = res.body!.getReader(); + await Promise.resolve(); + await Promise.resolve(); + abort(); + await new Promise((r) => setTimeout(r, 10)); + // Second cleanup edge — should be a no-op. + notify!({ msg: 'no-op' }); + await new Promise((r) => setTimeout(r, 10)); + expect(unsubscribed).toBe(1); + reader.cancel().catch(() => {}); + }); + + test('4. initialReplay events reach the client before live events', async () => { + let notify: ((entry: { msg: string }) => void) | null = null; + const { req } = makeRequest(); + const res = createSseEndpoint<{ msg: string }>(req, { + initialReplay: (send) => { + send('replay', { msg: 'first' }); + }, + subscribe: (n) => { + notify = n; + return () => {}; + }, + liveEventName: 'live', + heartbeatMs: 60_000, + }); + // Trigger one live event soon after stream starts. + setTimeout(() => notify?.({ msg: 'second' }), 5); + const text = await readAll(res, 50); + expect(text).toContain('event: replay'); + expect(text).toContain('"msg":"first"'); + expect(text).toContain('event: live'); + expect(text).toContain('"msg":"second"'); + // Replay must come before live. + expect(text.indexOf('"first"')).toBeLessThan(text.indexOf('"second"')); + }); + + test('5. initialReplay throw triggers cleanup without subscribing', async () => { + let subscribed = 0; + const { req } = makeRequest(); + const res = createSseEndpoint(req, { + initialReplay: () => { + throw new Error('replay boom'); + }, + subscribe: () => { + subscribed++; + return () => {}; + }, + liveEventName: 'test', + heartbeatMs: 60_000, + }); + // Drain — stream should close cleanly. + const text = await readAll(res, 30); + expect(text).toBe(''); // no events + expect(subscribed).toBe(0); // never reached subscribe() + }); + + test('6. lone surrogates in payload string are sanitized', async () => { + let notify: ((entry: { msg: string }) => void) | null = null; + const { req } = makeRequest(); + const res = createSseEndpoint<{ msg: string }>(req, { + subscribe: (n) => { + notify = n; + return () => {}; + }, + liveEventName: 'test', + heartbeatMs: 60_000, + }); + setTimeout(() => { + // Lone high surrogate (no matching low). JSON.stringify would emit + // \uD800 escape that breaks Claude API. Helper must strip it. + notify?.({ msg: 'hello \uD800 world' }); + }, 5); + const text = await readAll(res, 50); + expect(text).toContain('event: test'); + // JSON.stringify emits U+FFFD as the literal character, not as escape. + expect(text).toContain('�'); + // The raw lone-surrogate escape MUST NOT survive — that's the failure + // mode that breaks the Claude API with HTTP 400. + expect(text.toLowerCase()).not.toContain('\\ud800'); + }); +});