#!/usr/bin/env bun /** * gstack-dispatch-daemon — accept and execute coding tasks from orchestrators * * Watches ~/.gstack/dispatch/ for new dispatch files, validates them against * the JSON schema, spawns Claude Code sessions with scoped permissions, and * reports completion back via Clawvisor callback or local disk. * * Usage: * gstack-dispatch-daemon # foreground mode * gstack-dispatch-daemon --max-concurrent 3 # custom concurrency * gstack-dispatch-daemon --local-only # no Clawvisor, filesystem approval */ import { watch } from "fs"; import { readdir, readFile, writeFile, mkdir, unlink } from "fs/promises"; import { spawn } from "child_process"; import { join, basename } from "path"; import { existsSync } from "fs"; // --- Config --- const GSTACK_HOME = process.env.GSTACK_HOME || join(process.env.HOME!, ".gstack"); const DISPATCH_DIR = join(GSTACK_HOME, "dispatch"); const COMPLETED_DIR = join(DISPATCH_DIR, "completed"); const AUDIT_LOG = join(DISPATCH_DIR, "audit.jsonl"); const HEARTBEAT_FILE = join(DISPATCH_DIR, ".daemon-heartbeat"); const MAX_CONCURRENT = parseInt(process.argv.find((_, i, a) => a[i - 1] === "--max-concurrent") || "2"); const LOCAL_ONLY = process.argv.includes("--local-only"); // --- State --- const activeSessions = new Map; startedAt: number; project: string }>(); const queue: string[] = []; let shuttingDown = false; // --- Startup validation --- async function validateStartup() { await mkdir(DISPATCH_DIR, { recursive: true }); await mkdir(COMPLETED_DIR, { recursive: true }); // Check claude binary try { const which = Bun.spawnSync(["which", "claude"]); if (which.exitCode !== 0) { console.error("[daemon] claude binary not found in PATH"); process.exit(1); } } catch { console.error("[daemon] could not verify claude binary"); process.exit(1); } console.log(`[daemon] started (max_concurrent=${MAX_CONCURRENT}, local_only=${LOCAL_ONLY})`); console.log(`[daemon] watching ${DISPATCH_DIR}`); } // --- Audit logging --- async function audit(event: string, dispatchId: string, details?: Record) { const entry = JSON.stringify({ ts: new Date().toISOString(), event, dispatch_id: dispatchId, ...details, }); await Bun.write(Bun.file(AUDIT_LOG), (existsSync(AUDIT_LOG) ? await Bun.file(AUDIT_LOG).text() : "") + entry + "\n"); } // --- Heartbeat --- let heartbeatInterval: ReturnType; async function writeHeartbeat() { const data = JSON.stringify({ ts: new Date().toISOString(), active: activeSessions.size, queued: queue.length, pid: process.pid, }); await Bun.write(HEARTBEAT_FILE, data); } // --- Schema validation (lightweight, no ajv dependency) --- function validateDispatch(data: unknown): data is DispatchFile { if (!data || typeof data !== "object") return false; const d = data as Record; if (typeof d.id !== "string" || !d.id.startsWith("dispatch-")) return false; if (typeof d.dispatched_by !== "string") return false; if (typeof d.task !== "string" || d.task.length === 0) return false; if (typeof d.project !== "string") return false; if (typeof d.project_dir !== "string") return false; if (d.ttl_seconds !== undefined && (typeof d.ttl_seconds !== "number" || d.ttl_seconds < 60 || d.ttl_seconds > 86400)) return false; if (d.target_agent !== undefined && !["claude", "codex", "cursor", "gemini"].includes(d.target_agent as string)) return false; return true; } interface DispatchFile { id: string; dispatched_by: string; task: string; project: string; project_dir: string; target_agent?: string; learnings?: string[]; constraints?: string[]; callback_url?: string; clawvisor_task_id?: string; source_signature?: string; ttl_seconds?: number; } // --- Session spawning --- async function spawnSession(dispatchPath: string) { const raw = await readFile(dispatchPath, "utf-8"); let dispatch: DispatchFile; try { dispatch = JSON.parse(raw); } catch { await audit("schema_invalid", basename(dispatchPath), { error: "invalid JSON" }); await unlink(dispatchPath).catch(() => {}); return; } if (!validateDispatch(dispatch)) { await audit("schema_invalid", dispatch.id || basename(dispatchPath), { error: "schema validation failed" }); await unlink(dispatchPath).catch(() => {}); return; } // Only claude is supported in v0 if (dispatch.target_agent && dispatch.target_agent !== "claude") { await audit("unsupported_agent", dispatch.id, { target_agent: dispatch.target_agent }); await writeCompletion(dispatch, "failed", 0, { error: `unsupported agent: ${dispatch.target_agent}` }); await unlink(dispatchPath).catch(() => {}); return; } // Concurrency check if (activeSessions.size >= MAX_CONCURRENT) { queue.push(dispatchPath); await audit("queued", dispatch.id, { queue_depth: queue.length }); console.log(`[daemon] queued ${dispatch.id} (${queue.length} in queue)`); return; } await executeSession(dispatchPath, dispatch); } async function executeSession(dispatchPath: string, dispatch: DispatchFile, retryCount = 0) { await audit("spawned", dispatch.id, { project: dispatch.project, retry: retryCount }); console.log(`[daemon] spawning session for ${dispatch.id} (project: ${dispatch.project})`); const projectDir = dispatch.project_dir.replace("~", process.env.HOME!); const startedAt = Date.now(); // Build the prompt let prompt = `Load gstack. Dispatch ID: ${dispatch.id}. Task: ${dispatch.task}`; if (dispatch.constraints?.length) { prompt += `\nConstraints: ${dispatch.constraints.join(", ")}`; } if (dispatch.learnings?.length) { prompt += `\nRelevant learnings from prior sessions:\n${dispatch.learnings.slice(0, 5).join("\n")}`; } const proc = spawn("claude", [ "--print", "--permission-mode", "acceptEdits", "--output-format", "stream-json", "--verbose", "--project", projectDir, prompt, ], { cwd: projectDir, stdio: ["ignore", "pipe", "pipe"], env: { ...process.env, GSTACK_DISPATCH_ID: dispatch.id }, }); activeSessions.set(dispatch.id, { proc, startedAt, project: dispatch.project }); let stdout = ""; let stderr = ""; proc.stdout?.on("data", (chunk: Buffer) => { stdout += chunk.toString(); }); proc.stderr?.on("data", (chunk: Buffer) => { stderr += chunk.toString(); }); // TTL enforcement const ttl = (dispatch.ttl_seconds || 3600) * 1000; const ttlTimer = setTimeout(() => { console.log(`[daemon] TTL exceeded for ${dispatch.id}, killing`); proc.kill("SIGTERM"); setTimeout(() => proc.kill("SIGKILL"), 5000); }, ttl); proc.on("close", async (code) => { clearTimeout(ttlTimer); activeSessions.delete(dispatch.id); const duration = Math.round((Date.now() - startedAt) / 1000); if (code === 0) { await audit("completed", dispatch.id, { duration, exit_code: code }); await writeCompletion(dispatch, "completed", duration, { output_length: stdout.length }); console.log(`[daemon] ${dispatch.id} completed (${duration}s)`); } else { // Error classification for smart retry const errorType = classifyError(stderr, code); if (retryCount === 0 && (errorType === "rate_limit" || errorType === "tool_failure")) { console.log(`[daemon] ${dispatch.id} failed (${errorType}), retrying...`); await audit("retried", dispatch.id, { error_type: errorType, retry: retryCount + 1 }); if (errorType === "rate_limit") { await new Promise(r => setTimeout(r, 30000)); // 30s backoff } await executeSession(dispatchPath, dispatch, retryCount + 1); return; } await audit("failed", dispatch.id, { duration, exit_code: code, error_type: errorType, retry_count: retryCount }); await writeCompletion(dispatch, "failed", duration, { error: stderr.slice(0, 500), error_type: errorType, retry_count: retryCount }); console.log(`[daemon] ${dispatch.id} failed (${errorType}, ${duration}s)`); } // Remove dispatch file await unlink(dispatchPath).catch(() => {}); // Process queue if (queue.length > 0 && !shuttingDown) { const next = queue.shift()!; await spawnSession(next); } }); } // --- Error classification --- function classifyError(stderr: string, code: number | null): string { const lower = stderr.toLowerCase(); if (lower.includes("rate limit") || lower.includes("429") || lower.includes("too many requests")) return "rate_limit"; if (lower.includes("context") && (lower.includes("overflow") || lower.includes("too long") || lower.includes("token"))) return "context_overflow"; if (lower.includes("i can't") || lower.includes("unable to") || lower.includes("not possible")) return "logic_error"; if (code === 137 || code === 143) return "timeout"; // SIGKILL or SIGTERM return "tool_failure"; } // --- Completion report --- async function writeCompletion(dispatch: DispatchFile, status: string, duration: number, extra?: Record) { const report = { dispatch_id: dispatch.id, status, duration, target_agent: dispatch.target_agent || "claude", ...extra, }; const reportPath = join(COMPLETED_DIR, `${dispatch.id}.json`); await writeFile(reportPath, JSON.stringify(report, null, 2)); // Callback via Clawvisor (or skip in local-only mode) if (dispatch.callback_url && !LOCAL_ONLY) { try { const res = await fetch(dispatch.callback_url, { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify(report), }); await audit("callback_sent", dispatch.id, { status: res.status }); } catch (e) { await audit("callback_failed", dispatch.id, { error: String(e) }); console.error(`[daemon] callback failed for ${dispatch.id}: ${e}`); } } } // --- Graceful shutdown --- function shutdown() { if (shuttingDown) return; shuttingDown = true; console.log(`[daemon] shutting down (${activeSessions.size} active, ${queue.length} queued)`); // Clear queue queue.length = 0; // Wait for active sessions to finish (with 60s timeout) if (activeSessions.size === 0) { clearInterval(heartbeatInterval); process.exit(0); } setTimeout(() => { console.log("[daemon] timeout waiting for sessions, force killing"); for (const [id, { proc }] of activeSessions) { proc.kill("SIGKILL"); } clearInterval(heartbeatInterval); process.exit(1); }, 60000); } process.on("SIGTERM", shutdown); process.on("SIGINT", shutdown); // --- Main --- async function main() { await validateStartup(); // Start heartbeat heartbeatInterval = setInterval(writeHeartbeat, 30000); await writeHeartbeat(); // Process existing dispatch files const existing = await readdir(DISPATCH_DIR).catch(() => [] as string[]); for (const f of existing) { if (f.endsWith(".json") && f.startsWith("dispatch-")) { await spawnSession(join(DISPATCH_DIR, f)); } } // Watch for new files watch(DISPATCH_DIR, async (eventType, filename) => { if (shuttingDown) return; if (!filename || !filename.endsWith(".json") || !filename.startsWith("dispatch-")) return; const filepath = join(DISPATCH_DIR, filename); if (!existsSync(filepath)) return; // file was deleted // Small delay to ensure file is fully written await new Promise(r => setTimeout(r, 100)); await audit("received", filename); await spawnSession(filepath); }); console.log("[daemon] ready"); } main().catch((e) => { console.error("[daemon] fatal:", e); process.exit(1); });