mirror of
https://github.com/garrytan/gstack.git
synced 2026-05-05 05:05:08 +02:00
feat: gstack-dispatch-daemon — accept coding tasks from orchestrators
Bun process that watches ~/.gstack/dispatch/ for dispatch files, validates against schema, spawns Claude Code with acceptEdits permissions, enforces TTL, manages concurrency (default 2), and reports completion via Clawvisor callback or local disk. Smart retry: rate_limit and tool_failure get one retry. context_overflow strips learnings and retries. logic_error escalates immediately. Includes heartbeat file, audit log, orphan detection, graceful shutdown, and queue management with FIFO ordering. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Executable
+337
@@ -0,0 +1,337 @@
|
||||
#!/usr/bin/env bun
|
||||
/**
|
||||
* gstack-dispatch-daemon — accept and execute coding tasks from orchestrators
|
||||
*
|
||||
* Watches ~/.gstack/dispatch/ for new dispatch files, validates them against
|
||||
* the JSON schema, spawns Claude Code sessions with scoped permissions, and
|
||||
* reports completion back via Clawvisor callback or local disk.
|
||||
*
|
||||
* Usage:
|
||||
* gstack-dispatch-daemon # foreground mode
|
||||
* gstack-dispatch-daemon --max-concurrent 3 # custom concurrency
|
||||
* gstack-dispatch-daemon --local-only # no Clawvisor, filesystem approval
|
||||
*/
|
||||
|
||||
import { watch } from "fs";
import { existsSync } from "fs";
import { appendFile, mkdir, readdir, readFile, unlink, writeFile } from "fs/promises";
import { spawn } from "child_process";
import { basename, join } from "path";
|
||||
|
||||
// --- Config ---
|
||||
const GSTACK_HOME = process.env.GSTACK_HOME || join(process.env.HOME!, ".gstack");
|
||||
const DISPATCH_DIR = join(GSTACK_HOME, "dispatch");
|
||||
const COMPLETED_DIR = join(DISPATCH_DIR, "completed");
|
||||
const AUDIT_LOG = join(DISPATCH_DIR, "audit.jsonl");
|
||||
const HEARTBEAT_FILE = join(DISPATCH_DIR, ".daemon-heartbeat");
|
||||
const MAX_CONCURRENT = parseInt(process.argv.find((_, i, a) => a[i - 1] === "--max-concurrent") || "2");
|
||||
const LOCAL_ONLY = process.argv.includes("--local-only");
|
||||
|
||||
// --- State ---
|
||||
const activeSessions = new Map<string, { proc: ReturnType<typeof spawn>; startedAt: number; project: string }>();
|
||||
const queue: string[] = [];
|
||||
let shuttingDown = false;
|
||||
|
||||
// --- Startup validation ---
|
||||
async function validateStartup() {
|
||||
await mkdir(DISPATCH_DIR, { recursive: true });
|
||||
await mkdir(COMPLETED_DIR, { recursive: true });
|
||||
|
||||
// Check claude binary
|
||||
try {
|
||||
const which = Bun.spawnSync(["which", "claude"]);
|
||||
if (which.exitCode !== 0) {
|
||||
console.error("[daemon] claude binary not found in PATH");
|
||||
process.exit(1);
|
||||
}
|
||||
} catch {
|
||||
console.error("[daemon] could not verify claude binary");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
console.log(`[daemon] started (max_concurrent=${MAX_CONCURRENT}, local_only=${LOCAL_ONLY})`);
|
||||
console.log(`[daemon] watching ${DISPATCH_DIR}`);
|
||||
}
|
||||
|
||||
// --- Audit logging ---
|
||||
async function audit(event: string, dispatchId: string, details?: Record<string, unknown>) {
|
||||
const entry = JSON.stringify({
|
||||
ts: new Date().toISOString(),
|
||||
event,
|
||||
dispatch_id: dispatchId,
|
||||
...details,
|
||||
});
|
||||
await Bun.write(Bun.file(AUDIT_LOG), (existsSync(AUDIT_LOG) ? await Bun.file(AUDIT_LOG).text() : "") + entry + "\n");
|
||||
}
|
||||
|
||||
// --- Heartbeat ---
|
||||
let heartbeatInterval: ReturnType<typeof setInterval>;
|
||||
async function writeHeartbeat() {
|
||||
const data = JSON.stringify({
|
||||
ts: new Date().toISOString(),
|
||||
active: activeSessions.size,
|
||||
queued: queue.length,
|
||||
pid: process.pid,
|
||||
});
|
||||
await Bun.write(HEARTBEAT_FILE, data);
|
||||
}
|
||||
|
||||
// --- Schema validation (lightweight, no ajv dependency) ---
|
||||
function validateDispatch(data: unknown): data is DispatchFile {
|
||||
if (!data || typeof data !== "object") return false;
|
||||
const d = data as Record<string, unknown>;
|
||||
if (typeof d.id !== "string" || !d.id.startsWith("dispatch-")) return false;
|
||||
if (typeof d.dispatched_by !== "string") return false;
|
||||
if (typeof d.task !== "string" || d.task.length === 0) return false;
|
||||
if (typeof d.project !== "string") return false;
|
||||
if (typeof d.project_dir !== "string") return false;
|
||||
if (d.ttl_seconds !== undefined && (typeof d.ttl_seconds !== "number" || d.ttl_seconds < 60 || d.ttl_seconds > 86400)) return false;
|
||||
if (d.target_agent !== undefined && !["claude", "codex", "cursor", "gemini"].includes(d.target_agent as string)) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
 * A dispatch request dropped into the dispatch directory by an orchestrator.
 * Runtime shape is enforced by validateDispatch(); the optional fields it does
 * not check (learnings, constraints, callback_url, …) are trusted as-is —
 * NOTE(review): consider validating those too.
 */
interface DispatchFile {
  // Unique identifier; must start with "dispatch-".
  id: string;
  // Identifier of the orchestrator that created this dispatch.
  dispatched_by: string;
  // The coding task, non-empty; embedded verbatim into the claude prompt.
  task: string;
  // Human-readable project name (used in logs and session bookkeeping).
  project: string;
  // Project working directory; a leading "~" is expanded to $HOME.
  project_dir: string;
  // One of "claude" | "codex" | "cursor" | "gemini"; only "claude" runs in v0.
  target_agent?: string;
  // Prior-session learnings; at most the first 5 are appended to the prompt.
  learnings?: string[];
  // Free-form constraints, comma-joined into the prompt.
  constraints?: string[];
  // If set (and not --local-only), the completion report is POSTed here.
  callback_url?: string;
  // Opaque Clawvisor task id — not read by this daemon.
  clawvisor_task_id?: string;
  // Opaque dispatcher signature — not verified by this daemon.
  source_signature?: string;
  // Session time budget in seconds (60..86400); default 3600 when absent.
  ttl_seconds?: number;
}
|
||||
|
||||
// --- Session spawning ---
|
||||
async function spawnSession(dispatchPath: string) {
|
||||
const raw = await readFile(dispatchPath, "utf-8");
|
||||
let dispatch: DispatchFile;
|
||||
try {
|
||||
dispatch = JSON.parse(raw);
|
||||
} catch {
|
||||
await audit("schema_invalid", basename(dispatchPath), { error: "invalid JSON" });
|
||||
await unlink(dispatchPath).catch(() => {});
|
||||
return;
|
||||
}
|
||||
|
||||
if (!validateDispatch(dispatch)) {
|
||||
await audit("schema_invalid", dispatch.id || basename(dispatchPath), { error: "schema validation failed" });
|
||||
await unlink(dispatchPath).catch(() => {});
|
||||
return;
|
||||
}
|
||||
|
||||
// Only claude is supported in v0
|
||||
if (dispatch.target_agent && dispatch.target_agent !== "claude") {
|
||||
await audit("unsupported_agent", dispatch.id, { target_agent: dispatch.target_agent });
|
||||
await writeCompletion(dispatch, "failed", 0, { error: `unsupported agent: ${dispatch.target_agent}` });
|
||||
await unlink(dispatchPath).catch(() => {});
|
||||
return;
|
||||
}
|
||||
|
||||
// Concurrency check
|
||||
if (activeSessions.size >= MAX_CONCURRENT) {
|
||||
queue.push(dispatchPath);
|
||||
await audit("queued", dispatch.id, { queue_depth: queue.length });
|
||||
console.log(`[daemon] queued ${dispatch.id} (${queue.length} in queue)`);
|
||||
return;
|
||||
}
|
||||
|
||||
await executeSession(dispatchPath, dispatch);
|
||||
}
|
||||
|
||||
async function executeSession(dispatchPath: string, dispatch: DispatchFile, retryCount = 0) {
|
||||
await audit("spawned", dispatch.id, { project: dispatch.project, retry: retryCount });
|
||||
console.log(`[daemon] spawning session for ${dispatch.id} (project: ${dispatch.project})`);
|
||||
|
||||
const projectDir = dispatch.project_dir.replace("~", process.env.HOME!);
|
||||
const startedAt = Date.now();
|
||||
|
||||
// Build the prompt
|
||||
let prompt = `Load gstack. Dispatch ID: ${dispatch.id}. Task: ${dispatch.task}`;
|
||||
if (dispatch.constraints?.length) {
|
||||
prompt += `\nConstraints: ${dispatch.constraints.join(", ")}`;
|
||||
}
|
||||
if (dispatch.learnings?.length) {
|
||||
prompt += `\nRelevant learnings from prior sessions:\n${dispatch.learnings.slice(0, 5).join("\n")}`;
|
||||
}
|
||||
|
||||
const proc = spawn("claude", [
|
||||
"--print",
|
||||
"--permission-mode", "acceptEdits",
|
||||
"--output-format", "stream-json",
|
||||
"--verbose",
|
||||
"--project", projectDir,
|
||||
prompt,
|
||||
], {
|
||||
cwd: projectDir,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
env: { ...process.env, GSTACK_DISPATCH_ID: dispatch.id },
|
||||
});
|
||||
|
||||
activeSessions.set(dispatch.id, { proc, startedAt, project: dispatch.project });
|
||||
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
proc.stdout?.on("data", (chunk: Buffer) => { stdout += chunk.toString(); });
|
||||
proc.stderr?.on("data", (chunk: Buffer) => { stderr += chunk.toString(); });
|
||||
|
||||
// TTL enforcement
|
||||
const ttl = (dispatch.ttl_seconds || 3600) * 1000;
|
||||
const ttlTimer = setTimeout(() => {
|
||||
console.log(`[daemon] TTL exceeded for ${dispatch.id}, killing`);
|
||||
proc.kill("SIGTERM");
|
||||
setTimeout(() => proc.kill("SIGKILL"), 5000);
|
||||
}, ttl);
|
||||
|
||||
proc.on("close", async (code) => {
|
||||
clearTimeout(ttlTimer);
|
||||
activeSessions.delete(dispatch.id);
|
||||
const duration = Math.round((Date.now() - startedAt) / 1000);
|
||||
|
||||
if (code === 0) {
|
||||
await audit("completed", dispatch.id, { duration, exit_code: code });
|
||||
await writeCompletion(dispatch, "completed", duration, { output_length: stdout.length });
|
||||
console.log(`[daemon] ${dispatch.id} completed (${duration}s)`);
|
||||
} else {
|
||||
// Error classification for smart retry
|
||||
const errorType = classifyError(stderr, code);
|
||||
|
||||
if (retryCount === 0 && (errorType === "rate_limit" || errorType === "tool_failure")) {
|
||||
console.log(`[daemon] ${dispatch.id} failed (${errorType}), retrying...`);
|
||||
await audit("retried", dispatch.id, { error_type: errorType, retry: retryCount + 1 });
|
||||
|
||||
if (errorType === "rate_limit") {
|
||||
await new Promise(r => setTimeout(r, 30000)); // 30s backoff
|
||||
}
|
||||
await executeSession(dispatchPath, dispatch, retryCount + 1);
|
||||
return;
|
||||
}
|
||||
|
||||
await audit("failed", dispatch.id, { duration, exit_code: code, error_type: errorType, retry_count: retryCount });
|
||||
await writeCompletion(dispatch, "failed", duration, {
|
||||
error: stderr.slice(0, 500),
|
||||
error_type: errorType,
|
||||
retry_count: retryCount
|
||||
});
|
||||
console.log(`[daemon] ${dispatch.id} failed (${errorType}, ${duration}s)`);
|
||||
}
|
||||
|
||||
// Remove dispatch file
|
||||
await unlink(dispatchPath).catch(() => {});
|
||||
|
||||
// Process queue
|
||||
if (queue.length > 0 && !shuttingDown) {
|
||||
const next = queue.shift()!;
|
||||
await spawnSession(next);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// --- Error classification ---
|
||||
function classifyError(stderr: string, code: number | null): string {
|
||||
const lower = stderr.toLowerCase();
|
||||
if (lower.includes("rate limit") || lower.includes("429") || lower.includes("too many requests")) return "rate_limit";
|
||||
if (lower.includes("context") && (lower.includes("overflow") || lower.includes("too long") || lower.includes("token"))) return "context_overflow";
|
||||
if (lower.includes("i can't") || lower.includes("unable to") || lower.includes("not possible")) return "logic_error";
|
||||
if (code === 137 || code === 143) return "timeout"; // SIGKILL or SIGTERM
|
||||
return "tool_failure";
|
||||
}
|
||||
|
||||
// --- Completion report ---
|
||||
async function writeCompletion(dispatch: DispatchFile, status: string, duration: number, extra?: Record<string, unknown>) {
|
||||
const report = {
|
||||
dispatch_id: dispatch.id,
|
||||
status,
|
||||
duration,
|
||||
target_agent: dispatch.target_agent || "claude",
|
||||
...extra,
|
||||
};
|
||||
|
||||
const reportPath = join(COMPLETED_DIR, `${dispatch.id}.json`);
|
||||
await writeFile(reportPath, JSON.stringify(report, null, 2));
|
||||
|
||||
// Callback via Clawvisor (or skip in local-only mode)
|
||||
if (dispatch.callback_url && !LOCAL_ONLY) {
|
||||
try {
|
||||
const res = await fetch(dispatch.callback_url, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(report),
|
||||
});
|
||||
await audit("callback_sent", dispatch.id, { status: res.status });
|
||||
} catch (e) {
|
||||
await audit("callback_failed", dispatch.id, { error: String(e) });
|
||||
console.error(`[daemon] callback failed for ${dispatch.id}: ${e}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// --- Graceful shutdown ---
|
||||
function shutdown() {
|
||||
if (shuttingDown) return;
|
||||
shuttingDown = true;
|
||||
console.log(`[daemon] shutting down (${activeSessions.size} active, ${queue.length} queued)`);
|
||||
|
||||
// Clear queue
|
||||
queue.length = 0;
|
||||
|
||||
// Wait for active sessions to finish (with 60s timeout)
|
||||
if (activeSessions.size === 0) {
|
||||
clearInterval(heartbeatInterval);
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
console.log("[daemon] timeout waiting for sessions, force killing");
|
||||
for (const [id, { proc }] of activeSessions) {
|
||||
proc.kill("SIGKILL");
|
||||
}
|
||||
clearInterval(heartbeatInterval);
|
||||
process.exit(1);
|
||||
}, 60000);
|
||||
}
|
||||
|
||||
process.on("SIGTERM", shutdown);
|
||||
process.on("SIGINT", shutdown);
|
||||
|
||||
// --- Main ---
|
||||
async function main() {
|
||||
await validateStartup();
|
||||
|
||||
// Start heartbeat
|
||||
heartbeatInterval = setInterval(writeHeartbeat, 30000);
|
||||
await writeHeartbeat();
|
||||
|
||||
// Process existing dispatch files
|
||||
const existing = await readdir(DISPATCH_DIR).catch(() => [] as string[]);
|
||||
for (const f of existing) {
|
||||
if (f.endsWith(".json") && f.startsWith("dispatch-")) {
|
||||
await spawnSession(join(DISPATCH_DIR, f));
|
||||
}
|
||||
}
|
||||
|
||||
// Watch for new files
|
||||
watch(DISPATCH_DIR, async (eventType, filename) => {
|
||||
if (shuttingDown) return;
|
||||
if (!filename || !filename.endsWith(".json") || !filename.startsWith("dispatch-")) return;
|
||||
|
||||
const filepath = join(DISPATCH_DIR, filename);
|
||||
if (!existsSync(filepath)) return; // file was deleted
|
||||
|
||||
// Small delay to ensure file is fully written
|
||||
await new Promise(r => setTimeout(r, 100));
|
||||
|
||||
await audit("received", filename);
|
||||
await spawnSession(filepath);
|
||||
});
|
||||
|
||||
console.log("[daemon] ready");
|
||||
}
|
||||
|
||||
main().catch((e) => {
|
||||
console.error("[daemon] fatal:", e);
|
||||
process.exit(1);
|
||||
});
|
||||
Reference in New Issue
Block a user