shannon/src/temporal/workflows.ts
Arjun Malleswaran 51e621d0d5 Feat/temporal (#46)
* refactor: modularize claude-executor and extract shared utilities

- Extract message handling into src/ai/message-handlers.ts with pure functions
- Extract output formatting into src/ai/output-formatters.ts
- Extract progress management into src/ai/progress-manager.ts
- Add audit-logger.ts with Null Object pattern for optional logging
- Add shared utilities: formatting.ts, file-io.ts, functional.ts
- Consolidate getPromptNameForAgent into src/types/agents.ts

* feat: add Claude Code custom commands for debug and review

* feat: add Temporal integration foundation (phase 1-2)

- Add Temporal SDK dependencies (@temporalio/client, worker, workflow, activity)
- Add shared types for pipeline state, metrics, and progress queries
- Add classifyErrorForTemporal() for retry behavior classification
- Add docker-compose for Temporal server with SQLite persistence

* feat: add Temporal activities for agent execution (phase 3)

- Add activities.ts with heartbeat loop, git checkpoint/rollback, and error classification
- Export runClaudePrompt, validateAgentOutput, ClaudePromptResult for Temporal use
- Track attempt number via Temporal Context for accurate audit logging
- Rollback git workspace before retry to ensure clean state

* feat: add Temporal workflow for 5-phase pipeline orchestration (phase 4)

* feat: add Temporal worker, client, and query tools (phase 5)

- Add worker.ts with workflow bundling and graceful shutdown
- Add client.ts CLI to start pipelines with progress polling
- Add query.ts CLI to inspect running workflow state
- Fix buffer overflow by truncating error messages and stack traces
- Skip git operations gracefully on non-git repositories
- Add kill.sh/start.sh dev scripts and Dockerfile.worker

* feat: fix Docker worker container setup

- Install uv instead of deprecated uvx package
- Add mcp-server and configs directories to container
- Mount target repo dynamically via TARGET_REPO env variable

* fix: add report assembly step to Temporal workflow

- Add assembleReportActivity to concatenate exploitation evidence files before report agent runs
- Call assembleFinalReport in workflow Phase 5 before runReportAgent
- Ensure deliverables directory exists before writing final report
- Simplify pipeline-testing report prompt to just prepend header

* refactor: consolidate Docker setup to root docker-compose.yml

* feat: improve Temporal client UX and env handling

- Change default to fire-and-forget (--wait flag to opt-in)
- Add splash screen and improve console output formatting
- Add .env to gitignore, remove from dockerignore for container access
- Add Taskfile for common development commands

* refactor: simplify session ID handling and improve Taskfile options

- Include hostname in workflow ID for better audit log organization
- Extract sanitizeHostname utility to audit/utils.ts for reuse
- Remove unused generateSessionLogPath and buildLogFilePath functions
- Simplify Taskfile with CONFIG/OUTPUT/CLEAN named parameters

* chore: add .env.example and simplify .gitignore

* docs: update README and CLAUDE.md for Temporal workflow usage

- Replace Docker CLI instructions with Task-based commands
- Add monitoring/stopping sections and workflow examples
- Document Temporal orchestration layer and troubleshooting
- Simplify file structure to key files overview

* refactor: replace Taskfile with bash CLI script

- Add shannon bash script with start/logs/query/stop/help commands
- Remove Taskfile.yml dependency (no longer requires Task installation)
- Update README.md and CLAUDE.md to use ./shannon commands
- Update client.ts output to show ./shannon commands

* docs: fix deliverable filename in README

* refactor: remove direct CLI and .shannon-store.json in favor of Temporal

- Delete src/shannon.ts direct CLI entry point (Temporal is now the only mode)
- Remove .shannon-store.json session lock (Temporal handles workflow deduplication)
- Remove broken scripts/export-metrics.js (imported non-existent function)
- Update package.json to remove main, start script, and bin entry
- Clean up CLAUDE.md and debug.md to remove obsolete references

* chore: remove licensing comments from prompt files to prevent leaking into actual prompts

* fix: resolve parallel workflow race conditions and retry logic bugs

- Fix save_deliverable race condition using closure pattern instead of global variable
- Fix error classification order so OutputValidationError matches before generic validation
- Fix ApplicationFailure re-classification bug by checking instanceof before re-throwing
- Add per-error-type retry limits (3 for output validation, 50 for billing)
- Add fast retry intervals for pipeline testing mode (10s vs 5min)
- Increase worker concurrent activities to 25 for parallel workflows
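
A minimal sketch of how per-error-type attempt caps and the two backoff schedules could fit together. The names `MAX_ATTEMPTS`, `shouldRetry`, `backoffMs`, and the `BillingError` label are hypothetical and not taken from the repository. The caps (3 and 50) come from this commit, the intervals from the retry configuration in workflows.ts, and the capped exponential rule is an assumption about how those settings combine.

```typescript
// Hypothetical names throughout; the real classification lives in code not shown here.
type RetryableKind = 'OutputValidationError' | 'BillingError';

// Per-error-type caps named in the commit message.
const MAX_ATTEMPTS: Record<RetryableKind, number> = {
  OutputValidationError: 3,
  BillingError: 50,
};

// `attempt` is 1-based and refers to the attempt that just failed.
function shouldRetry(kind: RetryableKind, attempt: number): boolean {
  return attempt < MAX_ATTEMPTS[kind];
}

// Capped exponential backoff with coefficient 2.
// Testing mode: 10s initial, 30s cap. Production: 5min initial, 30min cap.
function backoffMs(attempt: number, testing: boolean): number {
  const initial = testing ? 10 * 1000 : 5 * 60 * 1000;
  const cap = testing ? 30 * 1000 : 30 * 60 * 1000;
  return Math.min(initial * 2 ** (attempt - 1), cap);
}
```

Under these assumptions the production schedule waits 5, 10, 20, then 30 minutes between attempts, while testing mode reaches its 30-second cap by the third retry.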

* refactor: pipeline vuln→exploit workflow for parallel execution

- Replace sync barrier between vuln/exploit phases with independent pipelines
- Each vuln type runs: vuln agent → queue check → conditional exploit
- Add checkExploitationQueue activity to skip exploits when no vulns found
- Use Promise.allSettled for graceful failure handling across pipelines
- Add PipelineSummary type for aggregated cost/duration/turns metrics
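
The pipelining described in this entry can be sketched without Temporal at all. The snippet below uses stand-in async functions instead of real activities; `runPipeline`, `demo`, `Metrics`, and the queue-size callbacks are illustrative names, not the repository's API. It shows each pipeline running vuln, then a queue check, then an optional exploit, with `Promise.allSettled` keeping the other pipelines alive when one rejects.

```typescript
type Metrics = { durationMs: number };
type PipelineResult = { vulnType: string; vuln: Metrics; exploit: Metrics | null };

// One independent pipeline: vuln agent → queue check → conditional exploit.
async function runPipeline(
  vulnType: string,
  runVuln: () => Promise<Metrics>,
  queueSize: () => Promise<number>,
  runExploit: () => Promise<Metrics>
): Promise<PipelineResult> {
  const vuln = await runVuln();
  const count = await queueSize();
  // Skip the exploit step entirely when nothing was queued.
  const exploit = count > 0 ? await runExploit() : null;
  return { vulnType, vuln, exploit };
}

async function demo(): Promise<{ completed: string[]; failures: string[] }> {
  const ok = async (): Promise<Metrics> => ({ durationMs: 1 });
  const boom = async (): Promise<Metrics> => {
    throw new Error('vuln agent failed');
  };

  // allSettled never rejects, so one failing pipeline cannot cancel the rest.
  const settled = await Promise.allSettled([
    runPipeline('injection', ok, async () => 2, ok),
    runPipeline('xss', ok, async () => 0, ok),
    runPipeline('auth', boom, async () => 1, ok),
  ]);

  const completed: string[] = [];
  const failures: string[] = [];
  for (const r of settled) {
    if (r.status === 'fulfilled') {
      completed.push(`${r.value.vulnType}-vuln`);
      if (r.value.exploit) completed.push(`${r.value.vulnType}-exploit`);
    } else {
      failures.push(r.reason instanceof Error ? r.reason.message : String(r.reason));
    }
  }
  return { completed, failures };
}
```

Calling `demo()` resolves with the injection pipeline fully completed, the xss pipeline stopping after its vuln step because its queue is empty, and the auth failure recorded rather than thrown.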

* fix: re-throw retryable errors in checkExploitationQueue

* fix: detect and retry on Claude Code spending cap errors

- Add spending cap pattern detection in detectApiError() with retryable error
- Add matching patterns to classifyErrorForTemporal() for proper Temporal retry
- Add defense-in-depth safeguard in runClaudePrompt() for $0 cost / low turn detection
- Add final sanity check in activities before declaring success

* fix: increase heartbeat timeout to prevent false worker-dead detection

Original 30s timeout was from POC spec assuming <5min activities. With
hour-long activities and multiple concurrent workflows sharing one worker,
resource contention causes event loop stalls exceeding 30s, triggering
false heartbeat timeouts. Increased to 10min (prod) and 5min (testing).
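
To see why the 30s value misfired, here is a simplified model of a heartbeat deadline check. `heartbeatExpired` is a hypothetical helper, not a Temporal API, and the 45-second stall is an illustrative figure rather than a measurement from this project.

```typescript
// Simplified model: an activity is presumed dead once the gap since its
// last heartbeat exceeds the configured timeout.
function heartbeatExpired(
  lastBeatMs: number,
  nowMs: number,
  timeoutMs: number
): boolean {
  return nowMs - lastBeatMs > timeoutMs;
}

const SECOND = 1000;
const MINUTE = 60 * SECOND;

// Illustrative stall: the event loop was blocked for 45s, so no heartbeat was sent.
const stallMs = 45 * SECOND;

const oldTimeoutTrips = heartbeatExpired(0, stallMs, 30 * SECOND); // original setting
const prodTimeoutTrips = heartbeatExpired(0, stallMs, 10 * MINUTE); // new prod setting
const testTimeoutTrips = heartbeatExpired(0, stallMs, 5 * MINUTE); // new testing setting
```

In this model only the original 30s setting flags the stalled but still healthy activity; both new values tolerate it.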

* fix: temporal db init

* fix: persist home dir

* feat: add per-workflow unified logging with ./shannon logs ID=<workflow-id>

- Add WorkflowLogger class for human-readable, per-workflow log files
- Create workflow.log in audit-logs/{workflowId}/ with phase, agent, tool, and LLM events
- Update ./shannon logs to require ID param and tail specific workflow log
- Add phase transition logging at workflow boundaries
- Include workflow completion summary with agent breakdown (duration, cost)
- Mount audit-logs volume in docker-compose for host access

---------

Co-authored-by: ezl-keygraph <ezhil@keygraph.io>
2026-01-15 10:36:11 -08:00


// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
 * Temporal workflow for Shannon pentest pipeline.
 *
 * Orchestrates the penetration testing workflow:
 * 1. Pre-Reconnaissance (sequential)
 * 2. Reconnaissance (sequential)
 * 3-4. Vulnerability + Exploitation (5 pipelined pairs in parallel)
 *      Each pair: vuln agent → queue check → conditional exploit
 *      No synchronization barrier - exploits start when their vuln finishes
 * 5. Reporting (sequential)
 *
 * Features:
 * - Queryable state via getProgress
 * - Automatic retry with backoff for transient/billing errors
 * - Non-retryable classification for permanent errors
 * - Audit correlation via workflowId
 * - Graceful failure handling: pipelines continue if one fails
 */
import {
  proxyActivities,
  setHandler,
  workflowInfo,
} from '@temporalio/workflow';
import type * as activities from './activities.js';
import type { ActivityInput } from './activities.js';
import {
  getProgress,
  type PipelineInput,
  type PipelineState,
  type PipelineProgress,
  type PipelineSummary,
  type VulnExploitPipelineResult,
  type AgentMetrics,
} from './shared.js';
import type { VulnType } from '../queue-validation.js';
// Retry configuration for production (long intervals for billing recovery)
const PRODUCTION_RETRY = {
  initialInterval: '5 minutes',
  maximumInterval: '30 minutes',
  backoffCoefficient: 2,
  maximumAttempts: 50,
  nonRetryableErrorTypes: [
    'AuthenticationError',
    'PermissionError',
    'InvalidRequestError',
    'RequestTooLargeError',
    'ConfigurationError',
    'InvalidTargetError',
    'ExecutionLimitError',
  ],
};

// Retry configuration for pipeline testing (fast iteration)
const TESTING_RETRY = {
  initialInterval: '10 seconds',
  maximumInterval: '30 seconds',
  backoffCoefficient: 2,
  maximumAttempts: 5,
  nonRetryableErrorTypes: PRODUCTION_RETRY.nonRetryableErrorTypes,
};

// Activity proxy with production retry configuration (default)
const acts = proxyActivities<typeof activities>({
  startToCloseTimeout: '2 hours',
  heartbeatTimeout: '10 minutes', // Long timeout for resource-constrained workers with many concurrent activities
  retry: PRODUCTION_RETRY,
});

// Activity proxy with testing retry configuration (fast)
const testActs = proxyActivities<typeof activities>({
  startToCloseTimeout: '10 minutes',
  heartbeatTimeout: '5 minutes', // Shorter for testing but still tolerant of resource contention
  retry: TESTING_RETRY,
});
/**
 * Compute aggregated metrics from the current pipeline state.
 * Called on both success and failure to provide partial metrics.
 */
function computeSummary(state: PipelineState): PipelineSummary {
  const metrics = Object.values(state.agentMetrics);
  return {
    totalCostUsd: metrics.reduce((sum, m) => sum + (m.costUsd ?? 0), 0),
    totalDurationMs: Date.now() - state.startTime,
    totalTurns: metrics.reduce((sum, m) => sum + (m.numTurns ?? 0), 0),
    agentCount: state.completedAgents.length,
  };
}
export async function pentestPipelineWorkflow(
  input: PipelineInput
): Promise<PipelineState> {
  const { workflowId } = workflowInfo();

  // Select activity proxy based on testing mode
  // Pipeline testing uses fast retry intervals (10s) for quick iteration
  const a = input.pipelineTestingMode ? testActs : acts;

  // Workflow state (queryable)
  const state: PipelineState = {
    status: 'running',
    currentPhase: null,
    currentAgent: null,
    completedAgents: [],
    failedAgent: null,
    error: null,
    startTime: Date.now(),
    agentMetrics: {},
    summary: null,
  };

  // Register query handler for real-time progress inspection
  setHandler(getProgress, (): PipelineProgress => ({
    ...state,
    workflowId,
    elapsedMs: Date.now() - state.startTime,
  }));

  // Build ActivityInput with required workflowId for audit correlation
  // Activities require workflowId (non-optional), PipelineInput has it optional
  // Use spread to conditionally include optional properties (exactOptionalPropertyTypes)
  const activityInput: ActivityInput = {
    webUrl: input.webUrl,
    repoPath: input.repoPath,
    workflowId,
    ...(input.configPath !== undefined && { configPath: input.configPath }),
    ...(input.outputPath !== undefined && { outputPath: input.outputPath }),
    ...(input.pipelineTestingMode !== undefined && {
      pipelineTestingMode: input.pipelineTestingMode,
    }),
  };
  try {
    // === Phase 1: Pre-Reconnaissance ===
    state.currentPhase = 'pre-recon';
    state.currentAgent = 'pre-recon';
    await a.logPhaseTransition(activityInput, 'pre-recon', 'start');
    state.agentMetrics['pre-recon'] =
      await a.runPreReconAgent(activityInput);
    state.completedAgents.push('pre-recon');
    await a.logPhaseTransition(activityInput, 'pre-recon', 'complete');

    // === Phase 2: Reconnaissance ===
    state.currentPhase = 'recon';
    state.currentAgent = 'recon';
    await a.logPhaseTransition(activityInput, 'recon', 'start');
    state.agentMetrics['recon'] = await a.runReconAgent(activityInput);
    state.completedAgents.push('recon');
    await a.logPhaseTransition(activityInput, 'recon', 'complete');

    // === Phases 3-4: Vulnerability Analysis + Exploitation (Pipelined) ===
    // Each vuln type runs as an independent pipeline:
    // vuln agent → queue check → conditional exploit agent
    // This eliminates the synchronization barrier between phases - each exploit
    // starts immediately when its vuln agent finishes, not waiting for all.
    state.currentPhase = 'vulnerability-exploitation';
    state.currentAgent = 'pipelines';
    await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'start');
    // Helper: Run a single vuln→exploit pipeline
    async function runVulnExploitPipeline(
      vulnType: VulnType,
      runVulnAgent: () => Promise<AgentMetrics>,
      runExploitAgent: () => Promise<AgentMetrics>
    ): Promise<VulnExploitPipelineResult> {
      // Step 1: Run vulnerability agent
      const vulnMetrics = await runVulnAgent();

      // Step 2: Check exploitation queue (starts immediately after vuln)
      const decision = await a.checkExploitationQueue(activityInput, vulnType);

      // Step 3: Conditionally run exploit agent
      let exploitMetrics: AgentMetrics | null = null;
      if (decision.shouldExploit) {
        exploitMetrics = await runExploitAgent();
      }

      return {
        vulnType,
        vulnMetrics,
        exploitMetrics,
        exploitDecision: {
          shouldExploit: decision.shouldExploit,
          vulnerabilityCount: decision.vulnerabilityCount,
        },
        error: null,
      };
    }
    // Run all 5 pipelines in parallel with graceful failure handling
    // Promise.allSettled ensures other pipelines continue if one fails
    const pipelineResults = await Promise.allSettled([
      runVulnExploitPipeline(
        'injection',
        () => a.runInjectionVulnAgent(activityInput),
        () => a.runInjectionExploitAgent(activityInput)
      ),
      runVulnExploitPipeline(
        'xss',
        () => a.runXssVulnAgent(activityInput),
        () => a.runXssExploitAgent(activityInput)
      ),
      runVulnExploitPipeline(
        'auth',
        () => a.runAuthVulnAgent(activityInput),
        () => a.runAuthExploitAgent(activityInput)
      ),
      runVulnExploitPipeline(
        'ssrf',
        () => a.runSsrfVulnAgent(activityInput),
        () => a.runSsrfExploitAgent(activityInput)
      ),
      runVulnExploitPipeline(
        'authz',
        () => a.runAuthzVulnAgent(activityInput),
        () => a.runAuthzExploitAgent(activityInput)
      ),
    ]);
    // Aggregate results from all pipelines
    const failedPipelines: string[] = [];
    for (const result of pipelineResults) {
      if (result.status === 'fulfilled') {
        const { vulnType, vulnMetrics, exploitMetrics } = result.value;

        // Record vuln agent metrics
        if (vulnMetrics) {
          state.agentMetrics[`${vulnType}-vuln`] = vulnMetrics;
          state.completedAgents.push(`${vulnType}-vuln`);
        }

        // Record exploit agent metrics (if it ran)
        if (exploitMetrics) {
          state.agentMetrics[`${vulnType}-exploit`] = exploitMetrics;
          state.completedAgents.push(`${vulnType}-exploit`);
        }
      } else {
        // Pipeline failed - log error but continue with others
        const errorMsg =
          result.reason instanceof Error
            ? result.reason.message
            : String(result.reason);
        failedPipelines.push(errorMsg);
      }
    }

    // Log any pipeline failures (workflow continues despite failures)
    if (failedPipelines.length > 0) {
      console.log(
        `⚠️ ${failedPipelines.length} pipeline(s) failed:`,
        failedPipelines
      );
    }

    // Update phase markers
    state.currentPhase = 'exploitation';
    state.currentAgent = null;
    await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'complete');
    // === Phase 5: Reporting ===
    state.currentPhase = 'reporting';
    state.currentAgent = 'report';
    await a.logPhaseTransition(activityInput, 'reporting', 'start');

    // First, assemble the concatenated report from exploitation evidence files
    await a.assembleReportActivity(activityInput);

    // Then run the report agent to add executive summary and clean up
    state.agentMetrics['report'] = await a.runReportAgent(activityInput);
    state.completedAgents.push('report');
    await a.logPhaseTransition(activityInput, 'reporting', 'complete');

    // === Complete ===
    state.status = 'completed';
    state.currentPhase = null;
    state.currentAgent = null;
    state.summary = computeSummary(state);

    // Log workflow completion summary
    await a.logWorkflowComplete(activityInput, {
      status: 'completed',
      totalDurationMs: state.summary.totalDurationMs,
      totalCostUsd: state.summary.totalCostUsd,
      completedAgents: state.completedAgents,
      agentMetrics: Object.fromEntries(
        Object.entries(state.agentMetrics).map(([name, m]) => [
          name,
          { durationMs: m.durationMs, costUsd: m.costUsd },
        ])
      ),
    });

    return state;
  } catch (error) {
    state.status = 'failed';
    state.failedAgent = state.currentAgent;
    state.error = error instanceof Error ? error.message : String(error);
    state.summary = computeSummary(state);

    // Log workflow failure summary
    await a.logWorkflowComplete(activityInput, {
      status: 'failed',
      totalDurationMs: state.summary.totalDurationMs,
      totalCostUsd: state.summary.totalCostUsd,
      completedAgents: state.completedAgents,
      agentMetrics: Object.fromEntries(
        Object.entries(state.agentMetrics).map(([name, m]) => [
          name,
          { durationMs: m.durationMs, costUsd: m.costUsd },
        ])
      ),
      error: state.error ?? undefined,
    });

    throw error;
  }
}
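
The conditional-spread idiom used to build `activityInput` can be tried in isolation. The sketch below is a standalone illustration with hypothetical names (`DemoInput`, `buildDemoInput`), not part of the workflow: an optional property is added only when a value is present, so the key is never set to `undefined`, which is what `exactOptionalPropertyTypes` requires.

```typescript
// Hypothetical shape: one required field and one optional field.
interface DemoInput {
  webUrl: string;
  configPath?: string;
}

function buildDemoInput(
  webUrl: string,
  configPath: string | undefined
): DemoInput {
  return {
    webUrl,
    // Spreading `false` adds nothing; spreading the object adds the key.
    ...(configPath !== undefined && { configPath }),
  };
}

const withoutConfig = buildDemoInput('https://example.test', undefined);
const withConfig = buildDemoInput('https://example.test', 'config.yaml');
```

Here `withoutConfig` has no `configPath` key at all, whereas a plain `configPath: configPath` assignment would create the key with an `undefined` value.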