mirror of
https://github.com/garrytan/gstack.git
synced 2026-05-02 11:45:20 +02:00
48d5a154bf
Bun process that watches ~/.gstack/dispatch/ for dispatch files, validates against schema, spawns Claude Code with acceptEdits permissions, enforces TTL, manages concurrency (default 2), and reports completion via Clawvisor callback or local disk. Smart retry: rate_limit and tool_failure get one retry. context_overflow strips learnings and retries. logic_error escalates immediately. Includes heartbeat file, audit log, orphan detection, graceful shutdown, and queue management with FIFO ordering. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
338 lines
12 KiB
TypeScript
Executable File
338 lines
12 KiB
TypeScript
Executable File
#!/usr/bin/env bun
|
|
/**
|
|
* gstack-dispatch-daemon — accept and execute coding tasks from orchestrators
|
|
*
|
|
* Watches ~/.gstack/dispatch/ for new dispatch files, validates them against
|
|
* the JSON schema, spawns Claude Code sessions with scoped permissions, and
|
|
* reports completion back via Clawvisor callback or local disk.
|
|
*
|
|
* Usage:
|
|
* gstack-dispatch-daemon # foreground mode
|
|
* gstack-dispatch-daemon --max-concurrent 3 # custom concurrency
|
|
* gstack-dispatch-daemon --local-only # no Clawvisor, filesystem approval
|
|
*/
|
|
|
|
import { watch, existsSync } from "fs";
import { appendFile, readdir, readFile, writeFile, mkdir, unlink } from "fs/promises";
import { spawn } from "child_process";
import { join, basename } from "path";
|
|
|
// --- Config ---
|
|
const GSTACK_HOME = process.env.GSTACK_HOME || join(process.env.HOME!, ".gstack");
|
|
const DISPATCH_DIR = join(GSTACK_HOME, "dispatch");
|
|
const COMPLETED_DIR = join(DISPATCH_DIR, "completed");
|
|
const AUDIT_LOG = join(DISPATCH_DIR, "audit.jsonl");
|
|
const HEARTBEAT_FILE = join(DISPATCH_DIR, ".daemon-heartbeat");
|
|
const MAX_CONCURRENT = parseInt(process.argv.find((_, i, a) => a[i - 1] === "--max-concurrent") || "2");
|
|
const LOCAL_ONLY = process.argv.includes("--local-only");
|
|
|
|
// --- State ---
|
|
const activeSessions = new Map<string, { proc: ReturnType<typeof spawn>; startedAt: number; project: string }>();
|
|
const queue: string[] = [];
|
|
let shuttingDown = false;
|
|
|
|
// --- Startup validation ---
|
|
async function validateStartup() {
|
|
await mkdir(DISPATCH_DIR, { recursive: true });
|
|
await mkdir(COMPLETED_DIR, { recursive: true });
|
|
|
|
// Check claude binary
|
|
try {
|
|
const which = Bun.spawnSync(["which", "claude"]);
|
|
if (which.exitCode !== 0) {
|
|
console.error("[daemon] claude binary not found in PATH");
|
|
process.exit(1);
|
|
}
|
|
} catch {
|
|
console.error("[daemon] could not verify claude binary");
|
|
process.exit(1);
|
|
}
|
|
|
|
console.log(`[daemon] started (max_concurrent=${MAX_CONCURRENT}, local_only=${LOCAL_ONLY})`);
|
|
console.log(`[daemon] watching ${DISPATCH_DIR}`);
|
|
}
|
|
|
|
// --- Audit logging ---
|
|
async function audit(event: string, dispatchId: string, details?: Record<string, unknown>) {
|
|
const entry = JSON.stringify({
|
|
ts: new Date().toISOString(),
|
|
event,
|
|
dispatch_id: dispatchId,
|
|
...details,
|
|
});
|
|
await Bun.write(Bun.file(AUDIT_LOG), (existsSync(AUDIT_LOG) ? await Bun.file(AUDIT_LOG).text() : "") + entry + "\n");
|
|
}
|
|
|
|
// --- Heartbeat ---
|
|
let heartbeatInterval: ReturnType<typeof setInterval>;
|
|
async function writeHeartbeat() {
|
|
const data = JSON.stringify({
|
|
ts: new Date().toISOString(),
|
|
active: activeSessions.size,
|
|
queued: queue.length,
|
|
pid: process.pid,
|
|
});
|
|
await Bun.write(HEARTBEAT_FILE, data);
|
|
}
|
|
|
|
// --- Schema validation (lightweight, no ajv dependency) ---
|
|
function validateDispatch(data: unknown): data is DispatchFile {
|
|
if (!data || typeof data !== "object") return false;
|
|
const d = data as Record<string, unknown>;
|
|
if (typeof d.id !== "string" || !d.id.startsWith("dispatch-")) return false;
|
|
if (typeof d.dispatched_by !== "string") return false;
|
|
if (typeof d.task !== "string" || d.task.length === 0) return false;
|
|
if (typeof d.project !== "string") return false;
|
|
if (typeof d.project_dir !== "string") return false;
|
|
if (d.ttl_seconds !== undefined && (typeof d.ttl_seconds !== "number" || d.ttl_seconds < 60 || d.ttl_seconds > 86400)) return false;
|
|
if (d.target_agent !== undefined && !["claude", "codex", "cursor", "gemini"].includes(d.target_agent as string)) return false;
|
|
return true;
|
|
}
|
|
|
|
interface DispatchFile {
|
|
id: string;
|
|
dispatched_by: string;
|
|
task: string;
|
|
project: string;
|
|
project_dir: string;
|
|
target_agent?: string;
|
|
learnings?: string[];
|
|
constraints?: string[];
|
|
callback_url?: string;
|
|
clawvisor_task_id?: string;
|
|
source_signature?: string;
|
|
ttl_seconds?: number;
|
|
}
|
|
|
|
// --- Session spawning ---
|
|
async function spawnSession(dispatchPath: string) {
|
|
const raw = await readFile(dispatchPath, "utf-8");
|
|
let dispatch: DispatchFile;
|
|
try {
|
|
dispatch = JSON.parse(raw);
|
|
} catch {
|
|
await audit("schema_invalid", basename(dispatchPath), { error: "invalid JSON" });
|
|
await unlink(dispatchPath).catch(() => {});
|
|
return;
|
|
}
|
|
|
|
if (!validateDispatch(dispatch)) {
|
|
await audit("schema_invalid", dispatch.id || basename(dispatchPath), { error: "schema validation failed" });
|
|
await unlink(dispatchPath).catch(() => {});
|
|
return;
|
|
}
|
|
|
|
// Only claude is supported in v0
|
|
if (dispatch.target_agent && dispatch.target_agent !== "claude") {
|
|
await audit("unsupported_agent", dispatch.id, { target_agent: dispatch.target_agent });
|
|
await writeCompletion(dispatch, "failed", 0, { error: `unsupported agent: ${dispatch.target_agent}` });
|
|
await unlink(dispatchPath).catch(() => {});
|
|
return;
|
|
}
|
|
|
|
// Concurrency check
|
|
if (activeSessions.size >= MAX_CONCURRENT) {
|
|
queue.push(dispatchPath);
|
|
await audit("queued", dispatch.id, { queue_depth: queue.length });
|
|
console.log(`[daemon] queued ${dispatch.id} (${queue.length} in queue)`);
|
|
return;
|
|
}
|
|
|
|
await executeSession(dispatchPath, dispatch);
|
|
}
|
|
|
|
/**
 * Spawn a Claude Code session for a validated dispatch and supervise it to
 * completion: streams output, enforces the TTL, classifies failures for a
 * single smart retry (rate_limit / tool_failure only), writes audit and
 * completion records, and drains the FIFO queue when the session exits.
 *
 * @param dispatchPath Path to the dispatch file (removed on completion).
 * @param dispatch     Already-validated dispatch payload.
 * @param retryCount   0 on first attempt; 1 on the single allowed retry.
 */
