Files
gstack/bin/gstack-dispatch-daemon
Garry Tan 48d5a154bf feat: gstack-dispatch-daemon — accept coding tasks from orchestrators
Bun process that watches ~/.gstack/dispatch/ for dispatch files, validates
against schema, spawns Claude Code with acceptEdits permissions, enforces
TTL, manages concurrency (default 2), and reports completion via Clawvisor
callback or local disk.

Smart retry: rate_limit and tool_failure get one retry. context_overflow
strips learnings and retries. logic_error escalates immediately.

Includes heartbeat file, audit log, orphan detection, graceful shutdown,
and queue management with FIFO ordering.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-04 22:13:16 -07:00

338 lines
12 KiB
TypeScript
Executable File

#!/usr/bin/env bun
/**
* gstack-dispatch-daemon — accept and execute coding tasks from orchestrators
*
* Watches ~/.gstack/dispatch/ for new dispatch files, validates them against
* the JSON schema, spawns Claude Code sessions with scoped permissions, and
* reports completion back via Clawvisor callback or local disk.
*
* Usage:
* gstack-dispatch-daemon # foreground mode
* gstack-dispatch-daemon --max-concurrent 3 # custom concurrency
* gstack-dispatch-daemon --local-only # no Clawvisor, filesystem approval
*/
import { existsSync, watch } from "fs";
import { appendFile, mkdir, readFile, readdir, unlink, writeFile } from "fs/promises";
import { spawn } from "child_process";
import { basename, join } from "path";
// --- Config ---
const GSTACK_HOME = process.env.GSTACK_HOME || join(process.env.HOME!, ".gstack");
const DISPATCH_DIR = join(GSTACK_HOME, "dispatch");
const COMPLETED_DIR = join(DISPATCH_DIR, "completed");
const AUDIT_LOG = join(DISPATCH_DIR, "audit.jsonl");
const HEARTBEAT_FILE = join(DISPATCH_DIR, ".daemon-heartbeat");
// Value following "--max-concurrent" on the command line. Guard against NaN
// (non-numeric argument) and values < 1: NaN would make the
// `activeSessions.size >= MAX_CONCURRENT` check always false and silently
// disable concurrency limiting altogether.
const rawMaxConcurrent = parseInt(process.argv.find((_, i, a) => a[i - 1] === "--max-concurrent") || "2", 10);
const MAX_CONCURRENT = Number.isFinite(rawMaxConcurrent) && rawMaxConcurrent >= 1 ? rawMaxConcurrent : 2;
const LOCAL_ONLY = process.argv.includes("--local-only");
// --- State ---
// dispatch id -> live session bookkeeping; size is the concurrency gauge.
const activeSessions = new Map<string, { proc: ReturnType<typeof spawn>; startedAt: number; project: string }>();
// FIFO of dispatch file paths waiting for a free slot.
const queue: string[] = [];
let shuttingDown = false;
// --- Startup validation ---
// Create the dispatch directories and confirm the `claude` CLI is on PATH;
// the daemon cannot do anything useful without it, so exit early otherwise.
async function validateStartup() {
  await mkdir(DISPATCH_DIR, { recursive: true });
  await mkdir(COMPLETED_DIR, { recursive: true });
  // Probe for the claude binary via `which`.
  try {
    const lookup = Bun.spawnSync(["which", "claude"]);
    if (lookup.exitCode !== 0) {
      console.error("[daemon] claude binary not found in PATH");
      process.exit(1);
    }
  } catch {
    console.error("[daemon] could not verify claude binary");
    process.exit(1);
  }
  console.log(`[daemon] started (max_concurrent=${MAX_CONCURRENT}, local_only=${LOCAL_ONLY})`);
  console.log(`[daemon] watching ${DISPATCH_DIR}`);
}
// --- Audit logging ---
// Append one JSON line per event to the audit log. Uses appendFile so every
// entry is a single O(1) append; the previous implementation read the whole
// log back into memory and rewrote it on each event (cost grows with log
// size, and two concurrent audits could drop each other's entries).
async function audit(event: string, dispatchId: string, details?: Record<string, unknown>) {
  const entry = JSON.stringify({
    ts: new Date().toISOString(),
    event,
    dispatch_id: dispatchId,
    ...details,
  });
  await appendFile(AUDIT_LOG, entry + "\n");
}
// --- Heartbeat ---
// Timer handle for the periodic heartbeat; cleared during shutdown.
let heartbeatInterval: ReturnType<typeof setInterval>;
// Write a small status file so external monitors can tell the daemon is
// alive and how loaded it is. Uses the already-imported fs/promises
// writeFile instead of Bun.write so the file's fs usage stays consistent
// (DISPATCH_DIR is guaranteed to exist after validateStartup).
async function writeHeartbeat() {
  const data = JSON.stringify({
    ts: new Date().toISOString(),
    active: activeSessions.size,
    queued: queue.length,
    pid: process.pid,
  });
  await writeFile(HEARTBEAT_FILE, data);
}
// --- Schema validation (lightweight, no ajv dependency) ---
/**
 * Type guard for incoming dispatch files. Required: id (prefixed
 * "dispatch-"), dispatched_by, non-empty task, project, project_dir.
 * Optional fields are type-checked too: previously a scalar `learnings`
 * (e.g. a plain string) passed validation and crashed executeSession later
 * when it called .slice().join() on it.
 */
