fix(gbrain-sync): configurable timeouts + resume from gbrain checkpoint (#1611)

The memory and code stages hardcoded a 35-min spawn timeout. On brains with
~2000+ staged files, /sync-gbrain --full reliably SIGTERM'd the child at
exactly 35 minutes with exit 143. gbrain left ~/.gbrain/import-checkpoint.json
pointing at the staging dir, but gstack-memory-ingest's SIGTERM handler
unconditionally cleaned the dir up — so the next run found a checkpoint
pointing at nothing and restaged from scratch, repeating the SIGTERM forever.

Three changes:

1. Configurable timeouts via env (bounds 60_000ms - 86_400_000ms, default
   2_100_000ms = 35min unchanged):
     GSTACK_SYNC_MEMORY_TIMEOUT_MS
     GSTACK_SYNC_CODE_TIMEOUT_MS
   Out-of-range or non-numeric values warn and fall back to the default.

2. SIGTERM in gstack-memory-ingest no longer always cleans up the staging
   dir. If gbrain has written ~/.gbrain/import-checkpoint.json pointing at
   the active staging dir, the dir is PRESERVED for next-run resume.
   Otherwise (no checkpoint pointing here, crash before gbrain ever
   touched it) it's cleaned up as before.

3. Next /sync-gbrain run detects gbrain's checkpoint via decideResume() in
   gstack-gbrain-sync.ts:
     - no checkpoint               → fresh ingest pass
     - checkpoint + staging ok     → set GSTACK_INGEST_RESUME_DIR; child
                                      reuses staging dir and skips
                                      writeStaged; gbrain import resumes
                                      from processedIndex+1
     - checkpoint + staging gone   → warn "previous checkpoint stale
                                      (staging dir gone), restaging from
                                      scratch" and proceed

Reuses gbrain's own checkpoint as the source of truth (D1 — no double-store
state). Detect-then-fallback semantics per C1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Garry Tan
2026-05-21 09:44:10 -07:00
parent a05546cddc
commit 700c9a4ff8
2 changed files with 214 additions and 18 deletions
+136 -4
View File
@@ -80,6 +80,111 @@ const STATE_PATH = join(GSTACK_HOME, ".gbrain-sync-state.json");
const LOCK_PATH = join(GSTACK_HOME, ".sync-gbrain.lock");
const STALE_LOCK_MS = 5 * 60 * 1000;
// Default 35-minute timeout for code-walk + memory-ingest stages. Override via
// GSTACK_SYNC_CODE_TIMEOUT_MS / GSTACK_SYNC_MEMORY_TIMEOUT_MS. Bounds-checked
// in resolveStageTimeoutMs below so wildly-low values don't make resume
// useless and wildly-high values don't mask config typos. See #1611.
const DEFAULT_STAGE_TIMEOUT_MS = 35 * 60 * 1000; // 2_100_000ms = 35min
const MIN_STAGE_TIMEOUT_MS = 60_000; // 1 minute floor
const MAX_STAGE_TIMEOUT_MS = 86_400_000; // 24 hour ceiling
/**
* Parse a stage-timeout env value with bounds validation. Returns the bounded
* value or the default with a stderr warning if the env was malformed or
* out-of-range. Exported for the regression test.
*/
export function resolveStageTimeoutMs(
envValue: string | undefined,
envName: string,
): number {
if (envValue === undefined || envValue === "") return DEFAULT_STAGE_TIMEOUT_MS;
const n = Number.parseInt(envValue, 10);
if (!Number.isFinite(n) || Number.isNaN(n) || n <= 0) {
console.warn(
`[sync] ${envName}="${envValue}" is not a positive integer; falling back to ${DEFAULT_STAGE_TIMEOUT_MS}ms`,
);
return DEFAULT_STAGE_TIMEOUT_MS;
}
if (n < MIN_STAGE_TIMEOUT_MS) {
console.warn(
`[sync] ${envName}=${n} is below the ${MIN_STAGE_TIMEOUT_MS}ms (1min) floor; falling back to ${DEFAULT_STAGE_TIMEOUT_MS}ms`,
);
return DEFAULT_STAGE_TIMEOUT_MS;
}
if (n > MAX_STAGE_TIMEOUT_MS) {
console.warn(
`[sync] ${envName}=${n} is above the ${MAX_STAGE_TIMEOUT_MS}ms (24h) ceiling; falling back to ${DEFAULT_STAGE_TIMEOUT_MS}ms`,
);
return DEFAULT_STAGE_TIMEOUT_MS;
}
return n;
}
/**
* gbrain writes ~/.gbrain/import-checkpoint.json on every import run. If a
* previous /sync-gbrain hit the timeout (SIGTERM = exit 143), the checkpoint
* + its staging dir survive on disk. Detect both and let gbrain resume from
* processedIndex+1 on the next run. If the staging dir is missing/empty/
* unreadable, fall through to a fresh restage with a one-line warning so the
* user sees we noticed. See #1611 + plan D1/C1.
*/
interface GbrainCheckpoint {
dir?: string;
totalFiles?: number;
processedIndex?: number;
completedFiles?: number;
timestamp?: string;
}
export function readGbrainCheckpoint(): GbrainCheckpoint | null {
const cpPath = join(homedir(), ".gbrain", "import-checkpoint.json");
if (!existsSync(cpPath)) return null;
try {
const raw = readFileSync(cpPath, "utf-8");
const parsed = JSON.parse(raw);
if (!parsed || typeof parsed !== "object") return null;
return parsed as GbrainCheckpoint;
} catch {
// Corrupt JSON — treat as no checkpoint and fall through to fresh restage.
return null;
}
}
export type ResumeVerdict =
| { kind: "no-checkpoint" }
| { kind: "resume"; stagingDir: string; processedIndex: number; totalFiles: number }
| { kind: "stale-staging-missing"; stagingDir: string };
/**
* Decide whether the next memory-ingest run should resume from gbrain's
* checkpoint or restage from scratch.
* - no checkpoint → run a fresh ingest pass
* - checkpoint + staging ok → resume (gbrain picks up at processedIndex+1)
* - checkpoint + staging gone → warn, fall through to fresh restage
*/
export function decideResume(): ResumeVerdict {
const cp = readGbrainCheckpoint();
if (!cp || !cp.dir) return { kind: "no-checkpoint" };
const stagingDir = cp.dir;
if (!existsSync(stagingDir)) {
return { kind: "stale-staging-missing", stagingDir };
}
// Treat "non-empty" as the safe-to-resume signal. statSync on a missing
// file throws; we already handled missing above so this is dir-level shape.
try {
const st = statSync(stagingDir);
if (!st.isDirectory()) return { kind: "stale-staging-missing", stagingDir };
} catch {
return { kind: "stale-staging-missing", stagingDir };
}
return {
kind: "resume",
stagingDir,
processedIndex: cp.processedIndex ?? 0,
totalFiles: cp.totalFiles ?? 0,
};
}
// ── CLI ────────────────────────────────────────────────────────────────────
function printUsage(): void {
@@ -607,9 +712,13 @@ async function runCodeImport(args: CliArgs): Promise<StageResult> {
// code`, so --full must run it FIRST, then reindex-code, to honor the
// documented "full walk + reindex" contract for both fresh and populated
// sources.
const codeTimeoutMs = resolveStageTimeoutMs(
process.env.GSTACK_SYNC_CODE_TIMEOUT_MS,
"GSTACK_SYNC_CODE_TIMEOUT_MS",
);
const walkResult = spawnGbrain(["sync", "--strategy", "code", "--source", sourceId], {
stdio: args.quiet ? ["ignore", "ignore", "ignore"] : ["ignore", "inherit", "inherit"],
timeout: 35 * 60 * 1000,
timeout: codeTimeoutMs,
baseEnv: gbrainEnv,
});
@@ -627,7 +736,7 @@ async function runCodeImport(args: CliArgs): Promise<StageResult> {
if (args.mode === "full") {
const reindexResult = spawnGbrain(["reindex-code", "--source", sourceId, "--yes"], {
stdio: args.quiet ? ["ignore", "ignore", "ignore"] : ["ignore", "inherit", "inherit"],
timeout: 35 * 60 * 1000,
timeout: codeTimeoutMs,
baseEnv: gbrainEnv,
});
@@ -770,6 +879,25 @@ function runMemoryIngest(args: CliArgs): StageResult {
return skipStageForLocalStatus("memory", localStatus, t0);
}
// Resume detection (#1611 / plan D1 + C1). If a previous run hit the
// timeout and gbrain left ~/.gbrain/import-checkpoint.json plus its staging
// dir on disk, signal the grandchild via env so it skips the prepare phase
// and lets `gbrain import` resume from processedIndex+1 against the same
// staging dir. If the staging dir is gone (disk pressure cleanup, OS
// reboot, user manual cleanup), warn and fall through to a fresh restage.
const resume = decideResume();
const childEnv = buildGbrainEnv({ announce: false });
if (resume.kind === "resume") {
console.error(
`[sync:memory] resuming from gbrain checkpoint (${resume.processedIndex}/${resume.totalFiles} files staged at ${resume.stagingDir})`,
);
childEnv.GSTACK_INGEST_RESUME_DIR = resume.stagingDir;
} else if (resume.kind === "stale-staging-missing") {
console.error(
`[sync:memory] previous checkpoint stale (staging dir ${resume.stagingDir} gone), restaging from scratch`,
);
}
const ingestPath = join(import.meta.dir, "gstack-memory-ingest.ts");
const ingestArgs = ["run", ingestPath];
if (args.mode === "full") ingestArgs.push("--bulk");
@@ -780,10 +908,14 @@ function runMemoryIngest(args: CliArgs): StageResult {
// .env.local footgun affects gstack-memory-ingest.ts too, not just the
// direct gbrain spawns in this file). The grandchild calls gbrain import
// internally and must see the DATABASE_URL from gbrain's own config.
const memoryTimeoutMs = resolveStageTimeoutMs(
process.env.GSTACK_SYNC_MEMORY_TIMEOUT_MS,
"GSTACK_SYNC_MEMORY_TIMEOUT_MS",
);
const result = spawnSync("bun", ingestArgs, {
encoding: "utf-8",
timeout: 35 * 60 * 1000,
env: buildGbrainEnv({ announce: false }),
timeout: memoryTimeoutMs,
env: childEnv,
});
// D6: parse [memory-ingest] lines from the child's stderr. ERR-prefixed
+78 -14
View File
@@ -1272,13 +1272,37 @@ function cleanupStagingDir(dir: string): void {
* 1. forward the signal to the child (otherwise gbrain orphans, holds the
* PGLite write lock, and burns CPU — observed during 2026-05-10 cold-run
* testing)
* 2. synchronously clean up the staging dir BEFORE process.exit (otherwise
* finally blocks in async callers don't run after process.exit from
* inside a signal handler, leaking the staging dir on every interrupt)
* 2. PRESERVE the staging dir when gbrain has written an import-checkpoint
* pointing at it (the next /sync-gbrain run can resume from
* processedIndex+1). Otherwise synchronously clean up before
* process.exit, since `finally` blocks in ingestPass never run after
* process.exit fires from inside a signal handler.
*
* Resume semantics added for #1611: prior behavior unconditionally cleaned
* up the staging dir on SIGTERM, so the gbrain checkpoint always pointed at
* a missing dir and the next run had to restage from scratch.
*/
let _activeImportChild: ChildProcess | null = null;
let _activeStagingDir: string | null = null;
let _signalHandlersInstalled = false;
/**
* Returns true if gbrain has written ~/.gbrain/import-checkpoint.json with
* `dir` matching the current active staging dir. Indicates the next run
* can resume against this staging dir.
*/
function stagingDirIsCheckpointed(stagingDir: string): boolean {
try {
const cpPath = join(homedir(), ".gbrain", "import-checkpoint.json");
if (!existsSync(cpPath)) return false;
const raw = readFileSync(cpPath, "utf-8");
const cp = JSON.parse(raw) as { dir?: string };
return cp.dir === stagingDir;
} catch {
return false;
}
}
function installSignalForwarder(): void {
if (_signalHandlersInstalled) return;
_signalHandlersInstalled = true;
@@ -1290,11 +1314,24 @@ function installSignalForwarder(): void {
// child may have already exited between the alive-check and the kill
}
}
// Synchronously clean up the active staging dir before exiting. The async
// `finally` blocks in ingestPass never run after process.exit fires from
// inside this handler, so cleanup has to happen here.
if (_activeStagingDir) {
cleanupStagingDir(_activeStagingDir);
if (stagingDirIsCheckpointed(_activeStagingDir)) {
// Preserve for next-run resume. The orchestrator's decideResume()
// (in gstack-gbrain-sync.ts) will see the checkpoint + dir and
// re-invoke gbrain import against this same staging dir, picking
// up from processedIndex+1. See #1611.
try {
process.stderr.write(
`[memory-ingest] ${signal} received — preserving staging dir for resume: ${_activeStagingDir}\n`,
);
} catch {
// best-effort: stderr may be closed already
}
} else {
// No checkpoint pointing here — the import never reached gbrain or
// crashed before writing one. Clean up so we don't leak the dir.
cleanupStagingDir(_activeStagingDir);
}
_activeStagingDir = null;
}
// Re-raise to default action so the parent actually exits. Without this,
@@ -1444,19 +1481,46 @@ async function ingestPass(args: CliArgs): Promise<BulkResult> {
// entirely. gstack-brain-sync push will pick the dir up via its allowlist
// and the brain admin's pull job will index transcripts into the remote
// brain. Local PGLite (if any) stays code-only.
//
// Resume branch for #1611: when the orchestrator sets
// GSTACK_INGEST_RESUME_DIR (because gbrain's import-checkpoint.json points
// at an existing dir from a prior SIGTERM'd run), reuse that staging dir
// and skip the prepare/writeStaged phase entirely. gbrain's checkpoint
// tells it where to resume.
const remoteHttpMode = isRemoteHttpMcpMode();
const stagingDir = remoteHttpMode
? makePersistentTranscriptDir()
: makeStagingDir();
const resumeDir = process.env.GSTACK_INGEST_RESUME_DIR;
const resuming = !remoteHttpMode
&& typeof resumeDir === "string"
&& resumeDir.length > 0
&& existsSync(resumeDir);
const stagingDir = resuming
? resumeDir!
: remoteHttpMode
? makePersistentTranscriptDir()
: makeStagingDir();
// Register staging dir with the signal forwarder so SIGTERM/SIGINT can
// synchronously clean it up before process.exit (the async finally block
// below does NOT run after a signal-handler exit). In remote-http mode we
// skip registration — the dir is meant to persist.
// either preserve (when gbrain checkpointed it) or synchronously clean up.
// The async finally block below does NOT run after a signal-handler exit.
// In remote-http mode we skip registration — the dir is meant to persist.
if (!remoteHttpMode) {
_activeStagingDir = stagingDir;
}
try {
const staging = writeStaged(prep.prepared, stagingDir);
let staging: StagingResult;
if (resuming) {
// Pages are already on disk from the previous run. Skip writeStaged.
// The "written" count for the verdict reflects what's on disk now;
// gbrain's import will skip already-completed entries via its own
// checkpoint (processedIndex+1).
if (!args.quiet) {
console.error(
`[memory-ingest] resuming previous staging dir ${stagingDir} (skipping prepare phase)`,
);
}
staging = { staging_dir: stagingDir, written: prep.prepared.length, errors: [], stagedPathToSource: new Map() };
} else {
staging = writeStaged(prep.prepared, stagingDir);
}
failed += staging.errors.length;
if (!args.quiet && staging.errors.length > 0) {
for (const e of staging.errors.slice(0, 5)) {