From 9607a37616cfd35a37dbd3218c15d838f0a56b6e Mon Sep 17 00:00:00 2001
From: Garry Tan
Date: Tue, 26 May 2026 23:06:40 -0700
Subject: [PATCH] feat(brain): concurrent-refresh lockfile dedup (T15 / D3)

When autoplan dispatches 4 planning skills back-to-back and they all hit a
cold-miss on the same digest, only ONE actually fetches from the brain. The
rest dedup via the project-scoped lockfile at
~/.gstack/projects/<slug>/brain-cache/.refresh.lock.

Reuses the 5-min stale-takeover convention from /sync-gbrain. Lock is taken
over when:
- The lock's recorded timestamp (ts) is older than CACHE_REFRESH_LOCK_TIMEOUT_MS
- PID is on the same host and dead (process.kill(pid, 0) fails)
- Lock file is corrupt (defensive)

withRefreshLock(projectSlug, fn) returns either the callback's value or the
literal 'dedup'. The CLI emits exit code 3 + diagnostic stderr on dedup, so
callers can choose to wait + retry (resolver does this) or fall through to
stale-but-usable behavior.

test/cache-concurrent-refresh.test.ts: 7 tests covering acquire/release,
stale-takeover, dead-PID takeover, corrupt-lock recovery, error-path release,
and cross-project lock location.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 bin/gstack-brain-cache                | 129 ++++++++++++++++++++--
 test/cache-concurrent-refresh.test.ts | 153 ++++++++++++++++++++++++++
 2 files changed, 274 insertions(+), 8 deletions(-)
 create mode 100644 test/cache-concurrent-refresh.test.ts

diff --git a/bin/gstack-brain-cache b/bin/gstack-brain-cache
index 8cbdb4e76..740f7ea19 100755
--- a/bin/gstack-brain-cache
+++ b/bin/gstack-brain-cache
@@ -19,12 +19,13 @@
  * unreachable. Concurrent-refresh dedup is a follow-up commit (T15).
