mirror of https://github.com/garrytan/gstack.git
extract createSseEndpoint helper with cleanup contract
browse/src/sse-helpers.ts owns the SSE cleanup invariant: cleanup runs on abort, enqueue failure, AND heartbeat failure, exactly once, regardless of which edge fires first. Pre-helper, /activity/stream and /inspector/events ran cleanup only on the req.signal.abort edge. If the underlying TCP died without firing abort (Chromium MV3 service-worker suspend, intermediate proxy half-close), the subscriber closure stayed in the Set capturing the ReadableStreamDefaultController plus any payloads queued behind it. Over a multi-day sidebar session this compounded into multi-MB of retained controllers per dead connection. Caller surface: initialReplay (optional, for gap replay or state snapshots), subscribe (live-event source), liveEventName (SSE event name for live wrap), heartbeatMs. send() helper handles JSON encoding with sanitizeReplacer + lone-surrogate stripping. Unit tests pin all three cleanup edges + idempotency + replay ordering + surrogate sanitization. Endpoint refactors land in the next commit. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
efc0c0c64e
commit
6e6d7c2efc
|
|
@ -0,0 +1,154 @@
|
||||||
|
// SSE endpoint helper — shared cleanup contract for stream endpoints.
|
||||||
|
//
|
||||||
|
// Pre-helper, /activity/stream and /inspector/events implemented the same
|
||||||
|
// pattern in parallel and both leaked subscribers when enqueue failed
|
||||||
|
// without a corresponding abort signal (e.g. Chromium MV3 service-worker
|
||||||
|
// suspend dropped the TCP without an abort edge). The subscriber closure
|
||||||
|
// stayed in the Set, capturing the ReadableStreamDefaultController plus
|
||||||
|
// any payloads queued behind it. Over a multi-day sidebar session this
|
||||||
|
// compounded into multi-MB of retained controllers per dead connection.
|
||||||
|
//
|
||||||
|
// Centralizing the cleanup contract here means any future SSE endpoint
|
||||||
|
// inherits the invariant — cleanup runs on abort, enqueue failure, AND
|
||||||
|
// heartbeat failure, exactly once, regardless of which edge fires first.
|
||||||
|
|
||||||
|
import { stripLoneSurrogates } from './sanitize';
|
||||||
|
|
||||||
|
/**
 * Replacer callback for `JSON.stringify`: every string value is passed
 * through `stripLoneSurrogates` before it is escape-encoded; values of any
 * other type are returned untouched.
 *
 * Use it wherever the receiver will `JSON.parse` the payload back into JS
 * strings (SSE clients do), i.e. at every SSE egress that ships
 * page-content-derived fields — see CLAUDE.md "Unicode sanitization at
 * server egress".
 *
 * @param _key   Property name supplied by `JSON.stringify`; unused.
 * @param value  The value about to be serialized.
 * @returns The sanitized string, or `value` unchanged when it is not a string.
 */
function sanitizeReplacer(_key: string, value: unknown): unknown {
  if (typeof value !== 'string') {
    return value;
  }
  return stripLoneSurrogates(value);
}
|
||||||
|
|
||||||
|
/**
 * Emits a single SSE event. The implementation supplied by
 * `createSseEndpoint` takes care of JSON encoding, lone-surrogate
 * sanitization and SSE framing, so callers pass a plain event name and any
 * payload value.
 */
export type SseSender = (event: string, data: unknown) => void;

/** Caller-supplied wiring for one SSE endpoint built by `createSseEndpoint`. */
export interface SseEndpointConfig<T> {
  /**
   * Event name written on the `event:` line of every live event; the entry
   * itself is wrapped as `data: <JSON.stringify(entry)>\n\n` automatically.
   * /activity/stream uses 'activity'; /inspector/events uses 'inspector'.
   */
  liveEventName: string;

  /**
   * Hooks the endpoint up to the live event source. It is handed a `notify`
   * callback and must return a function that unsubscribes it again.
   * `notify` goes through the helper's guarded enqueue, so a dead consumer
   * is dropped from the subscriber set on the very next event rather than
   * lingering until an abort that may never fire.
   */
  subscribe: (notify: (entry: T) => void) => () => void;

  /**
   * Optional. Invoked once after the stream opens and before `subscribe`.
   * Intended for initial event replay (activity gap detection, history
   * burst) or a current-state snapshot (inspector). The `send` argument
   * accepts any event name and any payload object.
   */
  initialReplay?: (send: SseSender) => void;

