diff --git a/src/temporal/activities.ts b/src/temporal/activities.ts index 40f9d3b..abe763d 100644 --- a/src/temporal/activities.ts +++ b/src/temporal/activities.ts @@ -56,6 +56,11 @@ import { import { loadPrompt } from '../prompts/prompt-manager.js'; import { parseConfig, distributeConfig } from '../config-parser.js'; import { classifyErrorForTemporal } from '../error-handling.js'; +import { + validateQueueAndDeliverable, + type VulnType, + type ExploitationDecision, +} from '../queue-validation.js'; import { createGitCheckpoint, commitGitSuccess, @@ -342,3 +347,38 @@ export async function assembleReportActivity(input: ActivityInput): Promise { + const { repoPath } = input; + + try { + const decision = await validateQueueAndDeliverable(vulnType, repoPath); + console.log( + chalk.blue( + `🔍 ${vulnType}: ${decision.shouldExploit ? `${decision.vulnerabilityCount} vulnerabilities found` : 'no vulnerabilities, skipping exploitation'}` + ) + ); + return decision; + } catch (error) { + // If validation fails (missing files, invalid JSON), log and skip exploitation + // This is safer than crashing - the vuln agent likely failed or found nothing + const errMsg = error instanceof Error ? error.message : String(error); + console.log(chalk.yellow(`⚠️ ${vulnType}: Queue validation failed (${errMsg}), skipping exploitation`)); + return { + shouldExploit: false, + shouldRetry: false, + vulnerabilityCount: 0, + vulnType, + }; + } +} diff --git a/src/temporal/client.ts b/src/temporal/client.ts index 73936a9..3d402f3 100644 --- a/src/temporal/client.ts +++ b/src/temporal/client.ts @@ -190,18 +190,11 @@ async function startPipeline(): Promise { clearInterval(progressInterval); console.log(chalk.green.bold('\nPipeline completed successfully!')); - console.log( - chalk.gray(`Duration: ${Math.floor((Date.now() - result.startTime) / 1000)}s`) - ); - console.log(chalk.gray(`Agents completed: ${result.completedAgents.length}`)); - - // Show cost summary if available - const totalCost = Object.values(result.agentMetrics).reduce( - (sum, m) => sum + (m.costUsd ?? 0), - 0 - ); - if (totalCost > 0) { - console.log(chalk.gray(`Total cost: $${totalCost.toFixed(4)}`)); + if (result.summary) { + console.log(chalk.gray(`Duration: ${Math.floor(result.summary.totalDurationMs / 1000)}s`)); + console.log(chalk.gray(`Agents completed: ${result.summary.agentCount}`)); + console.log(chalk.gray(`Total turns: ${result.summary.totalTurns}`)); + console.log(chalk.gray(`Total cost: $${result.summary.totalCostUsd.toFixed(4)}`)); } } catch (error) { clearInterval(progressInterval); diff --git a/src/temporal/shared.ts b/src/temporal/shared.ts index 578b3a3..e10ad33 100644 --- a/src/temporal/shared.ts +++ b/src/temporal/shared.ts @@ -19,6 +19,13 @@ export interface AgentMetrics { numTurns: number | null; } +export interface PipelineSummary { + totalCostUsd: number; + totalDurationMs: number; // Wall-clock time (end - start) + totalTurns: number; + agentCount: number; +} + export interface PipelineState { status: 'running' | 'completed' | 'failed'; currentPhase: string | null; @@ -28,6 +35,7 @@ export interface PipelineState { error: string | null; startTime: number; agentMetrics: Record; + summary: PipelineSummary | null; } // Extended state returned by getProgress query (includes computed fields) @@ -36,6 +44,18 @@ export interface PipelineProgress extends PipelineState { elapsedMs: number; } +// Result from a single vuln→exploit pipeline +export interface VulnExploitPipelineResult { + vulnType: string; + vulnMetrics: AgentMetrics | null; + exploitMetrics: AgentMetrics | null; + exploitDecision: { + shouldExploit: boolean; + vulnerabilityCount: number; + } | null; + error: string | null; +} + // === Queries === export const getProgress = defineQuery('getProgress'); diff --git a/src/temporal/workflows.ts b/src/temporal/workflows.ts index 078b5d1..0b5e384 100644 --- a/src/temporal/workflows.ts +++ b/src/temporal/workflows.ts @@ -7,11 +7,12 @@ /** * Temporal workflow for Shannon pentest pipeline. * - * Orchestrates the 5-phase penetration testing workflow: + * Orchestrates the penetration testing workflow: * 1. Pre-Reconnaissance (sequential) * 2. Reconnaissance (sequential) - * 3. Vulnerability Analysis (parallel - 5 agents) - * 4. Exploitation (parallel - 5 agents) + * 3-4. Vulnerability + Exploitation (5 pipelined pairs in parallel) + * Each pair: vuln agent → queue check → conditional exploit + * No synchronization barrier - exploits start when their vuln finishes * 5. Reporting (sequential) * * Features: @@ -19,6 +20,7 @@ * - Automatic retry with backoff for transient/billing errors * - Non-retryable classification for permanent errors * - Audit correlation via workflowId + * - Graceful failure handling: pipelines continue if one fails */ import { @@ -33,7 +35,11 @@ import { type PipelineInput, type PipelineState, type PipelineProgress, + type PipelineSummary, + type VulnExploitPipelineResult, + type AgentMetrics, } from './shared.js'; +import type { VulnType } from '../queue-validation.js'; // Retry configuration for production (long intervals for billing recovery) const PRODUCTION_RETRY = { @@ -75,6 +81,20 @@ const testActs = proxyActivities({ retry: TESTING_RETRY, }); +/** + * Compute aggregated metrics from the current pipeline state. + * Called on both success and failure to provide partial metrics. + */ +function computeSummary(state: PipelineState): PipelineSummary { + const metrics = Object.values(state.agentMetrics); + return { + totalCostUsd: metrics.reduce((sum, m) => sum + (m.costUsd ?? 0), 0), + totalDurationMs: Date.now() - state.startTime, + totalTurns: metrics.reduce((sum, m) => sum + (m.numTurns ?? 0), 0), + agentCount: state.completedAgents.length, + }; +} + export async function pentestPipelineWorkflow( input: PipelineInput ): Promise { @@ -94,6 +114,7 @@ export async function pentestPipelineWorkflow( error: null, startTime: Date.now(), agentMetrics: {}, + summary: null, }; // Register query handler for real-time progress inspection @@ -131,61 +152,112 @@ export async function pentestPipelineWorkflow( state.agentMetrics['recon'] = await a.runReconAgent(activityInput); state.completedAgents.push('recon'); - // === Phase 3: Vulnerability Analysis (Parallel) === - state.currentPhase = 'vulnerability-analysis'; - state.currentAgent = 'vuln-agents'; + // === Phases 3-4: Vulnerability Analysis + Exploitation (Pipelined) === + // Each vuln type runs as an independent pipeline: + // vuln agent → queue check → conditional exploit agent + // This eliminates the synchronization barrier between phases - each exploit + // starts immediately when its vuln agent finishes, not waiting for all. + state.currentPhase = 'vulnerability-exploitation'; + state.currentAgent = 'pipelines'; - const vulnResults = await Promise.all([ - a.runInjectionVulnAgent(activityInput), - a.runXssVulnAgent(activityInput), - a.runAuthVulnAgent(activityInput), - a.runSsrfVulnAgent(activityInput), - a.runAuthzVulnAgent(activityInput), + // Helper: Run a single vuln→exploit pipeline + async function runVulnExploitPipeline( + vulnType: VulnType, + runVulnAgent: () => Promise, + runExploitAgent: () => Promise + ): Promise { + // Step 1: Run vulnerability agent + const vulnMetrics = await runVulnAgent(); + + // Step 2: Check exploitation queue (starts immediately after vuln) + const decision = await a.checkExploitationQueue(activityInput, vulnType); + + // Step 3: Conditionally run exploit agent + let exploitMetrics: AgentMetrics | null = null; + if (decision.shouldExploit) { + exploitMetrics = await runExploitAgent(); + } + + return { + vulnType, + vulnMetrics, + exploitMetrics, + exploitDecision: { + shouldExploit: decision.shouldExploit, + vulnerabilityCount: decision.vulnerabilityCount, + }, + error: null, + }; + } + + // Run all 5 pipelines in parallel with graceful failure handling + // Promise.allSettled ensures other pipelines continue if one fails + const pipelineResults = await Promise.allSettled([ + runVulnExploitPipeline( + 'injection', + () => a.runInjectionVulnAgent(activityInput), + () => a.runInjectionExploitAgent(activityInput) + ), + runVulnExploitPipeline( + 'xss', + () => a.runXssVulnAgent(activityInput), + () => a.runXssExploitAgent(activityInput) + ), + runVulnExploitPipeline( + 'auth', + () => a.runAuthVulnAgent(activityInput), + () => a.runAuthExploitAgent(activityInput) + ), + runVulnExploitPipeline( + 'ssrf', + () => a.runSsrfVulnAgent(activityInput), + () => a.runSsrfExploitAgent(activityInput) + ), + runVulnExploitPipeline( + 'authz', + () => a.runAuthzVulnAgent(activityInput), + () => a.runAuthzExploitAgent(activityInput) + ), ]); - const vulnAgents = [ - 'injection-vuln', - 'xss-vuln', - 'auth-vuln', - 'ssrf-vuln', - 'authz-vuln', - ] as const; - for (let i = 0; i < vulnAgents.length; i++) { - const agentName = vulnAgents[i]; - const metrics = vulnResults[i]; - if (agentName && metrics) { - state.agentMetrics[agentName] = metrics; - state.completedAgents.push(agentName); + // Aggregate results from all pipelines + const failedPipelines: string[] = []; + for (const result of pipelineResults) { + if (result.status === 'fulfilled') { + const { vulnType, vulnMetrics, exploitMetrics } = result.value; + + // Record vuln agent metrics + if (vulnMetrics) { + state.agentMetrics[`${vulnType}-vuln`] = vulnMetrics; + state.completedAgents.push(`${vulnType}-vuln`); + } + + // Record exploit agent metrics (if it ran) + if (exploitMetrics) { + state.agentMetrics[`${vulnType}-exploit`] = exploitMetrics; + state.completedAgents.push(`${vulnType}-exploit`); + } + } else { + // Pipeline failed - log error but continue with others + const errorMsg = + result.reason instanceof Error + ? result.reason.message + : String(result.reason); + failedPipelines.push(errorMsg); } } - // === Phase 4: Exploitation (Parallel) === + // Log any pipeline failures (workflow continues despite failures) + if (failedPipelines.length > 0) { + console.log( + `⚠️ ${failedPipelines.length} pipeline(s) failed:`, + failedPipelines + ); + } + + // Update phase markers state.currentPhase = 'exploitation'; - state.currentAgent = 'exploit-agents'; - - const exploitResults = await Promise.all([ - a.runInjectionExploitAgent(activityInput), - a.runXssExploitAgent(activityInput), - a.runAuthExploitAgent(activityInput), - a.runSsrfExploitAgent(activityInput), - a.runAuthzExploitAgent(activityInput), - ]); - - const exploitAgents = [ - 'injection-exploit', - 'xss-exploit', - 'auth-exploit', - 'ssrf-exploit', - 'authz-exploit', - ] as const; - for (let i = 0; i < exploitAgents.length; i++) { - const agentName = exploitAgents[i]; - const metrics = exploitResults[i]; - if (agentName && metrics) { - state.agentMetrics[agentName] = metrics; - state.completedAgents.push(agentName); - } - } + state.currentAgent = null; // === Phase 5: Reporting === state.currentPhase = 'reporting'; @@ -202,11 +274,13 @@ export async function pentestPipelineWorkflow( state.status = 'completed'; state.currentPhase = null; state.currentAgent = null; + state.summary = computeSummary(state); return state; } catch (error) { state.status = 'failed'; state.failedAgent = state.currentAgent; state.error = error instanceof Error ? error.message : String(error); + state.summary = computeSummary(state); throw error; } }