mirror of
https://github.com/garrytan/gstack.git
synced 2026-06-23 18:20:00 +02:00
feat(brain): concurrent-refresh lockfile dedup (T15 / D3)
When autoplan dispatches 4 planning skills back-to-back and they all hit a cold-miss on the same digest, only ONE actually fetches from the brain. The rest dedup via the project-scoped lockfile at ~/.gstack/projects/<slug>/brain-cache/.refresh.lock. Reuses the 5-min stale-takeover convention from /sync-gbrain. Lock is taken over when: - File is older than CACHE_REFRESH_LOCK_TIMEOUT_MS - PID is on the same host and dead (process.kill(pid, 0) fails) - Lock file is corrupt (defensive) withRefreshLock(projectSlug, fn) returns either the callback's value or the literal 'dedup'. The CLI emits exit code 3 + diagnostic stderr on dedup, so callers can choose to wait + retry (resolver does this) or fall through to stale-but-usable behavior. test/cache-concurrent-refresh.test.ts: 7 tests covering acquire/release, stale-takeover, dead-PID takeover, corrupt-lock recovery, error-path release, and cross-project lock location. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+121
-8
@@ -19,12 +19,13 @@
|
||||
* unreachable. Concurrent-refresh dedup is a follow-up commit (T15).
|
||||
*/
|
||||
|
||||
import { existsSync, mkdirSync, readFileSync, writeFileSync, renameSync, statSync, unlinkSync, readdirSync } from 'fs';
|
||||
import { existsSync, mkdirSync, readFileSync, writeFileSync, renameSync, statSync, unlinkSync, readdirSync, openSync, closeSync } from 'fs';
|
||||
import { join, dirname } from 'path';
|
||||
import { homedir } from 'os';
|
||||
import { homedir, hostname } from 'os';
|
||||
import { execGbrainJson, spawnGbrain } from '../lib/gbrain-exec';
|
||||
import {
|
||||
BRAIN_CACHE_ENTITIES,
|
||||
CACHE_REFRESH_LOCK_TIMEOUT_MS,
|
||||
GSTACK_SCHEMA_PACK_NAME,
|
||||
GSTACK_SCHEMA_PACK_VERSION,
|
||||
type BrainCacheEntity,
|
||||
@@ -215,6 +216,107 @@ export function cmdGet(entityName: string, projectSlug: string | null): GetResul
|
||||
// Subcommand: refresh
|
||||
// ──────────────────────────────────────────────────────────────────────────
|
||||
|
||||
// ──────────────────────────────────────────────────────────────────────────
|
||||
// Lockfile dedup (T15 / D3)
|
||||
// ──────────────────────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Returns the lock file path for a project scope. Cross-project entities
|
||||
* still lock per-project (the project triggering the refresh holds the lock);
|
||||
* concurrent attempts from different projects on cross-project entities
|
||||
* serialize naturally because they're rare and the lock window is short.
|
||||
*/
|
||||
function lockPath(projectSlug: string | null): string {
|
||||
const dir = projectSlug
|
||||
? join(GSTACK_HOME, 'projects', projectSlug, 'brain-cache')
|
||||
: join(GSTACK_HOME, 'brain-cache');
|
||||
return join(dir, '.refresh.lock');
|
||||
}
|
||||
|
||||
interface LockHandle {
|
||||
fd: number;
|
||||
path: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to acquire the refresh lock. Returns null when another process holds it
|
||||
* (and the lock is fresh). Stale locks (process dead OR older than the
|
||||
* timeout) are taken over.
|
||||
*/
|
||||
function tryAcquireLock(projectSlug: string | null): LockHandle | null {
|
||||
const path = lockPath(projectSlug);
|
||||
mkdirSync(dirname(path), { recursive: true });
|
||||
|
||||
// If a lock exists, see if it's stale
|
||||
if (existsSync(path)) {
|
||||
try {
|
||||
const raw = readFileSync(path, 'utf-8');
|
||||
const lock = JSON.parse(raw) as { pid: number; host: string; ts: number };
|
||||
const age = Date.now() - lock.ts;
|
||||
const sameHost = lock.host === hostname();
|
||||
const processGone = sameHost && lock.pid > 0 && !isPidAlive(lock.pid);
|
||||
if (age <= CACHE_REFRESH_LOCK_TIMEOUT_MS && !processGone) {
|
||||
return null; // someone else holds a fresh lock
|
||||
}
|
||||
// Stale: take over
|
||||
} catch {
|
||||
// Corrupt lock file → take over
|
||||
}
|
||||
}
|
||||
|
||||
// Write our lock (best-effort O_EXCL via tmp+rename for atomic creation)
|
||||
const payload = JSON.stringify({ pid: process.pid, host: hostname(), ts: Date.now() });
|
||||
const tmp = `${path}.tmp.${process.pid}.${Date.now()}`;
|
||||
try {
|
||||
writeFileSync(tmp, payload);
|
||||
renameSync(tmp, path);
|
||||
} catch (err) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// Race: another process may have raced us. Re-read and verify ownership.
|
||||
try {
|
||||
const raw = readFileSync(path, 'utf-8');
|
||||
const lock = JSON.parse(raw) as { pid: number; host: string };
|
||||
if (lock.pid !== process.pid || lock.host !== hostname()) {
|
||||
return null;
|
||||
}
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
return { fd: -1, path };
|
||||
}
|
||||
|
||||
function releaseLock(handle: LockHandle): void {
|
||||
try { unlinkSync(handle.path); } catch { /* best effort */ }
|
||||
}
|
||||
|
||||
function isPidAlive(pid: number): boolean {
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch (err: any) {
|
||||
if (err?.code === 'EPERM') return true; // exists but we don't own it
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a refresh callback under the project-scoped lock. If another refresh is
|
||||
* already in flight, returns 'dedup' and the caller can either wait + retry
|
||||
* (the resolver does this) or fall through to stale-but-usable. Stale locks
|
||||
* (process dead, or older than CACHE_REFRESH_LOCK_TIMEOUT_MS) are taken over.
|
||||
*/
|
||||
export function withRefreshLock<T>(projectSlug: string | null, fn: () => T): T | 'dedup' {
|
||||
const handle = tryAcquireLock(projectSlug);
|
||||
if (!handle) return 'dedup';
|
||||
try {
|
||||
return fn();
|
||||
} finally {
|
||||
releaseLock(handle);
|
||||
}
|
||||
}
|
||||
|
||||
/** Refreshes one entity from the brain. Returns true on success. */
|
||||
export function refreshEntity(entityName: string, projectSlug: string | null): boolean {
|
||||
const entity = BRAIN_CACHE_ENTITIES[entityName];
|
||||
@@ -532,14 +634,25 @@ async function main(): Promise<number> {
|
||||
return 0;
|
||||
}
|
||||
case 'refresh': {
|
||||
// D3: dedup concurrent refreshes via lockfile. Skipped (dedup) when
|
||||
// another process is already mid-refresh on the same project.
|
||||
if (flags.entity) {
|
||||
const ok = refreshEntity(String(flags.entity), projectSlug);
|
||||
process.stdout.write(ok ? `refreshed ${flags.entity}\n` : `failed to refresh ${flags.entity}\n`);
|
||||
return ok ? 0 : 1;
|
||||
const entityName = String(flags.entity);
|
||||
const result = withRefreshLock(projectSlug, () => refreshEntity(entityName, projectSlug));
|
||||
if (result === 'dedup') {
|
||||
process.stderr.write(`(dedup: another refresh in flight)\n`);
|
||||
return 3;
|
||||
}
|
||||
process.stdout.write(result ? `refreshed ${entityName}\n` : `failed to refresh ${entityName}\n`);
|
||||
return result ? 0 : 1;
|
||||
}
|
||||
const { success, failed } = refreshAll(projectSlug);
|
||||
process.stdout.write(`refreshed=${success} failed=${failed}\n`);
|
||||
return failed > 0 ? 1 : 0;
|
||||
const allResult = withRefreshLock(projectSlug, () => refreshAll(projectSlug));
|
||||
if (allResult === 'dedup') {
|
||||
process.stderr.write(`(dedup: another refresh in flight)\n`);
|
||||
return 3;
|
||||
}
|
||||
process.stdout.write(`refreshed=${allResult.success} failed=${allResult.failed}\n`);
|
||||
return allResult.failed > 0 ? 1 : 0;
|
||||
}
|
||||
case 'invalidate': {
|
||||
const entityName = positional[0];
|
||||
|
||||
Reference in New Issue
Block a user