diff --git a/bin/gstack-memory-ingest.ts b/bin/gstack-memory-ingest.ts index 56b072b2b..7d5f06661 100644 --- a/bin/gstack-memory-ingest.ts +++ b/bin/gstack-memory-ingest.ts @@ -47,9 +47,14 @@ import { statSync, mkdirSync, appendFileSync, + renameSync, + openSync, + readSync, + closeSync, + rmSync, } from "fs"; import { join, basename, dirname } from "path"; -import { execSync, execFileSync } from "child_process"; +import { execSync, execFileSync, spawnSync, spawn, type ChildProcess } from "child_process"; import { homedir } from "os"; import { createHash } from "crypto"; @@ -73,6 +78,12 @@ interface CliArgs { sources: Set; limit: number | null; noWrite: boolean; + /** + * Opt-in per-file gitleaks scan during the prepare phase. Off by + * default — the cross-machine boundary (gstack-brain-sync, git push) + * has its own scanner. Setting this adds ~4-8 min to cold runs. + */ + scanSecrets: boolean; } type MemoryType = @@ -137,6 +148,14 @@ interface BulkResult { failed: number; duration_ms: number; partial_pages: number; + /** + * D6: when set, indicates a process-level failure (gbrain CLI missing + * or `gbrain import` crashed). Per-file errors (FILE_TOO_LARGE etc.) + * land in `failed` but do NOT set this flag — the orchestrator should + * still treat the run as OK with summary mentioning the failure count. + * Only when this is set does the verdict become ERR. + */ + system_error?: string; } // ── Constants ────────────────────────────────────────────────────────────── @@ -176,6 +195,9 @@ Options: --limit Stop after N pages written (smoke testing). --no-write Skip gbrain put_page calls (still updates state file). Used by tests + dry runs without actual ingest. + --scan-secrets Opt-in per-file gitleaks scan during prepare. Off by + default; gstack-brain-sync already gates the git-push + boundary. Adds ~4-8 min to cold runs. --help This text. `); } @@ -190,6 +212,7 @@ function parseArgs(): CliArgs { let limit: number | null = null; let sources: Set = new Set(ALL_TYPES); let noWrite = process.env.GSTACK_MEMORY_INGEST_NO_WRITE === "1"; + let scanSecrets = process.env.GSTACK_MEMORY_INGEST_SCAN_SECRETS === "1"; for (let i = 0; i < args.length; i++) { const a = args[i]; @@ -202,6 +225,7 @@ function parseArgs(): CliArgs { case "--include-unattributed": includeUnattributed = true; break; case "--all-history": allHistory = true; break; case "--no-write": noWrite = true; break; + case "--scan-secrets": scanSecrets = true; break; case "--limit": limit = parseInt(args[++i] || "0", 10); if (!Number.isFinite(limit) || limit <= 0) { @@ -229,7 +253,7 @@ function parseArgs(): CliArgs { } } - return { mode, quiet, benchmark, includeUnattributed, allHistory, sources, limit, noWrite }; + return { mode, quiet, benchmark, includeUnattributed, allHistory, sources, limit, noWrite, scanSecrets }; } // ── State file ───────────────────────────────────────────────────────────── @@ -268,9 +292,14 @@ function loadState(): IngestState { } function saveState(state: IngestState): void { + // F6 (Codex finding 6): tmp+rename atomic write so a crash mid-write + // never leaves a truncated/corrupt state file. Matches the pattern + // in gstack-gbrain-sync.ts:saveSyncState. try { mkdirSync(dirname(STATE_PATH), { recursive: true }); - writeFileSync(STATE_PATH, JSON.stringify(state, null, 2), "utf-8"); + const tmp = `${STATE_PATH}.tmp.${process.pid}`; + writeFileSync(tmp, JSON.stringify(state, null, 2), "utf-8"); + renameSync(tmp, STATE_PATH); } catch (err) { console.error(`[state] write failed: ${(err as Error).message}`); } @@ -278,12 +307,15 @@ function saveState(state: IngestState): void { // ── File hash + change detection ─────────────────────────────────────────── -function fileSha256(path: string, maxBytes = 1024 * 1024): string { - // Hash the first 1MB only; sufficient for change detection on big JSONL. +function fileSha256(path: string): string { + // F9 (Codex finding 9): full-file hash. The prior 1MB cap silently + // missed tail edits to long partial transcripts — exactly the + // recovery case this pipeline needs to handle correctly. Realistic + // max for an ingest source is ~50MB (long JSONL); fine to load in + // memory for hashing. try { - const fd = readFileSync(path); - const slice = fd.length > maxBytes ? fd.subarray(0, maxBytes) : fd; - return createHash("sha256").update(slice).digest("hex"); + const buf = readFileSync(path); + return createHash("sha256").update(buf).digest("hex"); } catch { return ""; } @@ -753,51 +785,66 @@ function buildArtifactPage(path: string, type: MemoryType): PageRecord { }; } -// ── Writer (calls `gbrain put`) ──────────────────────────────────────────── +// ── Writer (batch via `gbrain import `) ─────────────────────────────── +// +// Architecture (post plan-eng-review + Codex outside-voice): +// +// walkAllSources(ctx) +// → for each path: mtime-skip / source-file gitleaks (D3) / parse / buildPage +// → renderPageBody injects title/type/tags into YAML frontmatter +// → writeStaged: mkdir -p slug subdirs (D1), write ${slug}.md +// → snapshot ~/.gbrain/sync-failures.jsonl byte-offset (D7) +// → spawnSync `gbrain import --no-embed --json` (D6) +// → parseImportJson(stdout) → { imported, skipped, errors, ... } (D6 OK/ERR) +// → readNewFailures(preImportOffset, slugMap) → Set (D7) +// → state.sessions[path] = { ... } for prepared files NOT in failed set +// → saveStateAtomic (F6 tmp+rename) + cleanupStagingDir +// +// We trust gbrain's content_hash idempotency (verified in +// ~/git/gbrain/src/core/import-file.ts:242-243, :478) — repeated imports +// of identical content are cheap. So we do NOT track per-file skip_reasons, +// do NOT keep a SIGTERM checkpoint, and do NOT advance a three-state verdict. let _gbrainAvailability: boolean | null = null; function gbrainAvailable(): boolean { if (_gbrainAvailability !== null) return _gbrainAvailability; try { execSync("command -v gbrain", { stdio: "ignore" }); - // gbrain v0.27 retired the legacy `put_page` flag-form for `put ` - // (content via stdin, metadata as YAML frontmatter). Probe `--help` for - // the `put` subcommand so we surface a single clean error here rather - // than failing every page with "Unknown command: put_page". The regex - // anchors on the indented subcommand format gbrain's help actually uses - // (" put ..."), not any whitespace-bordered "put" word in prose. + // Probe `--help` for the `import` subcommand. gbrain v0.20.0+ ships + // `import ` (batch markdown import via path-authoritative slugs). + // If absent, we surface a single clean error here rather than failing + // the whole stage with a confusing usage message from gbrain itself. const help = execFileSync("gbrain", ["--help"], { encoding: "utf-8", timeout: 5000, stdio: ["ignore", "pipe", "pipe"], }); - _gbrainAvailability = /^\s+put\s/m.test(help); + _gbrainAvailability = /^\s+import\s/m.test(help); } catch { _gbrainAvailability = false; } return _gbrainAvailability; } -function gbrainPutPage(page: PageRecord): { ok: boolean; error?: string } { - if (!gbrainAvailable()) { - return { ok: false, error: "gbrain CLI not in PATH or missing `put` subcommand" }; - } - // gbrain v0.27+ uses `put ` (positional, content via stdin) instead - // of the legacy `put_page` flag form. Metadata rides as YAML frontmatter: - // - When the page body already starts with frontmatter (transcripts), inject - // title/type/tags into the existing block so gbrain's frontmatter parser - // picks them up. - // - When the page body has no frontmatter (raw artifacts: design-docs, - // learnings, builder-profile-entries), wrap with a fresh frontmatter - // carrying the same fields. Without this branch, artifact pages would - // land in gbrain with empty title/type/tags. +/** + * Build the markdown body with YAML frontmatter (title/type/tags) injected. + * + * Two cases: + * - Page body already starts with `---\n` (transcripts) — inject into the + * existing frontmatter block before its close fence so gbrain's frontmatter + * parser picks up the fields alongside any session-level metadata the + * transcript builder already wrote (session_id, cwd, git_remote, etc.). + * - No leading frontmatter (raw artifacts: design-docs, learnings, etc.) — + * wrap with a fresh frontmatter block carrying title/type/tags. Without + * this branch, artifact pages would land in gbrain with empty metadata. + * + * gbrain enforces slug = path-derived (slugifyPath in gbrain's sync.ts). + * We do NOT set `slug:` in frontmatter — the staging-dir filename is the + * source of truth and gbrain rejects mismatches. + */ +function renderPageBody(page: PageRecord): string { let body = page.body; if (body.startsWith("---\n")) { - // Locate the closing --- delimiter. buildTranscriptPage joins with "\n" - // and does not append a trailing newline, so the close fence looks like - // "...\n---" followed directly by body content (no "\n---\n" pattern). - // Match the close on "\n---" only — the inject lands BEFORE the close - // fence, inside the frontmatter block, regardless of what follows it. const end = body.indexOf("\n---", 4); if (end > 0) { const inject = [ @@ -819,25 +866,151 @@ function gbrainPutPage(page: PageRecord): { ok: boolean; error?: string } { body, ].join("\n"); } - try { - execFileSync("gbrain", ["put", page.slug], { - input: body, - encoding: "utf-8", - // Bumped from 30s: auto-link reconciliation on dense transcripts hits - // 30s once the brain has a few hundred existing pages. - timeout: 60000, - // Bumped from default 1MB: without this, gbrain's actual stderr gets - // truncated and callers see only "Command failed:" with no detail. - maxBuffer: 16 * 1024 * 1024, - stdio: ["pipe", "pipe", "pipe"], - }); - return { ok: true }; - } catch (err: any) { - const stderr = err?.stderr?.toString?.() ?? ""; - const stdout = err?.stdout?.toString?.() ?? ""; - const detail = stderr || stdout || (err instanceof Error ? err.message : String(err)); - return { ok: false, error: detail.split("\n")[0].slice(0, 300) }; + return body; +} + +interface PreparedPage { + /** Page slug (path-shaped, e.g. "transcripts/claude-code/foo"). */ + slug: string; + /** Original source file on disk (e.g. ~/.claude/projects/.../foo.jsonl). */ + source_path: string; + /** Full markdown including frontmatter — ready to write. */ + rendered_body: string; + /** Carry-through fields for state recording on success. */ + page_slug: string; + partial: boolean; +} + +interface StagingResult { + staging_dir: string; + written: number; + errors: Array<{ slug: string; error: string }>; + /** Map from staging-dir-relative path (e.g. "transcripts/foo.md") → source path. */ + stagedPathToSource: Map; +} + +/** + * Write prepared pages to a staging dir, mirroring slug hierarchy. + * + * D1: gbrain's `slugifyPath` (sync.ts:260) derives the slug from the + * directory-aware relative path inside the import dir, so slugs containing + * slashes (e.g. "transcripts/claude-code/foo") must live in matching + * subdirectories of the staging dir. Otherwise the slug becomes flattened + * or rejected by gbrain's path-vs-frontmatter slug check (import-file.ts:429). + * + * Filename = `${slug}.md`. mkdir is recursive. Existing files overwrite. + * Errors per-file are collected; the whole batch is best-effort. + */ +function writeStaged(prepared: PreparedPage[], stagingDir: string): StagingResult { + mkdirSync(stagingDir, { recursive: true }); + const stagedPathToSource = new Map(); + const errors: Array<{ slug: string; error: string }> = []; + let written = 0; + for (const p of prepared) { + const relPath = `${p.slug}.md`; + const absPath = join(stagingDir, relPath); + try { + mkdirSync(dirname(absPath), { recursive: true }); + writeFileSync(absPath, p.rendered_body, "utf-8"); + stagedPathToSource.set(relPath, p.source_path); + written++; + } catch (err) { + errors.push({ slug: p.slug, error: (err as Error).message }); + } } + return { staging_dir: stagingDir, written, errors, stagedPathToSource }; +} + +interface ImportJsonResult { + status?: string; + duration_s?: number; + imported?: number; + skipped?: number; + errors?: number; + chunks?: number; + total_files?: number; +} + +/** + * Parse the `gbrain import --json` stdout payload (single JSON object on + * the last non-empty line per commands/import.ts:271-275). + * + * Returns parsed counts on success, or `null` to signal "unparseable" — the + * caller treats null as ERR (system_error) rather than silently passing + * through as zeros. Pre-2026-05-11 this returned zeros on parse failure, + * which silently masked gbrain crashes as "0 imported, 0 failed = OK". + */ +function parseImportJson(stdout: string): ImportJsonResult | null { + const lines = stdout.split("\n").map((s) => s.trim()).filter(Boolean); + for (let i = lines.length - 1; i >= 0; i--) { + const line = lines[i]; + if (line.startsWith("{") && line.endsWith("}")) { + try { + const parsed = JSON.parse(line); + if (typeof parsed === "object" && parsed && "imported" in parsed) { + return parsed as ImportJsonResult; + } + } catch { + // try next line up + } + } + } + return null; +} + +/** + * Read failures appended to ~/.gbrain/sync-failures.jsonl since the + * snapshotted byte offset, and map them back to source paths. + * + * D7: gbrain import writes per-file failures to sync-failures.jsonl + * (commands/import.ts:308-310) explicitly so "callers can gate state + * advances" (comment at :28). We snapshot the file size before import + * and read only the appended bytes after, so we never confuse new + * entries with prior-run leftovers. + * + * Each line is `{ path, error, code, commit, ts }`. The `path` is the + * staging-dir-relative filename gbrain saw (e.g. "transcripts/foo.md"). + * stagedPathToSource maps that back to the original source file. + */ +function readNewFailures( + syncFailuresPath: string, + preImportOffset: number, + stagedPathToSource: Map, +): Set { + const failed = new Set(); + try { + if (!existsSync(syncFailuresPath)) return failed; + const stat = statSync(syncFailuresPath); + if (stat.size <= preImportOffset) return failed; + // Read appended bytes only. readSync with a positional offset works + // synchronously without slurping the whole file. + const fd = openSync(syncFailuresPath, "r"); + try { + const buf = Buffer.alloc(stat.size - preImportOffset); + readSync(fd, buf, 0, buf.length, preImportOffset); + const text = buf.toString("utf-8"); + for (const line of text.split("\n")) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + const entry = JSON.parse(trimmed) as { path?: string }; + if (entry.path) { + const source = stagedPathToSource.get(entry.path); + if (source) failed.add(source); + } + } catch { + // ignore malformed line + } + } + } finally { + closeSync(fd); + } + } catch { + // Best-effort. If we can't read failures, we conservatively assume + // none — caller will state-record all prepared files. Worst case: + // failed files get a retry-on-next-run shot anyway via content_hash. + } + return failed; } // ── Main ingest passes ───────────────────────────────────────────────────── @@ -896,34 +1069,72 @@ async function probeMode(args: CliArgs): Promise { }; } -async function ingestPass(args: CliArgs): Promise { - const t0 = Date.now(); - const state = loadState(); - const ctx = makeWalkContext(args, state); - - let written = 0; +/** + * Prepare phase: walk sources, apply incremental + optional-secret-scan filters, + * parse transcripts/artifacts into PageRecord, render bodies with + * frontmatter. Returns the PreparedPage[] to stage + counts of files + * filtered at each gate. + * + * Secret scanning policy (post 2026-05-10 perf review): + * + * The actual cross-machine exfiltration boundary is `gstack-brain-sync`, + * which runs a regex-based secret scanner on the staged diff before + * `git commit` (see bin/gstack-brain-sync:78-110: AWS keys, GitHub + * tokens, OpenAI keys, PEM blocks, JWTs, bearer-token-in-JSON). That's + * the right place — it gates content leaving the machine. + * + * memory-ingest, by contrast, moves data from one local file to a + * local PGLite database. Scanning every source file at ingest time + * doesn't change exposure (the secret already lives in plaintext + * where the user keeps their transcripts and artifacts) but costs + * ~470s on cold runs. We removed the per-file gitleaks gate as + * redundant defense-in-depth and made it opt-in via `--scan-secrets` + * for users who want belt-and-suspenders. + */ +function preparePages( + args: CliArgs, + ctx: WalkContext, + state: IngestState, +): { + prepared: PreparedPage[]; + skippedSecret: number; + skippedDedup: number; + skippedUnattributed: number; + parseFailed: number; + partialPages: number; +} { + const prepared: PreparedPage[] = []; let skippedSecret = 0; let skippedDedup = 0; let skippedUnattributed = 0; - let failed = 0; + let parseFailed = 0; let partialPages = 0; for (const { path, type } of walkAllSources(ctx)) { - if (args.limit !== null && written >= args.limit) break; + if (args.limit !== null && prepared.length >= args.limit) break; if (args.mode === "incremental" && !fileChangedSinceState(path, state)) { skippedDedup++; continue; } - // Secret scan first - const scan = secretScanFile(path); - if (scan.scanner === "gitleaks" && scan.findings.length > 0) { - skippedSecret++; - if (!args.quiet) { - console.error(`[secret-scan match] ${path} (${scan.findings.length} finding${scan.findings.length === 1 ? "" : "s"}); skipped`); + // Optional belt-and-suspenders: when --scan-secrets is set, scan the + // source file with gitleaks and skip dirty ones. Off by default + // because gstack-brain-sync already gates the cross-machine boundary + // and per-file gitleaks costs ~256ms/file (4-8 min on a real corpus). + if (args.scanSecrets) { + const scan = secretScanFile(path); + if (scan.scanner === "gitleaks" && scan.findings.length > 0) { + skippedSecret++; + if (!args.quiet) { + console.error( + `[secret-scan match] ${path} (${scan.findings.length} finding${ + scan.findings.length === 1 ? "" : "s" + }); skipped`, + ); + } + continue; } - continue; } let page: PageRecord; @@ -931,7 +1142,7 @@ async function ingestPass(args: CliArgs): Promise { if (type === "transcript") { const session = parseTranscriptJsonl(path); if (!session) { - failed++; + parseFailed++; continue; } if (!args.includeUnattributed && !session.cwd) { @@ -948,38 +1159,373 @@ async function ingestPass(args: CliArgs): Promise { page = buildArtifactPage(path, type); } } catch (err) { - failed++; + parseFailed++; console.error(`[parse-error] ${path}: ${(err as Error).message}`); continue; } - const result = args.noWrite - ? { ok: true } - : await withErrorContext( - `put_page:${page.slug}`, - async () => gbrainPutPage(page), - "gstack-memory-ingest" - ); - if (!result.ok) { - failed++; - if (!args.quiet) { - console.error(`[put-error] ${page.slug}: ${result.error || "unknown"}`); + prepared.push({ + slug: page.slug, + source_path: path, + rendered_body: renderPageBody(page), + page_slug: page.slug, + partial: page.partial ?? false, + }); + } + + return { + prepared, + skippedSecret, + skippedDedup, + skippedUnattributed, + parseFailed, + partialPages, + }; +} + +/** + * Make a per-run staging directory at ~/.gstack/.staging-ingest--/ + * The pid+ts namespace avoids collisions when two ingest passes run + * concurrently (the orchestrator's lock should prevent this, but + * defense-in-depth). + */ +function makeStagingDir(): string { + const dir = join(GSTACK_HOME, `.staging-ingest-${process.pid}-${Date.now()}`); + mkdirSync(dir, { recursive: true }); + return dir; +} + +/** + * Best-effort recursive cleanup. Failures swallowed — at worst we leak a + * staging dir to disk; the next run uses a new one and they age out via + * normal disk hygiene. We deliberately do NOT crash the pipeline on + * cleanup failure. + */ +function cleanupStagingDir(dir: string): void { + try { + rmSync(dir, { recursive: true, force: true }); + } catch { + // best-effort + } +} + +/** + * Track the currently-running gbrain import child + active staging dir so + * SIGTERM/SIGINT on the parent process can: + * 1. forward the signal to the child (otherwise gbrain orphans, holds the + * PGLite write lock, and burns CPU — observed during 2026-05-10 cold-run + * testing) + * 2. synchronously clean up the staging dir BEFORE process.exit (otherwise + * finally blocks in async callers don't run after process.exit from + * inside a signal handler, leaking the staging dir on every interrupt) + */ +let _activeImportChild: ChildProcess | null = null; +let _activeStagingDir: string | null = null; +let _signalHandlersInstalled = false; +function installSignalForwarder(): void { + if (_signalHandlersInstalled) return; + _signalHandlersInstalled = true; + const forward = (signal: NodeJS.Signals) => () => { + if (_activeImportChild && _activeImportChild.pid && !_activeImportChild.killed) { + try { + process.kill(_activeImportChild.pid, signal); + } catch { + // child may have already exited between the alive-check and the kill + } + } + // Synchronously clean up the active staging dir before exiting. The async + // `finally` blocks in ingestPass never run after process.exit fires from + // inside this handler, so cleanup has to happen here. + if (_activeStagingDir) { + cleanupStagingDir(_activeStagingDir); + _activeStagingDir = null; + } + // Re-raise to default action so the parent actually exits. Without this, + // a SIGTERM handler that doesn't exit holds the process alive. + process.exit(signal === "SIGINT" ? 130 : 143); + }; + process.on("SIGTERM", forward("SIGTERM")); + process.on("SIGINT", forward("SIGINT")); +} + +/** + * Run gbrain import as an async child so we can install signal handlers + * that kill the child on parent SIGTERM/SIGINT. Returns the same shape as + * spawnSync's result so the caller doesn't care which mode was used. + */ +function runGbrainImport( + stagingDir: string, + timeoutMs: number, +): Promise<{ status: number | null; stdout: string; stderr: string }> { + installSignalForwarder(); + return new Promise((resolve) => { + const child = spawn( + "gbrain", + ["import", stagingDir, "--no-embed", "--json"], + { stdio: ["ignore", "pipe", "pipe"] }, + ); + _activeImportChild = child; + let stdout = ""; + let stderr = ""; + let timedOut = false; + const timer = setTimeout(() => { + timedOut = true; + try { + if (child.pid) process.kill(child.pid, "SIGTERM"); + } catch { + // already gone + } + }, timeoutMs); + child.stdout?.on("data", (chunk) => { + stdout += chunk.toString("utf-8"); + }); + child.stderr?.on("data", (chunk) => { + stderr += chunk.toString("utf-8"); + }); + child.on("close", (status) => { + clearTimeout(timer); + _activeImportChild = null; + resolve({ + status: timedOut ? null : status, + stdout, + stderr, + }); + }); + child.on("error", (err) => { + clearTimeout(timer); + _activeImportChild = null; + resolve({ + status: null, + stdout, + stderr: stderr + `\n[spawn-error] ${(err as Error).message}`, + }); + }); + }); +} + +async function ingestPass(args: CliArgs): Promise { + const t0 = Date.now(); + const state = loadState(); + const ctx = makeWalkContext(args, state); + + // Phase 1: prepare (parse + secret-scan + filter + render frontmatter). + const prep = preparePages(args, ctx, state); + + let written = 0; + let failed = 0; + + if (args.noWrite) { + // --no-write: skip the gbrain import call but still record state for + // prepared pages (treat them as ingested for dedup purposes). Matches + // the prior contract from --help: "Skip gbrain put_page calls (still + // updates state file)". + const nowIso = new Date().toISOString(); + for (const p of prep.prepared) { + try { + state.sessions[p.source_path] = { + mtime_ns: Math.floor(statSync(p.source_path).mtimeMs * 1e6), + sha256: fileSha256(p.source_path), + ingested_at: nowIso, + page_slug: p.page_slug, + partial: p.partial, + }; + written++; + } catch { + // best-effort state record + } + } + state.last_full_walk = new Date().toISOString(); + state.last_writer = "gstack-memory-ingest"; + saveState(state); + return { + written, + skipped_secret: prep.skippedSecret, + skipped_dedup: prep.skippedDedup, + skipped_unattributed: prep.skippedUnattributed, + failed: prep.parseFailed, + duration_ms: Date.now() - t0, + partial_pages: prep.partialPages, + }; + } + + if (prep.prepared.length === 0) { + // Nothing to import — still touch state.last_full_walk and exit. + state.last_full_walk = new Date().toISOString(); + state.last_writer = "gstack-memory-ingest"; + saveState(state); + return { + written: 0, + skipped_secret: prep.skippedSecret, + skipped_dedup: prep.skippedDedup, + skipped_unattributed: prep.skippedUnattributed, + failed: prep.parseFailed, + duration_ms: Date.now() - t0, + partial_pages: prep.partialPages, + }; + } + + if (!gbrainAvailable()) { + const msg = + "gbrain CLI not in PATH or missing `import` subcommand. Run /setup-gbrain."; + console.error(`[memory-ingest] ERR: ${msg}`); + return { + written: 0, + skipped_secret: prep.skippedSecret, + skipped_dedup: prep.skippedDedup, + skipped_unattributed: prep.skippedUnattributed, + failed: prep.parseFailed + prep.prepared.length, + duration_ms: Date.now() - t0, + partial_pages: prep.partialPages, + system_error: msg, + }; + } + + // Phase 2: stage to a per-run dir + invoke gbrain import. + const stagingDir = makeStagingDir(); + // Register staging dir with the signal forwarder so SIGTERM/SIGINT can + // synchronously clean it up before process.exit (the async finally block + // below does NOT run after a signal-handler exit). + _activeStagingDir = stagingDir; + try { + const staging = writeStaged(prep.prepared, stagingDir); + failed += staging.errors.length; + if (!args.quiet && staging.errors.length > 0) { + for (const e of staging.errors.slice(0, 5)) { + console.error(`[stage-error] ${e.slug}: ${e.error}`); } - continue; } - state.sessions[path] = { - mtime_ns: Math.floor(statSync(path).mtimeMs * 1e6), - sha256: page.content_sha256, - ingested_at: new Date().toISOString(), - page_slug: page.slug, - partial: page.partial, - }; - written++; - if (!args.quiet) { - const tag = page.partial ? " [partial]" : ""; - console.log(`[${written}] ${page.slug}${tag}`); + // D7: snapshot sync-failures.jsonl byte-offset before import so we + // can read only newly-appended failure entries afterwards. + const syncFailuresPath = join(homedir(), ".gbrain", "sync-failures.jsonl"); + let preImportOffset = 0; + try { + if (existsSync(syncFailuresPath)) { + preImportOffset = statSync(syncFailuresPath).size; + } + } catch { + // best-effort; absent file → 0 offset, all future entries are "new" } + + if (!args.quiet) { + console.error( + `[memory-ingest] staged ${staging.written} pages → ${stagingDir}; running gbrain import...`, + ); + } + + // D6: single batch import. `--no-embed` matches the prior per-file + // behavior (we never enabled embedding); embeddings happen on-demand + // via gbrain's own pipelines. `--json` gives us structured counts. + // + // Async spawn (not spawnSync) so the signal forwarder installed in + // runGbrainImport propagates SIGTERM/SIGINT to the child. With sync + // spawn, parent termination orphans the gbrain process (observed + // during 2026-05-10 cold-run testing — gbrain kept running 15 min + // after the orchestrator timed out). + const importResult = await runGbrainImport(stagingDir, 30 * 60 * 1000); + + const stdout = importResult.stdout || ""; + const stderr = importResult.stderr || ""; + const importJson = parseImportJson(stdout); + + if (importResult.status !== 0) { + const tail = (stderr.trim().split("\n").pop() || "").slice(0, 300); + const msg = `gbrain import exited ${importResult.status}: ${tail}`; + console.error(`[memory-ingest] ERR: ${msg}`); + // We conservatively state-record nothing on a non-zero exit — per-run + // partial progress is invisible to us when the importer crashed. + // sync-failures.jsonl entries may still hold per-file detail. + failed += prep.prepared.length; + return { + written: 0, + skipped_secret: prep.skippedSecret, + skipped_dedup: prep.skippedDedup, + skipped_unattributed: prep.skippedUnattributed, + failed, + duration_ms: Date.now() - t0, + partial_pages: prep.partialPages, + system_error: msg, + }; + } + + if (!args.quiet) { + // Echo gbrain's own progress lines on stderr through so the user sees + // them when running interactively. Already on our stderr from the + // child via `stdio: pipe`, but we explicitly forward for clarity. + process.stderr.write(stderr); + } + + if (importJson === null) { + // gbrain exited 0 but didn't emit a parseable --json line. Treat as + // ERR rather than silently passing zeros through — silent zeros let + // a future gbrain-output regression mask data loss. + const msg = + "gbrain import exited 0 but emitted no parseable --json payload. " + + "Refusing to advance state."; + console.error(`[memory-ingest] ERR: ${msg}`); + failed += prep.prepared.length; + return { + written: 0, + skipped_secret: prep.skippedSecret, + skipped_dedup: prep.skippedDedup, + skipped_unattributed: prep.skippedUnattributed, + failed, + duration_ms: Date.now() - t0, + partial_pages: prep.partialPages, + system_error: msg, + }; + } + + // D7: identify which staged files failed to import and exclude them + // from state recording. Source paths get a retry on the next run. + const failedSources = readNewFailures( + syncFailuresPath, + preImportOffset, + staging.stagedPathToSource, + ); + failed += failedSources.size; + + // Phase 3: state recording. Only files that landed in gbrain get + // their mtime+sha256 stamped. Failed source paths are deliberately + // left un-state'd so the next run re-prepares them and gbrain's + // content_hash dedup short-circuits the import. + const nowIso = new Date().toISOString(); + for (const p of prep.prepared) { + if (failedSources.has(p.source_path)) continue; + try { + state.sessions[p.source_path] = { + mtime_ns: Math.floor(statSync(p.source_path).mtimeMs * 1e6), + sha256: fileSha256(p.source_path), + ingested_at: nowIso, + page_slug: p.page_slug, + partial: p.partial, + }; + written++; + if (!args.quiet) { + const tag = p.partial ? " [partial]" : ""; + console.log(`[${written}] ${p.page_slug}${tag}`); + } + } catch (err) { + // statSync can fail if the source file was removed mid-run; skip + // recording but don't fail the whole pass. + console.error( + `[state-record] ${p.source_path}: ${(err as Error).message}`, + ); + } + } + + if (!args.quiet) { + console.error( + `[memory-ingest] gbrain import: ${importJson.imported ?? 0} imported, ` + + `${importJson.skipped ?? 0} unchanged, ${importJson.errors ?? 0} failed` + + (failedSources.size > 0 + ? ` (see ~/.gbrain/sync-failures.jsonl for details)` + : ""), + ); + } + } finally { + cleanupStagingDir(stagingDir); + _activeStagingDir = null; } state.last_full_walk = new Date().toISOString(); @@ -988,12 +1534,12 @@ async function ingestPass(args: CliArgs): Promise { return { written, - skipped_secret: skippedSecret, - skipped_dedup: skippedDedup, - skipped_unattributed: skippedUnattributed, - failed, + skipped_secret: prep.skippedSecret, + skipped_dedup: prep.skippedDedup, + skipped_unattributed: prep.skippedUnattributed, + failed: failed + prep.parseFailed, duration_ms: Date.now() - t0, - partial_pages: partialPages, + partial_pages: prep.partialPages, }; } @@ -1067,11 +1613,15 @@ async function main(): Promise { if (result.written > 0 || result.failed > 0) { console.error(`[memory-ingest] ${result.written} written, ${result.failed} failed in ${dt}ms`); } + // D6: system_error → process-level failure; orchestrator sees ERR. + // Per-file errors do NOT exit non-zero. + if (result.system_error) process.exit(1); return; } const result = await ingestPass(args); printBulkResult(result, args); + if (result.system_error) process.exit(1); } main().catch((err) => {