  /** Interval between heartbeat comments, in milliseconds. Default: 15000. */
  heartbeatMs?: number;
}
|
||||||
|
|
||||||
|
/**
 * Build a streaming SSE `Response` that owns the subscriber cleanup contract.
 *
 * Cleanup (clear the heartbeat, unsubscribe, detach the abort listener,
 * close the stream) runs exactly once, on whichever edge fires first:
 *   - `req.signal` aborts, or was already aborted when the stream starts
 *   - the consumer cancels the stream (underlying-source `cancel()`)
 *   - an enqueue made through `send` throws
 *   - a heartbeat enqueue throws
 *   - `initialReplay` throws
 *
 * @param req     Incoming request; only `req.signal` is used.
 * @param config  Replay / subscribe hooks, live event name and heartbeat
 *                interval (defaults to 15000 ms).
 * @returns A `text/event-stream` Response backed by the managed stream.
 */
export function createSseEndpoint<T>(
  req: Request,
  config: SseEndpointConfig<T>,
): Response {
  const heartbeatMs = config.heartbeatMs ?? 15000;
  const encoder = new TextEncoder();

  // Set by start() so that cancel() can reach the same idempotent cleanup.
  let teardown: (() => void) | null = null;

  const stream = new ReadableStream({
    start(controller) {
      let cleanedUp = false;
      let heartbeat: ReturnType<typeof setInterval> | null = null;
      let unsubscribe: (() => void) | null = null;

      const cleanup = (): void => {
        if (cleanedUp) return;
        cleanedUp = true;
        // Detach so a long-lived signal does not keep this closure alive.
        req.signal.removeEventListener('abort', cleanup);
        if (heartbeat !== null) {
          clearInterval(heartbeat);
          heartbeat = null;
        }
        const release = unsubscribe;
        unsubscribe = null;
        try {
          if (release !== null) release();
        } finally {
          // Close even when the caller's unsubscribe throws.
          try {
            controller.close();
          } catch {
            // Expected: stream already closed or cancelled by the consumer.
          }
        }
      };
      teardown = cleanup;

      // An abort that happened before the stream started will never
      // dispatch another 'abort' event, so handle it here.
      if (req.signal.aborted) {
        cleanup();
        return;
      }

      const send: SseSender = (event, data) => {
        if (cleanedUp) return;
        try {
          controller.enqueue(
            encoder.encode(
              `event: ${event}\ndata: ${JSON.stringify(data, sanitizeReplacer)}\n\n`,
            ),
          );
        } catch {
          // Consumer disconnected mid-write. Tear down so this subscriber
          // doesn't sit in the set forever.
          cleanup();
        }
      };

      // Initial replay (caller-provided).
      if (config.initialReplay) {
        try {
          config.initialReplay(send);
        } catch {
          cleanup();
          return;
        }
        if (cleanedUp) return;
      }

      // Subscribe for live events.
      const release = config.subscribe((entry) => {
        send(config.liveEventName, entry);
      });
      if (cleanedUp) {
        // subscribe() delivered an event synchronously and the send failed:
        // cleanup already ran without an unsubscribe handle. Release the
        // subscription now and do not start a heartbeat nobody would clear.
        release();
        return;
      }
      unsubscribe = release;

      // Heartbeat keeps NAT boxes and proxies from dropping idle SSE,
      // and serves as a liveness probe: an enqueue failure here is the
      // cheapest way to learn the consumer is gone without waiting for
      // an abort signal that may never arrive.
      heartbeat = setInterval(() => {
        if (cleanedUp) return;
        try {
          controller.enqueue(encoder.encode(`: heartbeat\n\n`));
        } catch {
          cleanup();
        }
      }, heartbeatMs);

      req.signal.addEventListener('abort', cleanup, { once: true });
    },

    cancel() {
      // The consumer cancelled the stream; release the subscriber right
      // away instead of waiting for the next enqueue or heartbeat to fail.
      teardown?.();
    },
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}
|
||||||
|
|
@ -0,0 +1,194 @@
|
||||||
|
import { describe, test, expect } from 'bun:test';
|
||||||
|
import { createSseEndpoint } from '../src/sse-helpers';
|
||||||
|
|
||||||
|
// Unit tests for the SSE cleanup contract introduced by D6 EXTRACT_HELPER.
|
||||||
|
//
|
||||||
|
// The pre-helper bug: /activity/stream and /inspector/events ran cleanup
|
||||||
|
// only on the `req.signal.abort` edge. If the underlying TCP died without
|
||||||
|
// firing abort (Chromium MV3 service-worker suspend, intermediate proxy
|
||||||
|
// half-close), the subscriber closure stayed in the Set capturing the
|
||||||
|
// ReadableStreamDefaultController and any payloads queued behind it.
|
||||||
|
//
|
||||||
|
// These tests pin the three cleanup edges:
|
||||||
|
// 1. abort signal → cleanup
|
||||||
|
// 2. enqueue throws (consumer gone) → cleanup
|
||||||
|
// 3. heartbeat enqueue throws → cleanup
|
||||||
|
// And the idempotency invariant: cleanup running twice is a no-op.
|
||||||
|
|
||||||
|
/**
 * Create a Request bound to a fresh AbortController, together with a
 * handle that aborts it. Only `req.signal` matters to the code under test;
 * the URL is a placeholder.
 */
function makeRequest(): { req: Request; abort: () => void } {
  const aborter = new AbortController();
  const { signal } = aborter;
  return {
    req: new Request('http://localhost/test', { signal }),
    abort: () => aborter.abort(),
  };
}
|
||||||
|
|
||||||
|
/**
 * Pull SSE bytes from a Response stream for at most `ms` milliseconds and
 * return the decoded text.
 *
 * Reading stops when the stream ends, a read rejects, or the deadline
 * passes — whichever comes first. The reader is cancelled afterwards.
 *
 * @param res  Response whose body is drained; a missing body yields ''.
 * @param ms   Time budget in milliseconds.
 * @returns Everything decoded before reading stopped.
 */
async function readAll(res: Response, ms: number): Promise<string> {
  if (!res.body) return '';
  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  const deadline = Date.now() + ms;
  let out = '';

  // One timer for the whole drain, cleared on exit, instead of a fresh
  // uncleared timer per loop iteration.
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timedOut = new Promise<{ value: undefined; done: true }>((resolve) => {
    timer = setTimeout(() => resolve({ value: undefined, done: true }), ms);
  });

  try {
    while (Date.now() < deadline) {
      const { value, done } = await Promise.race([reader.read(), timedOut]);
      if (done) break;
      if (value) out += decoder.decode(value, { stream: true });
    }
  } catch {
    // A rejected read means the stream errored; return what was collected.
  } finally {
    clearTimeout(timer);
    try { reader.cancel().catch(() => {}); } catch {}
  }

  // Flush bytes the streaming decoder is still holding back.
  out += decoder.decode();
  return out;
}
|
||||||
|
|
||||||
|
describe('createSseEndpoint cleanup contract', () => {
  test('1. abort signal triggers unsubscribe', async () => {
    let unsubscribed = 0;
    const { req, abort } = makeRequest();
    const res = createSseEndpoint(req, {
      subscribe: () => () => {
        unsubscribed++;
      },
      liveEventName: 'test',
      heartbeatMs: 60_000, // long enough that we don't see heartbeats in this test
    });
    // Take a reader on the stream, then abort the request.
    const reader = res.body!.getReader();
    // Yield to let start() run.
    await Promise.resolve();
    await Promise.resolve();
    abort();
    // Let the abort listener fire.
    await new Promise((r) => setTimeout(r, 10));
    expect(unsubscribed).toBe(1);
    reader.cancel().catch(() => {});
  });

  test('2. enqueue throw triggers unsubscribe + heartbeat clear', async () => {
    let unsubscribed = 0;
    let notify: ((entry: { msg: string }) => void) | null = null;
    const { req } = makeRequest();
    const res = createSseEndpoint<{ msg: string }>(req, {
      subscribe: (n) => {
        notify = n;
        return () => {
          unsubscribed++;
        };
      },
      liveEventName: 'test',
      heartbeatMs: 60_000,
    });
    // Cancel the reader so subsequent enqueues throw.
    const reader = res.body!.getReader();
    await Promise.resolve();
    await Promise.resolve();
    expect(notify).not.toBeNull();
    await reader.cancel(); // closes the consumer side
    // Fire a live event against the cancelled stream: the subscriber must
    // be released exactly once.
    notify!({ msg: 'will fail to enqueue' });
    await new Promise((r) => setTimeout(r, 10));
    expect(unsubscribed).toBe(1);
  });

  test('3. cleanup is idempotent (abort then enqueue-fail)', async () => {
    let unsubscribed = 0;
    let notify: ((entry: { msg: string }) => void) | null = null;
    const { req, abort } = makeRequest();
    const res = createSseEndpoint<{ msg: string }>(req, {
      subscribe: (n) => {
        notify = n;
        return () => {
          unsubscribed++;
        };
      },
      liveEventName: 'test',
      heartbeatMs: 60_000,
    });
    const reader = res.body!.getReader();
    await Promise.resolve();
    await Promise.resolve();
    abort();
    await new Promise((r) => setTimeout(r, 10));
    // Second cleanup edge — should be a no-op.
    notify!({ msg: 'no-op' });
    await new Promise((r) => setTimeout(r, 10));
    expect(unsubscribed).toBe(1);
    reader.cancel().catch(() => {});
  });

  test('4. initialReplay events reach the client before live events', async () => {
    let notify: ((entry: { msg: string }) => void) | null = null;
    const { req } = makeRequest();
    const res = createSseEndpoint<{ msg: string }>(req, {
      initialReplay: (send) => {
        send('replay', { msg: 'first' });
      },
      subscribe: (n) => {
        notify = n;
        return () => {};
      },
      liveEventName: 'live',
      heartbeatMs: 60_000,
    });
    // Trigger one live event soon after stream starts.
    setTimeout(() => notify?.({ msg: 'second' }), 5);
    const text = await readAll(res, 50);
    expect(text).toContain('event: replay');
    expect(text).toContain('"msg":"first"');
    expect(text).toContain('event: live');
    expect(text).toContain('"msg":"second"');
    // Replay must come before live.
    expect(text.indexOf('"first"')).toBeLessThan(text.indexOf('"second"'));
  });

  test('5. initialReplay throw triggers cleanup without subscribing', async () => {
    let subscribed = 0;
    const { req } = makeRequest();
    const res = createSseEndpoint(req, {
      initialReplay: () => {
        throw new Error('replay boom');
      },
      subscribe: () => {
        subscribed++;
        return () => {};
      },
      liveEventName: 'test',
      heartbeatMs: 60_000,
    });
    // Drain — stream should close cleanly.
    const text = await readAll(res, 30);
    expect(text).toBe(''); // no events
    expect(subscribed).toBe(0); // never reached subscribe()
  });

  test('6. lone surrogates in payload string are sanitized', async () => {
    let notify: ((entry: { msg: string }) => void) | null = null;
    const { req } = makeRequest();
    const res = createSseEndpoint<{ msg: string }>(req, {
      subscribe: (n) => {
        notify = n;
        return () => {};
      },
      liveEventName: 'test',
      heartbeatMs: 60_000,
    });
    setTimeout(() => {
      // Lone high surrogate (no matching low). JSON.stringify would emit
      // \uD800 escape that breaks Claude API. Helper must strip it.
      notify?.({ msg: 'hello \uD800 world' });
    }, 5);
    const text = await readAll(res, 50);
    expect(text).toContain('event: test');
    // JSON.stringify emits U+FFFD as the literal character, not as escape.
    // Written as an escape here so the assertion survives re-encoding of
    // this source file.
    expect(text).toContain('\uFFFD');
    // The raw lone-surrogate escape MUST NOT survive — that's the failure
    // mode that breaks the Claude API with HTTP 400.
    expect(text.toLowerCase()).not.toContain('\\ud800');
  });

  test('7. cancelled consumer is released by the next heartbeat', async () => {
    let unsubscribed = 0;
    const { req } = makeRequest();
    const res = createSseEndpoint(req, {
      subscribe: () => () => {
        unsubscribed++;
      },
      liveEventName: 'test',
      heartbeatMs: 5, // short, so a heartbeat tick lands inside the wait below
    });
    const reader = res.body!.getReader();
    await reader.cancel(); // consumer gone; no abort, no live event
    // Wait several heartbeat intervals: the subscriber must have been
    // released exactly once by then, even though nothing else fired.
    await new Promise((r) => setTimeout(r, 30));
    expect(unsubscribed).toBe(1);
  });
});
|
||||||
Loading…
Reference in New Issue