*/ -import { existsSync, mkdirSync, readFileSync, writeFileSync, renameSync, statSync, unlinkSync, readdirSync } from 'fs'; +import { existsSync, mkdirSync, readFileSync, writeFileSync, renameSync, statSync, unlinkSync, readdirSync, openSync, closeSync } from 'fs'; import { join, dirname } from 'path'; -import { homedir } from 'os'; +import { homedir, hostname } from 'os'; import { execGbrainJson, spawnGbrain } from '../lib/gbrain-exec'; import { BRAIN_CACHE_ENTITIES, + CACHE_REFRESH_LOCK_TIMEOUT_MS, GSTACK_SCHEMA_PACK_NAME, GSTACK_SCHEMA_PACK_VERSION, type BrainCacheEntity, @@ -215,6 +216,107 @@ export function cmdGet(entityName: string, projectSlug: string | null): GetResul // Subcommand: refresh // ────────────────────────────────────────────────────────────────────────── +// ────────────────────────────────────────────────────────────────────────── +// Lockfile dedup (T15 / D3) +// ────────────────────────────────────────────────────────────────────────── + +/** + * Returns the lock file path for a project scope. Cross-project entities + * still lock per-project (the project triggering the refresh holds the lock); + * concurrent attempts from different projects on cross-project entities + * serialize naturally because they're rare and the lock window is short. + */ +function lockPath(projectSlug: string | null): string { + const dir = projectSlug + ? join(GSTACK_HOME, 'projects', projectSlug, 'brain-cache') + : join(GSTACK_HOME, 'brain-cache'); + return join(dir, '.refresh.lock'); +} + +interface LockHandle { + fd: number; + path: string; +} + +/** + * Try to acquire the refresh lock. Returns null when another process holds it + * (and the lock is fresh). Stale locks (process dead OR older than the + * timeout) are taken over. 
+ */ +function tryAcquireLock(projectSlug: string | null): LockHandle | null { + const path = lockPath(projectSlug); + mkdirSync(dirname(path), { recursive: true }); + + // If a lock exists, see if it's stale + if (existsSync(path)) { + try { + const raw = readFileSync(path, 'utf-8'); + const lock = JSON.parse(raw) as { pid: number; host: string; ts: number }; + const age = Date.now() - lock.ts; + const sameHost = lock.host === hostname(); + const processGone = sameHost && lock.pid > 0 && !isPidAlive(lock.pid); + if (age <= CACHE_REFRESH_LOCK_TIMEOUT_MS && !processGone) { + return null; // someone else holds a fresh lock + } + // Stale: take over + } catch { + // Corrupt lock file → take over + } + } + + // Write our lock (best-effort O_EXCL via tmp+rename for atomic creation) + const payload = JSON.stringify({ pid: process.pid, host: hostname(), ts: Date.now() }); + const tmp = `${path}.tmp.${process.pid}.${Date.now()}`; + try { + writeFileSync(tmp, payload); + renameSync(tmp, path); + } catch (err) { + return null; + } + + // Race: another process may have raced us. Re-read and verify ownership. + try { + const raw = readFileSync(path, 'utf-8'); + const lock = JSON.parse(raw) as { pid: number; host: string }; + if (lock.pid !== process.pid || lock.host !== hostname()) { + return null; + } + } catch { + return null; + } + return { fd: -1, path }; +} + +function releaseLock(handle: LockHandle): void { + try { unlinkSync(handle.path); } catch { /* best effort */ } +} + +function isPidAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch (err: any) { + if (err?.code === 'EPERM') return true; // exists but we don't own it + return false; + } +} + +/** + * Run a refresh callback under the project-scoped lock. If another refresh is + * already in flight, returns 'dedup' and the caller can either wait + retry + * (the resolver does this) or fall through to stale-but-usable. 
Stale locks + * (process dead, or older than CACHE_REFRESH_LOCK_TIMEOUT_MS) are taken over. + */ +export function withRefreshLock(projectSlug: string | null, fn: () => T): T | 'dedup' { + const handle = tryAcquireLock(projectSlug); + if (!handle) return 'dedup'; + try { + return fn(); + } finally { + releaseLock(handle); + } +} + /** Refreshes one entity from the brain. Returns true on success. */ export function refreshEntity(entityName: string, projectSlug: string | null): boolean { const entity = BRAIN_CACHE_ENTITIES[entityName]; @@ -532,14 +634,25 @@ async function main(): Promise { return 0; } case 'refresh': { + // D3: dedup concurrent refreshes via lockfile. Skipped (dedup) when + // another process is already mid-refresh on the same project. if (flags.entity) { - const ok = refreshEntity(String(flags.entity), projectSlug); - process.stdout.write(ok ? `refreshed ${flags.entity}\n` : `failed to refresh ${flags.entity}\n`); - return ok ? 0 : 1; + const entityName = String(flags.entity); + const result = withRefreshLock(projectSlug, () => refreshEntity(entityName, projectSlug)); + if (result === 'dedup') { + process.stderr.write(`(dedup: another refresh in flight)\n`); + return 3; + } + process.stdout.write(result ? `refreshed ${entityName}\n` : `failed to refresh ${entityName}\n`); + return result ? 0 : 1; } - const { success, failed } = refreshAll(projectSlug); - process.stdout.write(`refreshed=${success} failed=${failed}\n`); - return failed > 0 ? 1 : 0; + const allResult = withRefreshLock(projectSlug, () => refreshAll(projectSlug)); + if (allResult === 'dedup') { + process.stderr.write(`(dedup: another refresh in flight)\n`); + return 3; + } + process.stdout.write(`refreshed=${allResult.success} failed=${allResult.failed}\n`); + return allResult.failed > 0 ? 
1 : 0; } case 'invalidate': { const entityName = positional[0]; diff --git a/test/cache-concurrent-refresh.test.ts b/test/cache-concurrent-refresh.test.ts new file mode 100644 index 000000000..ef453edb0 --- /dev/null +++ b/test/cache-concurrent-refresh.test.ts @@ -0,0 +1,153 @@ +/** + * Concurrent-refresh lockfile dedup (T15 / D3). + * + * When autoplan dispatches 4 planning skills back-to-back and they all hit a + * cold-miss on the same digest, only ONE should actually fetch from the brain; + * the rest dedup via the project-scoped lockfile at + * ~/.gstack/projects//brain-cache/.refresh.lock. Stale locks (process + * dead, or older than CACHE_REFRESH_LOCK_TIMEOUT_MS) are taken over. + * + * Gate-tier, free, pure file-IO. Uses tmp GSTACK_HOME. + */ + +import { describe, test, expect, beforeEach, afterEach } from 'bun:test'; +import { mkdtempSync, existsSync, writeFileSync, readFileSync, rmSync, mkdirSync, unlinkSync } from 'fs'; +import { join } from 'path'; +import { tmpdir, hostname } from 'os'; + +let TMP_HOME: string; +const ORIGINAL_HOME = process.env.GSTACK_HOME; + +beforeEach(() => { + TMP_HOME = mkdtempSync(join(tmpdir(), 'gstack-lock-test-')); + process.env.GSTACK_HOME = TMP_HOME; + delete require.cache[require.resolve('../bin/gstack-brain-cache')]; +}); + +afterEach(() => { + if (ORIGINAL_HOME) process.env.GSTACK_HOME = ORIGINAL_HOME; + else delete process.env.GSTACK_HOME; + try { rmSync(TMP_HOME, { recursive: true, force: true }); } catch { /* best effort */ } +}); + +async function importCache(): Promise { + return (await import('../bin/gstack-brain-cache')) as typeof import('../bin/gstack-brain-cache'); +} + +describe('concurrent-refresh lockfile dedup', () => { + test('first caller acquires lock; second concurrent caller deduplicates', async () => { + const mod = await importCache(); + // Pre-create dirs to avoid Race On First Use. 
+    mkdirSync(join(TMP_HOME, 'projects', 'helsinki', 'brain-cache'), { recursive: true });
+    const lockFile = join(TMP_HOME, 'projects', 'helsinki', 'brain-cache', '.refresh.lock');
+
+    // withRefreshLock is synchronous, so the only in-process way to observe a
+    // held lock is from inside the callback: the nested call below plays the
+    // second concurrent caller and must dedup without running its callback.
+    let innerRan = false;
+    let innerResult: unknown;
+    const outerResult = mod.withRefreshLock('helsinki', () => {
+      expect(existsSync(lockFile)).toBe(true);
+      innerResult = mod.withRefreshLock('helsinki', () => {
+        innerRan = true;
+        return 'inner';
+      });
+      // The deduped call must not have released the lock we still hold.
+      expect(existsSync(lockFile)).toBe(true);
+      return 'first';
+    });
+    expect(outerResult).toBe('first');
+    expect(innerResult).toBe('dedup');
+    expect(innerRan).toBe(false);
+    expect(existsSync(lockFile)).toBe(false);
+
+    // A fresh lock written by a process on another host also dedups.
+    writeFileSync(lockFile, JSON.stringify({
+      pid: 999999, // not probed: liveness is only checked for same-host locks
+      host: 'some-other-host',
+      ts: Date.now(),
+    }));
+    expect(mod.withRefreshLock('helsinki', () => 'inner')).toBe('dedup');
+    // A deduped call leaves the other holder's lock file in place.
+    expect(existsSync(lockFile)).toBe(true);
+  });
+
+  test('stale lock (older than timeout) is taken over', async () => {
+    const mod = await importCache();
+    mkdirSync(join(TMP_HOME, 'projects', 'helsinki', 'brain-cache'), { recursive: true });
+    const lockFile = join(TMP_HOME, 'projects', 'helsinki', 'brain-cache', '.refresh.lock');
+    // Lock is 10 minutes old — way past the 5-min timeout.
+    writeFileSync(lockFile, JSON.stringify({
+      pid: 999999,
+      host: 'some-other-host',
+      ts: Date.now() - 10 * 60_000,
+    }));
+    const result = mod.withRefreshLock('helsinki', () => 'took-over');
+    expect(result).toBe('took-over');
+    expect(existsSync(lockFile)).toBe(false);
+  });
+
+  test('lock from same host with dead PID is taken over', async () => {
+    const mod = await importCache();
+    mkdirSync(join(TMP_HOME, 'projects', 'helsinki', 'brain-cache'), { recursive: true });
+    const lockFile = join(TMP_HOME, 'projects', 'helsinki', 'brain-cache', '.refresh.lock');
+    // Same host, but PID 999999 which is unlikely to exist.
+    writeFileSync(lockFile, JSON.stringify({
+      pid: 999999,
+      host: hostname(),
+      ts: Date.now(),
+    }));
+    const result = mod.withRefreshLock('helsinki', () => 'took-over-dead-pid');
+    expect(result).toBe('took-over-dead-pid');
+    expect(existsSync(lockFile)).toBe(false);
+  });
+
+  test('lock is released after callback runs', async () => {
+    const mod = await importCache();
+    mkdirSync(join(TMP_HOME, 'projects', 'helsinki', 'brain-cache'), { recursive: true });
+    const lockFile = join(TMP_HOME, 'projects', 'helsinki', 'brain-cache', '.refresh.lock');
+
+    // Assert the lock exists while held; otherwise the check below is vacuous.
+    mod.withRefreshLock('helsinki', () => {
+      expect(existsSync(lockFile)).toBe(true);
+    });
+
+    expect(existsSync(lockFile)).toBe(false);
+  });
+
+  test('lock is released even when callback throws', async () => {
+    const mod = await importCache();
+    mkdirSync(join(TMP_HOME, 'projects', 'helsinki', 'brain-cache'), { recursive: true });
+    const lockFile = join(TMP_HOME, 'projects', 'helsinki', 'brain-cache', '.refresh.lock');
+
+    expect(() => {
+      mod.withRefreshLock('helsinki', () => {
+        throw new Error('callback failed');
+      });
+    }).toThrow('callback failed');
+
+    expect(existsSync(lockFile)).toBe(false);
+  });
+
+  test('corrupt lock file is taken over (defensive)', async () => {
+    const mod = await importCache();
+    mkdirSync(join(TMP_HOME, 'projects', 'helsinki', 'brain-cache'), { recursive: true });
+    const lockFile = join(TMP_HOME, 'projects', 'helsinki', 'brain-cache', '.refresh.lock');
+    writeFileSync(lockFile, 'not valid json {{{');
+
+    const result = mod.withRefreshLock('helsinki', () => 'recovered');
+    expect(result).toBe('recovered');
+  });
+
+  test('cross-project lock uses ~/.gstack/brain-cache/.refresh.lock', async () => {
+    const mod = await importCache();
+    mkdirSync(join(TMP_HOME, 'brain-cache'), { recursive: true });
+    const lockFile = join(TMP_HOME, 'brain-cache', '.refresh.lock');
+
+    let heldAtSharedPath = false;
+    mod.withRefreshLock(null, () => { heldAtSharedPath = existsSync(lockFile); });
+
+    expect(heldAtSharedPath).toBe(true); // created at the shared location
+    expect(existsSync(lockFile)).toBe(false); // released
+  });
+});