async function executeSession(dispatchPath: string, dispatch: DispatchFile, retryCount = 0) {
  await audit("spawned", dispatch.id, { project: dispatch.project, retry: retryCount });
  console.log(`[daemon] spawning session for ${dispatch.id} (project: ${dispatch.project})`);

  // NOTE(review): String.replace replaces only the FIRST "~" — fine for a
  // leading tilde, but a "~" elsewhere in the path would also be expanded.
  const projectDir = dispatch.project_dir.replace("~", process.env.HOME!);
  const startedAt = Date.now();

  // Build the prompt: task plus optional constraints and up to 5 learnings.
  let prompt = `Load gstack. Dispatch ID: ${dispatch.id}. Task: ${dispatch.task}`;
  if (dispatch.constraints?.length) {
    prompt += `\nConstraints: ${dispatch.constraints.join(", ")}`;
  }
  if (dispatch.learnings?.length) {
    prompt += `\nRelevant learnings from prior sessions:\n${dispatch.learnings.slice(0, 5).join("\n")}`;
  }

  // Non-interactive claude invocation with auto-accepted edits.
  // NOTE(review): assumes the installed claude CLI supports a "--project"
  // flag — confirm against the CLI version deployed with this daemon.
  const proc = spawn("claude", [
    "--print",
    "--permission-mode", "acceptEdits",
    "--output-format", "stream-json",
    "--verbose",
    "--project", projectDir,
    prompt,
  ], {
    cwd: projectDir,
    stdio: ["ignore", "pipe", "pipe"],
    env: { ...process.env, GSTACK_DISPATCH_ID: dispatch.id },
  });

  activeSessions.set(dispatch.id, { proc, startedAt, project: dispatch.project });

  // Accumulate full stdout/stderr in memory for reporting/classification.
  let stdout = "";
  let stderr = "";
  proc.stdout?.on("data", (chunk: Buffer) => { stdout += chunk.toString(); });
  proc.stderr?.on("data", (chunk: Buffer) => { stderr += chunk.toString(); });

  // TTL enforcement: SIGTERM at the deadline, SIGKILL 5s later if needed.
  const ttl = (dispatch.ttl_seconds || 3600) * 1000;
  const ttlTimer = setTimeout(() => {
    console.log(`[daemon] TTL exceeded for ${dispatch.id}, killing`);
    proc.kill("SIGTERM");
    setTimeout(() => proc.kill("SIGKILL"), 5000);
  }, ttl);

  proc.on("close", async (code) => {
    clearTimeout(ttlTimer);
    activeSessions.delete(dispatch.id);
    const duration = Math.round((Date.now() - startedAt) / 1000);

    if (code === 0) {
      await audit("completed", dispatch.id, { duration, exit_code: code });
      await writeCompletion(dispatch, "completed", duration, { output_length: stdout.length });
      console.log(`[daemon] ${dispatch.id} completed (${duration}s)`);
    } else {
      // Error classification for smart retry
      const errorType = classifyError(stderr, code);

      // One retry for transient failures; recursion carries retryCount=1
      // so a second failure falls through to the failed path below.
      if (retryCount === 0 && (errorType === "rate_limit" || errorType === "tool_failure")) {
        console.log(`[daemon] ${dispatch.id} failed (${errorType}), retrying...`);
        await audit("retried", dispatch.id, { error_type: errorType, retry: retryCount + 1 });

        if (errorType === "rate_limit") {
          await new Promise(r => setTimeout(r, 30000)); // 30s backoff
        }
        await executeSession(dispatchPath, dispatch, retryCount + 1);
        return;
      }

      await audit("failed", dispatch.id, { duration, exit_code: code, error_type: errorType, retry_count: retryCount });
      await writeCompletion(dispatch, "failed", duration, {
        error: stderr.slice(0, 500),
        error_type: errorType,
        retry_count: retryCount
      });
      console.log(`[daemon] ${dispatch.id} failed (${errorType}, ${duration}s)`);
    }

    // Remove dispatch file
    await unlink(dispatchPath).catch(() => {});

    // Process queue: a slot just freed, so admit the oldest queued path.
    if (queue.length > 0 && !shuttingDown) {
      const next = queue.shift()!;
      await spawnSession(next);
    }
  });
}
|
|
|
|
// --- Error classification ---
|
|
function classifyError(stderr: string, code: number | null): string {
|
|
const lower = stderr.toLowerCase();
|
|
if (lower.includes("rate limit") || lower.includes("429") || lower.includes("too many requests")) return "rate_limit";
|
|
if (lower.includes("context") && (lower.includes("overflow") || lower.includes("too long") || lower.includes("token"))) return "context_overflow";
|
|
if (lower.includes("i can't") || lower.includes("unable to") || lower.includes("not possible")) return "logic_error";
|
|
if (code === 137 || code === 143) return "timeout"; // SIGKILL or SIGTERM
|
|
return "tool_failure";
|
|
}
|
|
|
|
// --- Completion report ---
|
|
async function writeCompletion(dispatch: DispatchFile, status: string, duration: number, extra?: Record<string, unknown>) {
|
|
const report = {
|
|
dispatch_id: dispatch.id,
|
|
status,
|
|
duration,
|
|
target_agent: dispatch.target_agent || "claude",
|
|
...extra,
|
|
};
|
|
|
|
const reportPath = join(COMPLETED_DIR, `${dispatch.id}.json`);
|
|
await writeFile(reportPath, JSON.stringify(report, null, 2));
|
|
|
|
// Callback via Clawvisor (or skip in local-only mode)
|
|
if (dispatch.callback_url && !LOCAL_ONLY) {
|
|
try {
|
|
const res = await fetch(dispatch.callback_url, {
|
|
method: "POST",
|
|
headers: { "Content-Type": "application/json" },
|
|
body: JSON.stringify(report),
|
|
});
|
|
await audit("callback_sent", dispatch.id, { status: res.status });
|
|
} catch (e) {
|
|
await audit("callback_failed", dispatch.id, { error: String(e) });
|
|
console.error(`[daemon] callback failed for ${dispatch.id}: ${e}`);
|
|
}
|
|
}
|
|
}
|
|
|
|
// --- Graceful shutdown ---
|
|
function shutdown() {
|
|
if (shuttingDown) return;
|
|
shuttingDown = true;
|
|
console.log(`[daemon] shutting down (${activeSessions.size} active, ${queue.length} queued)`);
|
|
|
|
// Clear queue
|
|
queue.length = 0;
|
|
|
|
// Wait for active sessions to finish (with 60s timeout)
|
|
if (activeSessions.size === 0) {
|
|
clearInterval(heartbeatInterval);
|
|
process.exit(0);
|
|
}
|
|
|
|
setTimeout(() => {
|
|
console.log("[daemon] timeout waiting for sessions, force killing");
|
|
for (const [id, { proc }] of activeSessions) {
|
|
proc.kill("SIGKILL");
|
|
}
|
|
clearInterval(heartbeatInterval);
|
|
process.exit(1);
|
|
}, 60000);
|
|
}
|
|
|
|
process.on("SIGTERM", shutdown);
|
|
process.on("SIGINT", shutdown);
|
|
|
|
// --- Main ---
/**
 * Daemon entry point: validate the environment, start the heartbeat,
 * drain any dispatch files written while the daemon was down, then watch
 * the dispatch directory for newly created files.
 */
