diff --git a/bin/gstack-gbrain-sync.ts b/bin/gstack-gbrain-sync.ts
index 2b7979f12..bc54e9760 100644
--- a/bin/gstack-gbrain-sync.ts
+++ b/bin/gstack-gbrain-sync.ts
@@ -80,6 +80,111 @@ const STATE_PATH = join(GSTACK_HOME, ".gbrain-sync-state.json");
 const LOCK_PATH = join(GSTACK_HOME, ".sync-gbrain.lock");
 const STALE_LOCK_MS = 5 * 60 * 1000;
 
+// Default 35-minute timeout for code-walk + memory-ingest stages. Override via
+// GSTACK_SYNC_CODE_TIMEOUT_MS / GSTACK_SYNC_MEMORY_TIMEOUT_MS. Bounds-checked
+// in resolveStageTimeoutMs below so wildly-low values don't make resume
+// useless and wildly-high values don't mask config typos. See #1611.
+const DEFAULT_STAGE_TIMEOUT_MS = 35 * 60 * 1000; // 2_100_000ms = 35min
+const MIN_STAGE_TIMEOUT_MS = 60_000; // 1 minute floor
+const MAX_STAGE_TIMEOUT_MS = 86_400_000; // 24 hour ceiling
+
+/**
+ * Parse a stage-timeout env value in ms. Only plain decimal digits are accepted
+ * (parseInt would read "60000abc" as 60000 and "1e6" as 1). Returns the value when
+ * it is within the floor/ceiling; otherwise warns and returns the default (no clamping).
+ */
+export function resolveStageTimeoutMs(
+  envValue: string | undefined,
+  envName: string,
+): number {
+  if (envValue === undefined || envValue === "") return DEFAULT_STAGE_TIMEOUT_MS;
+  const n = /^\s*\d+\s*$/.test(envValue) ? Number(envValue) : Number.NaN;
+  if (!Number.isFinite(n) || n <= 0) {
+    console.warn(
+      `[sync] ${envName}="${envValue}" is not a positive integer; falling back to ${DEFAULT_STAGE_TIMEOUT_MS}ms`,
+    );
+    return DEFAULT_STAGE_TIMEOUT_MS;
+  }
+  if (n < MIN_STAGE_TIMEOUT_MS) {
+    console.warn(
+      `[sync] ${envName}=${n} is below the ${MIN_STAGE_TIMEOUT_MS}ms (1min) floor; falling back to ${DEFAULT_STAGE_TIMEOUT_MS}ms`,
+    );
+    return DEFAULT_STAGE_TIMEOUT_MS;
+  }
+  if (n > MAX_STAGE_TIMEOUT_MS) {
+    console.warn(
+      `[sync] ${envName}=${n} is above the ${MAX_STAGE_TIMEOUT_MS}ms (24h) ceiling; falling back to ${DEFAULT_STAGE_TIMEOUT_MS}ms`,
+    );
+    return DEFAULT_STAGE_TIMEOUT_MS;
+  }
+  return n;
+}
+
+/**
+ * gbrain writes ~/.gbrain/import-checkpoint.json on every import run. If a
+ * previous /sync-gbrain hit the timeout (SIGTERM = exit 143), the checkpoint
+ * + its staging dir survive on disk. Detect both and let gbrain resume from
+ * processedIndex+1 on the next run. If the staging dir is missing or is not a
+ * directory, fall through to a fresh restage with a one-line warning so the
+ * user sees we noticed (emptiness is not checked). See #1611 + plan D1/C1.
+ */
+interface GbrainCheckpoint {
+  dir?: string;
+  totalFiles?: number;
+  processedIndex?: number;
+  completedFiles?: number;
+  timestamp?: string;
+}
+
+export function readGbrainCheckpoint(): GbrainCheckpoint | null {
+  const cpPath = join(homedir(), ".gbrain", "import-checkpoint.json");
+  if (!existsSync(cpPath)) return null;
+  try {
+    const raw = readFileSync(cpPath, "utf-8");
+    const parsed: unknown = JSON.parse(raw); // field types are NOT validated here; callers must check
+    if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) return null;
+    return parsed as GbrainCheckpoint;
+  } catch {
+    // Unreadable file or corrupt JSON — treat as no checkpoint (fresh restage).
+    return null;
+  }
+}
+
+export type ResumeVerdict =
+  | { kind: "no-checkpoint" }
+  | { kind: "resume"; stagingDir: string; processedIndex: number; totalFiles: number }
+  | { kind: "stale-staging-missing"; stagingDir: string };
+
+/**
+ * Decide whether the next memory-ingest run should resume from gbrain's
+ * checkpoint or restage from scratch.
+ * - no checkpoint → run a fresh ingest pass
+ * - checkpoint + staging ok → resume (gbrain picks up at processedIndex+1)
+ * - checkpoint + staging gone → warn, fall through to fresh restage
+ */
+export function decideResume(): ResumeVerdict {
+  const cp = readGbrainCheckpoint();
+  if (!cp || typeof cp.dir !== "string" || cp.dir === "") return { kind: "no-checkpoint" };
+  const stagingDir = cp.dir;
+  if (!existsSync(stagingDir)) {
+    return { kind: "stale-staging-missing", stagingDir };
+  }
+  // The path exists; make sure it is actually a directory. statSync can still
+  // throw if the dir vanished after the existsSync check, so treat that as stale.
+  try {
+    const st = statSync(stagingDir);
+    if (!st.isDirectory()) return { kind: "stale-staging-missing", stagingDir };
+  } catch {
+    return { kind: "stale-staging-missing", stagingDir };
+  }
+  return {
+    kind: "resume",
+    stagingDir,
+    processedIndex: typeof cp.processedIndex === "number" ? cp.processedIndex : 0,
+    totalFiles: typeof cp.totalFiles === "number" ? cp.totalFiles : 0,
+  };
+}
+
 // ── CLI ────────────────────────────────────────────────────────────────────
 
 function printUsage(): void {
@@ -607,9 +712,13 @@ async function runCodeImport(args: CliArgs): Promise {
   // code`, so --full must run it FIRST, then reindex-code, to honor the
   // documented "full walk + reindex" contract for both fresh and populated
   // sources.
+  const codeTimeoutMs = resolveStageTimeoutMs(
+    process.env.GSTACK_SYNC_CODE_TIMEOUT_MS,
+    "GSTACK_SYNC_CODE_TIMEOUT_MS",
+  );
   const walkResult = spawnGbrain(["sync", "--strategy", "code", "--source", sourceId], {
     stdio: args.quiet ?
["ignore", "ignore", "ignore"] : ["ignore", "inherit", "inherit"],
-    timeout: 35 * 60 * 1000,
+    timeout: codeTimeoutMs,
     baseEnv: gbrainEnv,
   });
 
@@ -627,7 +736,7 @@ async function runCodeImport(args: CliArgs): Promise {
   if (args.mode === "full") {
     const reindexResult = spawnGbrain(["reindex-code", "--source", sourceId, "--yes"], {
       stdio: args.quiet ? ["ignore", "ignore", "ignore"] : ["ignore", "inherit", "inherit"],
-      timeout: 35 * 60 * 1000,
+      timeout: codeTimeoutMs, // same env-resolved budget as the code walk above
       baseEnv: gbrainEnv,
     });
 
@@ -770,6 +879,25 @@ function runMemoryIngest(args: CliArgs): StageResult {
     return skipStageForLocalStatus("memory", localStatus, t0);
   }
 
+  // Resume detection (#1611 / plan D1 + C1). If a previous run hit the
+  // timeout and gbrain left ~/.gbrain/import-checkpoint.json plus its staging
+  // dir on disk, pass that dir to the grandchild via GSTACK_INGEST_RESUME_DIR
+  // so it reuses it (skipping writeStaged) and `gbrain import` can resume from
+  // processedIndex+1. If the staging dir is gone (disk pressure cleanup, OS
+  // reboot, user manual cleanup), warn and fall through to a fresh restage.
+  const resume = decideResume();
+  const childEnv = buildGbrainEnv({ announce: false }); // built once; only mutated below on resume
+  if (resume.kind === "resume") {
+    console.error(
+      `[sync:memory] resuming from gbrain checkpoint (${resume.processedIndex}/${resume.totalFiles} files staged at ${resume.stagingDir})`,
+    );
+    childEnv.GSTACK_INGEST_RESUME_DIR = resume.stagingDir;
+  } else if (resume.kind === "stale-staging-missing") {
+    console.error(
+      `[sync:memory] previous checkpoint stale (staging dir ${resume.stagingDir} gone), restaging from scratch`,
+    );
+  }
+
   const ingestPath = join(import.meta.dir, "gstack-memory-ingest.ts");
   const ingestArgs = ["run", ingestPath];
   if (args.mode === "full") ingestArgs.push("--bulk");
@@ -780,10 +908,14 @@ function runMemoryIngest(args: CliArgs): StageResult {
   // .env.local footgun affects gstack-memory-ingest.ts too, not just the
   // direct gbrain spawns in this file). The grandchild calls gbrain import
   // internally and must see the DATABASE_URL from gbrain's own config.
+  const memoryTimeoutMs = resolveStageTimeoutMs(
+    process.env.GSTACK_SYNC_MEMORY_TIMEOUT_MS,
+    "GSTACK_SYNC_MEMORY_TIMEOUT_MS",
+  );
   const result = spawnSync("bun", ingestArgs, {
     encoding: "utf-8",
-    timeout: 35 * 60 * 1000,
-    env: buildGbrainEnv({ announce: false }),
+    timeout: memoryTimeoutMs,
+    env: childEnv, // carries GSTACK_INGEST_RESUME_DIR when resuming
   });
 
   // D6: parse [memory-ingest] lines from the child's stderr. ERR-prefixed
diff --git a/bin/gstack-memory-ingest.ts b/bin/gstack-memory-ingest.ts
index 967101050..6df459f88 100644
--- a/bin/gstack-memory-ingest.ts
+++ b/bin/gstack-memory-ingest.ts
@@ -1272,13 +1272,37 @@ function cleanupStagingDir(dir: string): void {
  * 1. forward the signal to the child (otherwise gbrain orphans, holds the
  *    PGLite write lock, and burns CPU — observed during 2026-05-10 cold-run
  *    testing)
- * 2. synchronously clean up the staging dir BEFORE process.exit (otherwise
- *    finally blocks in async callers don't run after process.exit from
- *    inside a signal handler, leaking the staging dir on every interrupt)
+ * 2. PRESERVE the staging dir when gbrain has written an import-checkpoint
+ *    pointing at it (the next /sync-gbrain run can resume from
+ *    processedIndex+1). Otherwise synchronously clean up before
+ *    process.exit, since `finally` blocks in ingestPass never run after
+ *    process.exit fires from inside a signal handler.
+ *
+ * Resume semantics added for #1611: prior behavior unconditionally cleaned
+ * up the staging dir on SIGTERM, so the gbrain checkpoint always pointed at
+ * a missing dir and the next run had to restage from scratch.
  */
 let _activeImportChild: ChildProcess | null = null;
 let _activeStagingDir: string | null = null;
 let _signalHandlersInstalled = false;
+
+/**
+ * Returns true if gbrain has written ~/.gbrain/import-checkpoint.json with
+ * `dir` matching the current active staging dir.
Indicates the next run
+ * can resume against this staging dir. Missing/unreadable/corrupt checkpoint → false.
+ */
+function stagingDirIsCheckpointed(stagingDir: string): boolean {
+  try {
+    const cpPath = join(homedir(), ".gbrain", "import-checkpoint.json");
+    if (!existsSync(cpPath)) return false;
+    const raw = readFileSync(cpPath, "utf-8");
+    const cp = JSON.parse(raw) as { dir?: string };
+    return cp.dir === stagingDir; // exact string match — NOTE(review): assumes gbrain stores the path un-normalized; confirm
+  } catch { // read error, bad JSON, or a non-object payload all land here
+    return false;
+  }
+}
+
 function installSignalForwarder(): void {
   if (_signalHandlersInstalled) return;
   _signalHandlersInstalled = true;
@@ -1290,11 +1314,24 @@ function installSignalForwarder(): void {
         // child may have already exited between the alive-check and the kill
       }
     }
-    // Synchronously clean up the active staging dir before exiting. The async
-    // `finally` blocks in ingestPass never run after process.exit fires from
-    // inside this handler, so cleanup has to happen here.
     if (_activeStagingDir) {
-      cleanupStagingDir(_activeStagingDir);
+      if (stagingDirIsCheckpointed(_activeStagingDir)) {
+        // Preserve for next-run resume. The orchestrator's decideResume()
+        // (in gstack-gbrain-sync.ts) will see the checkpoint + dir and hand
+        // this same staging dir back via GSTACK_INGEST_RESUME_DIR, so gbrain
+        // import can pick up from processedIndex+1. See #1611.
+        try {
+          process.stderr.write(
+            `[memory-ingest] ${signal} received — preserving staging dir for resume: ${_activeStagingDir}\n`,
+          );
+        } catch {
+          // best-effort: stderr may be closed already
+        }
+      } else {
+        // No checkpoint pointing here — the import never reached gbrain or
+        // crashed before writing one. Clean up so we don't leak the dir.
+        cleanupStagingDir(_activeStagingDir);
+      }
       _activeStagingDir = null;
     }
     // Re-raise to default action so the parent actually exits. Without this,
@@ -1444,19 +1481,46 @@ async function ingestPass(args: CliArgs): Promise {
   // entirely. gstack-brain-sync push will pick the dir up via its allowlist
   // and the brain admin's pull job will index transcripts into the remote
   // brain.
Local PGLite (if any) stays code-only.
+  //
+  // Resume branch for #1611: when the orchestrator sets
+  // GSTACK_INGEST_RESUME_DIR (because gbrain's import-checkpoint.json points
+  // at an existing dir from a prior SIGTERM'd run), reuse that staging dir
+  // and skip only the writeStaged step (prep.prepared is still read below).
+  // gbrain's checkpoint tells it where to resume.
   const remoteHttpMode = isRemoteHttpMcpMode();
-  const stagingDir = remoteHttpMode
-    ? makePersistentTranscriptDir()
-    : makeStagingDir();
+  const resumeDir = process.env.GSTACK_INGEST_RESUME_DIR;
+  const resuming = !remoteHttpMode // the env var is ignored in remote-http mode
+    && typeof resumeDir === "string"
+    && resumeDir.length > 0
+    && existsSync(resumeDir);
+  const stagingDir = resuming
+    ? resumeDir! // non-null: `resuming` implies resumeDir is a non-empty string
+    : remoteHttpMode
+      ? makePersistentTranscriptDir()
+      : makeStagingDir();
   // Register staging dir with the signal forwarder so SIGTERM/SIGINT can
-  // synchronously clean it up before process.exit (the async finally block
-  // below does NOT run after a signal-handler exit). In remote-http mode we
-  // skip registration — the dir is meant to persist.
+  // either preserve (when gbrain checkpointed it) or synchronously clean up.
+  // The async finally block below does NOT run after a signal-handler exit.
+  // In remote-http mode we skip registration — the dir is meant to persist.
   if (!remoteHttpMode) {
     _activeStagingDir = stagingDir;
   }
   try {
-    const staging = writeStaged(prep.prepared, stagingDir);
+    let staging: StagingResult;
+    if (resuming) {
+      // Pages are already on disk from the previous run. Skip writeStaged.
+      // NOTE(review): `written` is this run's prep.prepared.length, not a count
+      // of files in the resumed dir, and stagedPathToSource is left empty —
+      // confirm downstream consumers tolerate both on a resumed pass.
+      if (!args.quiet) {
+        console.error(
+          `[memory-ingest] resuming previous staging dir ${stagingDir} (skipping prepare phase)`,
+        );
+      }
+      staging = { staging_dir: stagingDir, written: prep.prepared.length, errors: [], stagedPathToSource: new Map() };
+    } else {
+      staging = writeStaged(prep.prepared, stagingDir);
+    }
     failed += staging.errors.length;
     if (!args.quiet && staging.errors.length > 0) {
       for (const e of staging.errors.slice(0, 5)) {