function validateDispatch(data: unknown): data is DispatchFile {
  if (!data || typeof data !== "object") return false;
  const d = data as Record<string, unknown>;
  if (typeof d.id !== "string" || !d.id.startsWith("dispatch-")) return false;
  if (typeof d.dispatched_by !== "string") return false;
  if (typeof d.task !== "string" || d.task.length === 0) return false;
  if (typeof d.project !== "string") return false;
  if (typeof d.project_dir !== "string") return false;
  // TTL, when given, must be a sane number of seconds (1 min .. 24 h).
  if (d.ttl_seconds !== undefined && (typeof d.ttl_seconds !== "number" || d.ttl_seconds < 60 || d.ttl_seconds > 86400)) return false;
  if (d.target_agent !== undefined && !["claude", "codex", "cursor", "gemini"].includes(d.target_agent as string)) return false;
  // Optional list fields must be arrays of strings.
  for (const key of ["learnings", "constraints"] as const) {
    const v = d[key];
    if (v !== undefined && !(Array.isArray(v) && v.every((s) => typeof s === "string"))) return false;
  }
  // Optional scalar fields must be strings.
  for (const key of ["callback_url", "clawvisor_task_id", "source_signature"] as const) {
    if (d[key] !== undefined && typeof d[key] !== "string") return false;
  }
  return true;
}
interface DispatchFile {
  id: string;                  // must start with "dispatch-"
  dispatched_by: string;       // orchestrator identifier
  task: string;                // non-empty task description
  project: string;
  project_dir: string;         // may start with "~"
  target_agent?: string;       // claude | codex | cursor | gemini
  learnings?: string[];        // prior-session notes folded into the prompt
  constraints?: string[];
  callback_url?: string;       // Clawvisor completion callback
  clawvisor_task_id?: string;
  source_signature?: string;
  ttl_seconds?: number;        // 60..86400 when present
}
// --- Session spawning ---
/**
 * Ingest a dispatch file: parse, validate, enforce agent support and the
 * concurrency cap, then hand off to executeSession. Invalid files are
 * audited and deleted; excess work is queued FIFO.
 */