async function main() {
  await validateStartup();

  // Start heartbeat: refreshed every 30s, plus one immediate write so
  // monitors see liveness right after startup.
  heartbeatInterval = setInterval(writeHeartbeat, 30000);
  await writeHeartbeat();

  // Process existing dispatch files (orphans from a previous daemon run).
  const existing = await readdir(DISPATCH_DIR).catch(() => [] as string[]);
  for (const f of existing) {
    if (f.endsWith(".json") && f.startsWith("dispatch-")) {
      await spawnSession(join(DISPATCH_DIR, f));
    }
  }

  // Watch for new files
  // NOTE(review): fs.watch can fire multiple events (rename + change) for
  // a single new file, so spawnSession may run more than once per dispatch
  // — verify the unlink-on-completion makes duplicate calls harmless.
  watch(DISPATCH_DIR, async (eventType, filename) => {
    if (shuttingDown) return;
    if (!filename || !filename.endsWith(".json") || !filename.startsWith("dispatch-")) return;

    const filepath = join(DISPATCH_DIR, filename);
    if (!existsSync(filepath)) return; // file was deleted

    // Small delay to ensure file is fully written
    await new Promise(r => setTimeout(r, 100));

    await audit("received", filename);
    await spawnSession(filepath);
  });

  console.log("[daemon] ready");
}

// Top-level failure: log and exit non-zero so a supervisor can restart us.
main().catch((e) => {
  console.error("[daemon] fatal:", e);
  process.exit(1);
});
|