diff --git a/bin/gstack-dispatch-daemon b/bin/gstack-dispatch-daemon new file mode 100755 index 00000000..ef833b27 --- /dev/null +++ b/bin/gstack-dispatch-daemon @@ -0,0 +1,337 @@ +#!/usr/bin/env bun +/** + * gstack-dispatch-daemon — accept and execute coding tasks from orchestrators + * + * Watches ~/.gstack/dispatch/ for new dispatch files, validates them against + * the JSON schema, spawns Claude Code sessions with scoped permissions, and + * reports completion back via Clawvisor callback or local disk. + * + * Usage: + * gstack-dispatch-daemon # foreground mode + * gstack-dispatch-daemon --max-concurrent 3 # custom concurrency + * gstack-dispatch-daemon --local-only # no Clawvisor, filesystem approval + */ + +import { watch } from "fs"; +import { readdir, readFile, writeFile, mkdir, unlink } from "fs/promises"; +import { spawn } from "child_process"; +import { join, basename } from "path"; +import { existsSync } from "fs"; + +// --- Config --- +const GSTACK_HOME = process.env.GSTACK_HOME || join(process.env.HOME!, ".gstack"); +const DISPATCH_DIR = join(GSTACK_HOME, "dispatch"); +const COMPLETED_DIR = join(DISPATCH_DIR, "completed"); +const AUDIT_LOG = join(DISPATCH_DIR, "audit.jsonl"); +const HEARTBEAT_FILE = join(DISPATCH_DIR, ".daemon-heartbeat"); +const MAX_CONCURRENT = parseInt(process.argv.find((_, i, a) => a[i - 1] === "--max-concurrent") || "2"); +const LOCAL_ONLY = process.argv.includes("--local-only"); + +// --- State --- +const activeSessions = new Map; startedAt: number; project: string }>(); +const queue: string[] = []; +let shuttingDown = false; + +// --- Startup validation --- +async function validateStartup() { + await mkdir(DISPATCH_DIR, { recursive: true }); + await mkdir(COMPLETED_DIR, { recursive: true }); + + // Check claude binary + try { + const which = Bun.spawnSync(["which", "claude"]); + if (which.exitCode !== 0) { + console.error("[daemon] claude binary not found in PATH"); + process.exit(1); + } + } catch { + console.error("[daemon] could not verify claude binary"); + process.exit(1); + } + + console.log(`[daemon] started (max_concurrent=${MAX_CONCURRENT}, local_only=${LOCAL_ONLY})`); + console.log(`[daemon] watching ${DISPATCH_DIR}`); +} + +// --- Audit logging --- +async function audit(event: string, dispatchId: string, details?: Record) { + const entry = JSON.stringify({ + ts: new Date().toISOString(), + event, + dispatch_id: dispatchId, + ...details, + }); + await Bun.write(Bun.file(AUDIT_LOG), (existsSync(AUDIT_LOG) ? await Bun.file(AUDIT_LOG).text() : "") + entry + "\n"); +} + +// --- Heartbeat --- +let heartbeatInterval: ReturnType; +async function writeHeartbeat() { + const data = JSON.stringify({ + ts: new Date().toISOString(), + active: activeSessions.size, + queued: queue.length, + pid: process.pid, + }); + await Bun.write(HEARTBEAT_FILE, data); +} + +// --- Schema validation (lightweight, no ajv dependency) --- +function validateDispatch(data: unknown): data is DispatchFile { + if (!data || typeof data !== "object") return false; + const d = data as Record; + if (typeof d.id !== "string" || !d.id.startsWith("dispatch-")) return false; + if (typeof d.dispatched_by !== "string") return false; + if (typeof d.task !== "string" || d.task.length === 0) return false; + if (typeof d.project !== "string") return false; + if (typeof d.project_dir !== "string") return false; + if (d.ttl_seconds !== undefined && (typeof d.ttl_seconds !== "number" || d.ttl_seconds < 60 || d.ttl_seconds > 86400)) return false; + if (d.target_agent !== undefined && !["claude", "codex", "cursor", "gemini"].includes(d.target_agent as string)) return false; + return true; +} + +interface DispatchFile { + id: string; + dispatched_by: string; + task: string; + project: string; + project_dir: string; + target_agent?: string; + learnings?: string[]; + constraints?: string[]; + callback_url?: string; + clawvisor_task_id?: string; + source_signature?: string; + ttl_seconds?: number; +} + +// --- Session spawning --- +async function spawnSession(dispatchPath: string) { + const raw = await readFile(dispatchPath, "utf-8"); + let dispatch: DispatchFile; + try { + dispatch = JSON.parse(raw); + } catch { + await audit("schema_invalid", basename(dispatchPath), { error: "invalid JSON" }); + await unlink(dispatchPath).catch(() => {}); + return; + } + + if (!validateDispatch(dispatch)) { + await audit("schema_invalid", dispatch.id || basename(dispatchPath), { error: "schema validation failed" }); + await unlink(dispatchPath).catch(() => {}); + return; + } + + // Only claude is supported in v0 + if (dispatch.target_agent && dispatch.target_agent !== "claude") { + await audit("unsupported_agent", dispatch.id, { target_agent: dispatch.target_agent }); + await writeCompletion(dispatch, "failed", 0, { error: `unsupported agent: ${dispatch.target_agent}` }); + await unlink(dispatchPath).catch(() => {}); + return; + } + + // Concurrency check + if (activeSessions.size >= MAX_CONCURRENT) { + queue.push(dispatchPath); + await audit("queued", dispatch.id, { queue_depth: queue.length }); + console.log(`[daemon] queued ${dispatch.id} (${queue.length} in queue)`); + return; + } + + await executeSession(dispatchPath, dispatch); +} + +async function executeSession(dispatchPath: string, dispatch: DispatchFile, retryCount = 0) { + await audit("spawned", dispatch.id, { project: dispatch.project, retry: retryCount }); + console.log(`[daemon] spawning session for ${dispatch.id} (project: ${dispatch.project})`); + + const projectDir = dispatch.project_dir.replace("~", process.env.HOME!); + const startedAt = Date.now(); + + // Build the prompt + let prompt = `Load gstack. Dispatch ID: ${dispatch.id}. Task: ${dispatch.task}`; + if (dispatch.constraints?.length) { + prompt += `\nConstraints: ${dispatch.constraints.join(", ")}`; + } + if (dispatch.learnings?.length) { + prompt += `\nRelevant learnings from prior sessions:\n${dispatch.learnings.slice(0, 5).join("\n")}`; + } + + const proc = spawn("claude", [ + "--print", + "--permission-mode", "acceptEdits", + "--output-format", "stream-json", + "--verbose", + "--project", projectDir, + prompt, + ], { + cwd: projectDir, + stdio: ["ignore", "pipe", "pipe"], + env: { ...process.env, GSTACK_DISPATCH_ID: dispatch.id }, + }); + + activeSessions.set(dispatch.id, { proc, startedAt, project: dispatch.project }); + + let stdout = ""; + let stderr = ""; + proc.stdout?.on("data", (chunk: Buffer) => { stdout += chunk.toString(); }); + proc.stderr?.on("data", (chunk: Buffer) => { stderr += chunk.toString(); }); + + // TTL enforcement + const ttl = (dispatch.ttl_seconds || 3600) * 1000; + const ttlTimer = setTimeout(() => { + console.log(`[daemon] TTL exceeded for ${dispatch.id}, killing`); + proc.kill("SIGTERM"); + setTimeout(() => proc.kill("SIGKILL"), 5000); + }, ttl); + + proc.on("close", async (code) => { + clearTimeout(ttlTimer); + activeSessions.delete(dispatch.id); + const duration = Math.round((Date.now() - startedAt) / 1000); + + if (code === 0) { + await audit("completed", dispatch.id, { duration, exit_code: code }); + await writeCompletion(dispatch, "completed", duration, { output_length: stdout.length }); + console.log(`[daemon] ${dispatch.id} completed (${duration}s)`); + } else { + // Error classification for smart retry + const errorType = classifyError(stderr, code); + + if (retryCount === 0 && (errorType === "rate_limit" || errorType === "tool_failure")) { + console.log(`[daemon] ${dispatch.id} failed (${errorType}), retrying...`); + await audit("retried", dispatch.id, { error_type: errorType, retry: retryCount + 1 }); + + if (errorType === "rate_limit") { + await new Promise(r => setTimeout(r, 30000)); // 30s backoff + } + await executeSession(dispatchPath, dispatch, retryCount + 1); + return; + } + + await audit("failed", dispatch.id, { duration, exit_code: code, error_type: errorType, retry_count: retryCount }); + await writeCompletion(dispatch, "failed", duration, { + error: stderr.slice(0, 500), + error_type: errorType, + retry_count: retryCount + }); + console.log(`[daemon] ${dispatch.id} failed (${errorType}, ${duration}s)`); + } + + // Remove dispatch file + await unlink(dispatchPath).catch(() => {}); + + // Process queue + if (queue.length > 0 && !shuttingDown) { + const next = queue.shift()!; + await spawnSession(next); + } + }); +} + +// --- Error classification --- +function classifyError(stderr: string, code: number | null): string { + const lower = stderr.toLowerCase(); + if (lower.includes("rate limit") || lower.includes("429") || lower.includes("too many requests")) return "rate_limit"; + if (lower.includes("context") && (lower.includes("overflow") || lower.includes("too long") || lower.includes("token"))) return "context_overflow"; + if (lower.includes("i can't") || lower.includes("unable to") || lower.includes("not possible")) return "logic_error"; + if (code === 137 || code === 143) return "timeout"; // SIGKILL or SIGTERM + return "tool_failure"; +} + +// --- Completion report --- +async function writeCompletion(dispatch: DispatchFile, status: string, duration: number, extra?: Record) { + const report = { + dispatch_id: dispatch.id, + status, + duration, + target_agent: dispatch.target_agent || "claude", + ...extra, + }; + + const reportPath = join(COMPLETED_DIR, `${dispatch.id}.json`); + await writeFile(reportPath, JSON.stringify(report, null, 2)); + + // Callback via Clawvisor (or skip in local-only mode) + if (dispatch.callback_url && !LOCAL_ONLY) { + try { + const res = await fetch(dispatch.callback_url, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(report), + }); + await audit("callback_sent", dispatch.id, { status: res.status }); + } catch (e) { + await audit("callback_failed", dispatch.id, { error: String(e) }); + console.error(`[daemon] callback failed for ${dispatch.id}: ${e}`); + } + } +} + +// --- Graceful shutdown --- +function shutdown() { + if (shuttingDown) return; + shuttingDown = true; + console.log(`[daemon] shutting down (${activeSessions.size} active, ${queue.length} queued)`); + + // Clear queue + queue.length = 0; + + // Wait for active sessions to finish (with 60s timeout) + if (activeSessions.size === 0) { + clearInterval(heartbeatInterval); + process.exit(0); + } + + setTimeout(() => { + console.log("[daemon] timeout waiting for sessions, force killing"); + for (const [id, { proc }] of activeSessions) { + proc.kill("SIGKILL"); + } + clearInterval(heartbeatInterval); + process.exit(1); + }, 60000); +} + +process.on("SIGTERM", shutdown); +process.on("SIGINT", shutdown); + +// --- Main --- +async function main() { + await validateStartup(); + + // Start heartbeat + heartbeatInterval = setInterval(writeHeartbeat, 30000); + await writeHeartbeat(); + + // Process existing dispatch files + const existing = await readdir(DISPATCH_DIR).catch(() => [] as string[]); + for (const f of existing) { + if (f.endsWith(".json") && f.startsWith("dispatch-")) { + await spawnSession(join(DISPATCH_DIR, f)); + } + } + + // Watch for new files + watch(DISPATCH_DIR, async (eventType, filename) => { + if (shuttingDown) return; + if (!filename || !filename.endsWith(".json") || !filename.startsWith("dispatch-")) return; + + const filepath = join(DISPATCH_DIR, filename); + if (!existsSync(filepath)) return; // file was deleted + + // Small delay to ensure file is fully written + await new Promise(r => setTimeout(r, 100)); + + await audit("received", filename); + await spawnSession(filepath); + }); + + console.log("[daemon] ready"); +} + +main().catch((e) => { + console.error("[daemon] fatal:", e); + process.exit(1); +});