async function spawnSession(dispatchPath: string) {
  const raw = await readFile(dispatchPath, "utf-8");
  let dispatch: DispatchFile;
  try {
    dispatch = JSON.parse(raw);
  } catch {
    await audit("schema_invalid", basename(dispatchPath), { error: "invalid JSON" });
    await unlink(dispatchPath).catch(() => {});
    return;
  }
  if (!validateDispatch(dispatch)) {
    // JSON.parse("null") yields null, so dispatch may not be an object here;
    // optional chaining avoids the TypeError the old `dispatch.id` threw.
    await audit("schema_invalid", dispatch?.id || basename(dispatchPath), { error: "schema validation failed" });
    await unlink(dispatchPath).catch(() => {});
    return;
  }
  // Only claude is supported in v0
  if (dispatch.target_agent && dispatch.target_agent !== "claude") {
    await audit("unsupported_agent", dispatch.id, { target_agent: dispatch.target_agent });
    await writeCompletion(dispatch, "failed", 0, { error: `unsupported agent: ${dispatch.target_agent}` });
    await unlink(dispatchPath).catch(() => {});
    return;
  }
  // Concurrency check: over the cap, queue for later (drained by
  // executeSession's close handler).
  if (activeSessions.size >= MAX_CONCURRENT) {
    queue.push(dispatchPath);
    await audit("queued", dispatch.id, { queue_depth: queue.length });
    console.log(`[daemon] queued ${dispatch.id} (${queue.length} in queue)`);
    return;
  }
  await executeSession(dispatchPath, dispatch);
}
async function executeSession(dispatchPath: string, dispatch: DispatchFile, retryCount = 0) {
await audit("spawned", dispatch.id, { project: dispatch.project, retry: retryCount });
console.log(`[daemon] spawning session for ${dispatch.id} (project: ${dispatch.project})`);
const projectDir = dispatch.project_dir.replace("~", process.env.HOME!);
const startedAt = Date.now();
// Build the prompt
let prompt = `Load gstack. Dispatch ID: ${dispatch.id}. Task: ${dispatch.task}`;
if (dispatch.constraints?.length) {
prompt += `\nConstraints: ${dispatch.constraints.join(", ")}`;
}
if (dispatch.learnings?.length) {
prompt += `\nRelevant learnings from prior sessions:\n${dispatch.learnings.slice(0, 5).join("\n")}`;
}
const proc = spawn("claude", [
"--print",
"--permission-mode", "acceptEdits",
"--output-format", "stream-json",
"--verbose",
"--project", projectDir,
prompt,
], {
cwd: projectDir,
stdio: ["ignore", "pipe", "pipe"],
env: { ...process.env, GSTACK_DISPATCH_ID: dispatch.id },
});
activeSessions.set(dispatch.id, { proc, startedAt, project: dispatch.project });
let stdout = "";
let stderr = "";
proc.stdout?.on("data", (chunk: Buffer) => { stdout += chunk.toString(); });
proc.stderr?.on("data", (chunk: Buffer) => { stderr += chunk.toString(); });
// TTL enforcement
const ttl = (dispatch.ttl_seconds || 3600) * 1000;
const ttlTimer = setTimeout(() => {
console.log(`[daemon] TTL exceeded for ${dispatch.id}, killing`);
proc.kill("SIGTERM");
setTimeout(() => proc.kill("SIGKILL"), 5000);
}, ttl);
proc.on("close", async (code) => {
clearTimeout(ttlTimer);
activeSessions.delete(dispatch.id);
const duration = Math.round((Date.now() - startedAt) / 1000);
if (code === 0) {
await audit("completed", dispatch.id, { duration, exit_code: code });
await writeCompletion(dispatch, "completed", duration, { output_length: stdout.length });
console.log(`[daemon] ${dispatch.id} completed (${duration}s)`);
} else {
// Error classification for smart retry
const errorType = classifyError(stderr, code);
if (retryCount === 0 && (errorType === "rate_limit" || errorType === "tool_failure")) {
console.log(`[daemon] ${dispatch.id} failed (${errorType}), retrying...`);
await audit("retried", dispatch.id, { error_type: errorType, retry: retryCount + 1 });
if (errorType === "rate_limit") {
await new Promise(r => setTimeout(r, 30000)); // 30s backoff
}
await executeSession(dispatchPath, dispatch, retryCount + 1);
return;
}
await audit("failed", dispatch.id, { duration, exit_code: code, error_type: errorType, retry_count: retryCount });
await writeCompletion(dispatch, "failed", duration, {
error: stderr.slice(0, 500),
error_type: errorType,
retry_count: retryCount
});
console.log(`[daemon] ${dispatch.id} failed (${errorType}, ${duration}s)`);
}
// Remove dispatch file
await unlink(dispatchPath).catch(() => {});
// Process queue
if (queue.length > 0 && !shuttingDown) {
const next = queue.shift()!;
await spawnSession(next);
}
});
}
// --- Error classification ---
/**
 * Map a failed session's stderr and exit code onto a retry category.
 * Signal death is checked first: when a child is killed by a signal, node's
 * "close" handler receives code === null (not 137/143), so the old code let
 * TTL kills fall through to "tool_failure" and pointlessly retried them —
 * and stderr noise (e.g. the word "token") can't misclassify a timeout.
 */
function classifyError(stderr: string, code: number | null): string {
  // null = killed by signal; 137/143 = shell-style SIGKILL/SIGTERM codes.
  if (code === null || code === 137 || code === 143) return "timeout";
  const lower = stderr.toLowerCase();
  if (lower.includes("rate limit") || lower.includes("429") || lower.includes("too many requests")) return "rate_limit";
  if (lower.includes("context") && (lower.includes("overflow") || lower.includes("too long") || lower.includes("token"))) return "context_overflow";
  if (lower.includes("i can't") || lower.includes("unable to") || lower.includes("not possible")) return "logic_error";
  return "tool_failure";
}
// --- Completion report ---
/**
 * Persist a completion report to COMPLETED_DIR and, unless running in
 * local-only mode, POST it to the orchestrator's callback URL.
 * Callback failures are audited but never fatal; the callback is bounded by
 * a 10s timeout so a dead endpoint cannot hang session teardown forever
 * (the old fetch had no timeout at all).
 */
async function writeCompletion(dispatch: DispatchFile, status: string, duration: number, extra?: Record<string, unknown>) {
  const report = {
    dispatch_id: dispatch.id,
    status,
    duration,
    target_agent: dispatch.target_agent || "claude",
    ...extra,
  };
  const reportPath = join(COMPLETED_DIR, `${dispatch.id}.json`);
  await writeFile(reportPath, JSON.stringify(report, null, 2));
  // Callback via Clawvisor (or skip in local-only mode)
  if (dispatch.callback_url && !LOCAL_ONLY) {
    const aborter = new AbortController();
    const callbackTimeout = setTimeout(() => aborter.abort(), 10000);
    try {
      const res = await fetch(dispatch.callback_url, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(report),
        signal: aborter.signal,
      });
      await audit("callback_sent", dispatch.id, { status: res.status });
    } catch (e) {
      await audit("callback_failed", dispatch.id, { error: String(e) });
      console.error(`[daemon] callback failed for ${dispatch.id}: ${e}`);
    } finally {
      clearTimeout(callbackTimeout);
    }
  }
}
// --- Graceful shutdown ---
/**
 * Drain and exit: stop accepting queued work, let active sessions finish,
 * and force-kill whatever is still running after 60s. A 1s poll exits as
 * soon as the last session drains — the old code always waited the full 60s
 * even when every session finished in seconds.
 */
function shutdown() {
  if (shuttingDown) return;
  shuttingDown = true;
  console.log(`[daemon] shutting down (${activeSessions.size} active, ${queue.length} queued)`);
  // Drop queued (not-yet-started) work.
  queue.length = 0;
  if (activeSessions.size === 0) {
    clearInterval(heartbeatInterval);
    process.exit(0);
  }
  // Exit cleanly as soon as all active sessions have reported in.
  const drainPoll = setInterval(() => {
    if (activeSessions.size === 0) {
      clearInterval(drainPoll);
      clearInterval(heartbeatInterval);
      process.exit(0);
    }
  }, 1000);
  // Hard deadline: force-kill anything still running after 60s.
  setTimeout(() => {
    clearInterval(drainPoll);
    console.log("[daemon] timeout waiting for sessions, force killing");
    for (const [, { proc }] of activeSessions) {
      proc.kill("SIGKILL");
    }
    clearInterval(heartbeatInterval);
    process.exit(1);
  }, 60000);
}
process.on("SIGTERM", shutdown);
process.on("SIGINT", shutdown);
// --- Main ---
/**
 * Entry point: validate the environment, start the heartbeat, drain any
 * dispatch files left over from a previous run, then watch for new ones.
 */
async function main() {
  await validateStartup();
  // Start heartbeat
  heartbeatInterval = setInterval(writeHeartbeat, 30000);
  await writeHeartbeat();
  // Process dispatch files left over from a previous run.
  const existing = await readdir(DISPATCH_DIR).catch(() => [] as string[]);
  for (const f of existing) {
    if (f.endsWith(".json") && f.startsWith("dispatch-")) {
      await spawnSession(join(DISPATCH_DIR, f));
    }
  }
  // Watch for new files. fs.watch commonly fires several events per file
  // (rename + change), which previously caused duplicate "received" audits
  // and could spawn the same dispatch twice — dedupe with an in-flight set.
  const inFlight = new Set<string>();
  watch(DISPATCH_DIR, async (_eventType, filename) => {
    if (shuttingDown) return;
    if (!filename || !filename.endsWith(".json") || !filename.startsWith("dispatch-")) return;
    const filepath = join(DISPATCH_DIR, filename);
    if (inFlight.has(filepath)) return; // duplicate watch event
    if (!existsSync(filepath)) return; // file was deleted
    inFlight.add(filepath);
    try {
      // Small delay to ensure file is fully written
      await new Promise(r => setTimeout(r, 100));
      if (!existsSync(filepath)) return; // deleted during the settle delay
      await audit("received", filename);
      await spawnSession(filepath);
    } finally {
      inFlight.delete(filepath);
    }
  });
  console.log("[daemon] ready");
}
main().catch((e) => {
  console.error("[daemon] fatal:", e);
  process.exit(1);
});