refactor: pipeline vuln→exploit workflow for parallel execution

- Replace sync barrier between vuln/exploit phases with independent pipelines
- Each vuln type runs: vuln agent → queue check → conditional exploit
- Add checkExploitationQueue activity to skip exploits when no vulns found
- Use Promise.allSettled for graceful failure handling across pipelines
- Add PipelineSummary type for aggregated cost/duration/turns metrics
This commit is contained in:
ajmallesh
2026-01-13 13:08:12 -08:00
parent c12eca046c
commit eaff84b847
4 changed files with 190 additions and 63 deletions

View File

@@ -56,6 +56,11 @@ import {
import { loadPrompt } from '../prompts/prompt-manager.js';
import { parseConfig, distributeConfig } from '../config-parser.js';
import { classifyErrorForTemporal } from '../error-handling.js';
import {
validateQueueAndDeliverable,
type VulnType,
type ExploitationDecision,
} from '../queue-validation.js';
import {
createGitCheckpoint,
commitGitSuccess,
@@ -342,3 +347,38 @@ export async function assembleReportActivity(input: ActivityInput): Promise<void
// Don't throw - the report agent can still create content even if no exploitation files exist
}
}
/**
* Check if exploitation should run for a given vulnerability type.
* Reads the vulnerability queue file and returns the decision.
*
* This activity allows the workflow to skip exploit agents entirely
* when no vulnerabilities were found, saving API calls and time.
*/
export async function checkExploitationQueue(
input: ActivityInput,
vulnType: VulnType
): Promise<ExploitationDecision> {
const { repoPath } = input;
try {
const decision = await validateQueueAndDeliverable(vulnType, repoPath);
console.log(
chalk.blue(
`🔍 ${vulnType}: ${decision.shouldExploit ? `${decision.vulnerabilityCount} vulnerabilities found` : 'no vulnerabilities, skipping exploitation'}`
)
);
return decision;
} catch (error) {
// If validation fails (missing files, invalid JSON), log and skip exploitation
// This is safer than crashing - the vuln agent likely failed or found nothing
const errMsg = error instanceof Error ? error.message : String(error);
console.log(chalk.yellow(`⚠️ ${vulnType}: Queue validation failed (${errMsg}), skipping exploitation`));
return {
shouldExploit: false,
shouldRetry: false,
vulnerabilityCount: 0,
vulnType,
};
}
}

View File

@@ -190,18 +190,11 @@ async function startPipeline(): Promise<void> {
clearInterval(progressInterval);
console.log(chalk.green.bold('\nPipeline completed successfully!'));
console.log(
chalk.gray(`Duration: ${Math.floor((Date.now() - result.startTime) / 1000)}s`)
);
console.log(chalk.gray(`Agents completed: ${result.completedAgents.length}`));
// Show cost summary if available
const totalCost = Object.values(result.agentMetrics).reduce(
(sum, m) => sum + (m.costUsd ?? 0),
0
);
if (totalCost > 0) {
console.log(chalk.gray(`Total cost: $${totalCost.toFixed(4)}`));
if (result.summary) {
console.log(chalk.gray(`Duration: ${Math.floor(result.summary.totalDurationMs / 1000)}s`));
console.log(chalk.gray(`Agents completed: ${result.summary.agentCount}`));
console.log(chalk.gray(`Total turns: ${result.summary.totalTurns}`));
console.log(chalk.gray(`Total cost: $${result.summary.totalCostUsd.toFixed(4)}`));
}
} catch (error) {
clearInterval(progressInterval);

View File

@@ -19,6 +19,13 @@ export interface AgentMetrics {
numTurns: number | null;
}
export interface PipelineSummary {
totalCostUsd: number;
totalDurationMs: number; // Wall-clock time (end - start)
totalTurns: number;
agentCount: number;
}
export interface PipelineState {
status: 'running' | 'completed' | 'failed';
currentPhase: string | null;
@@ -28,6 +35,7 @@ export interface PipelineState {
error: string | null;
startTime: number;
agentMetrics: Record<string, AgentMetrics>;
summary: PipelineSummary | null;
}
// Extended state returned by getProgress query (includes computed fields)
@@ -36,6 +44,18 @@ export interface PipelineProgress extends PipelineState {
elapsedMs: number;
}
// Result from a single vuln→exploit pipeline
export interface VulnExploitPipelineResult {
vulnType: string;
vulnMetrics: AgentMetrics | null;
exploitMetrics: AgentMetrics | null;
exploitDecision: {
shouldExploit: boolean;
vulnerabilityCount: number;
} | null;
error: string | null;
}
// === Queries ===
export const getProgress = defineQuery<PipelineProgress>('getProgress');

View File

@@ -7,11 +7,12 @@
/**
* Temporal workflow for Shannon pentest pipeline.
*
* Orchestrates the 5-phase penetration testing workflow:
* Orchestrates the penetration testing workflow:
* 1. Pre-Reconnaissance (sequential)
* 2. Reconnaissance (sequential)
* 3. Vulnerability Analysis (parallel - 5 agents)
* 4. Exploitation (parallel - 5 agents)
* 3-4. Vulnerability + Exploitation (5 pipelined pairs in parallel)
* Each pair: vuln agent → queue check → conditional exploit
* No synchronization barrier - exploits start when their vuln finishes
* 5. Reporting (sequential)
*
* Features:
@@ -19,6 +20,7 @@
* - Automatic retry with backoff for transient/billing errors
* - Non-retryable classification for permanent errors
* - Audit correlation via workflowId
* - Graceful failure handling: pipelines continue if one fails
*/
import {
@@ -33,7 +35,11 @@ import {
type PipelineInput,
type PipelineState,
type PipelineProgress,
type PipelineSummary,
type VulnExploitPipelineResult,
type AgentMetrics,
} from './shared.js';
import type { VulnType } from '../queue-validation.js';
// Retry configuration for production (long intervals for billing recovery)
const PRODUCTION_RETRY = {
@@ -75,6 +81,20 @@ const testActs = proxyActivities<typeof activities>({
retry: TESTING_RETRY,
});
/**
* Compute aggregated metrics from the current pipeline state.
* Called on both success and failure to provide partial metrics.
*/
function computeSummary(state: PipelineState): PipelineSummary {
const metrics = Object.values(state.agentMetrics);
return {
totalCostUsd: metrics.reduce((sum, m) => sum + (m.costUsd ?? 0), 0),
totalDurationMs: Date.now() - state.startTime,
totalTurns: metrics.reduce((sum, m) => sum + (m.numTurns ?? 0), 0),
agentCount: state.completedAgents.length,
};
}
export async function pentestPipelineWorkflow(
input: PipelineInput
): Promise<PipelineState> {
@@ -94,6 +114,7 @@ export async function pentestPipelineWorkflow(
error: null,
startTime: Date.now(),
agentMetrics: {},
summary: null,
};
// Register query handler for real-time progress inspection
@@ -131,61 +152,112 @@ export async function pentestPipelineWorkflow(
state.agentMetrics['recon'] = await a.runReconAgent(activityInput);
state.completedAgents.push('recon');
// === Phase 3: Vulnerability Analysis (Parallel) ===
state.currentPhase = 'vulnerability-analysis';
state.currentAgent = 'vuln-agents';
// === Phases 3-4: Vulnerability Analysis + Exploitation (Pipelined) ===
// Each vuln type runs as an independent pipeline:
// vuln agent → queue check → conditional exploit agent
// This eliminates the synchronization barrier between phases - each exploit
// starts immediately when its vuln agent finishes, not waiting for all.
state.currentPhase = 'vulnerability-exploitation';
state.currentAgent = 'pipelines';
const vulnResults = await Promise.all([
a.runInjectionVulnAgent(activityInput),
a.runXssVulnAgent(activityInput),
a.runAuthVulnAgent(activityInput),
a.runSsrfVulnAgent(activityInput),
a.runAuthzVulnAgent(activityInput),
// Helper: Run a single vuln→exploit pipeline
async function runVulnExploitPipeline(
vulnType: VulnType,
runVulnAgent: () => Promise<AgentMetrics>,
runExploitAgent: () => Promise<AgentMetrics>
): Promise<VulnExploitPipelineResult> {
// Step 1: Run vulnerability agent
const vulnMetrics = await runVulnAgent();
// Step 2: Check exploitation queue (starts immediately after vuln)
const decision = await a.checkExploitationQueue(activityInput, vulnType);
// Step 3: Conditionally run exploit agent
let exploitMetrics: AgentMetrics | null = null;
if (decision.shouldExploit) {
exploitMetrics = await runExploitAgent();
}
return {
vulnType,
vulnMetrics,
exploitMetrics,
exploitDecision: {
shouldExploit: decision.shouldExploit,
vulnerabilityCount: decision.vulnerabilityCount,
},
error: null,
};
}
// Run all 5 pipelines in parallel with graceful failure handling
// Promise.allSettled ensures other pipelines continue if one fails
const pipelineResults = await Promise.allSettled([
runVulnExploitPipeline(
'injection',
() => a.runInjectionVulnAgent(activityInput),
() => a.runInjectionExploitAgent(activityInput)
),
runVulnExploitPipeline(
'xss',
() => a.runXssVulnAgent(activityInput),
() => a.runXssExploitAgent(activityInput)
),
runVulnExploitPipeline(
'auth',
() => a.runAuthVulnAgent(activityInput),
() => a.runAuthExploitAgent(activityInput)
),
runVulnExploitPipeline(
'ssrf',
() => a.runSsrfVulnAgent(activityInput),
() => a.runSsrfExploitAgent(activityInput)
),
runVulnExploitPipeline(
'authz',
() => a.runAuthzVulnAgent(activityInput),
() => a.runAuthzExploitAgent(activityInput)
),
]);
const vulnAgents = [
'injection-vuln',
'xss-vuln',
'auth-vuln',
'ssrf-vuln',
'authz-vuln',
] as const;
for (let i = 0; i < vulnAgents.length; i++) {
const agentName = vulnAgents[i];
const metrics = vulnResults[i];
if (agentName && metrics) {
state.agentMetrics[agentName] = metrics;
state.completedAgents.push(agentName);
// Aggregate results from all pipelines
const failedPipelines: string[] = [];
for (const result of pipelineResults) {
if (result.status === 'fulfilled') {
const { vulnType, vulnMetrics, exploitMetrics } = result.value;
// Record vuln agent metrics
if (vulnMetrics) {
state.agentMetrics[`${vulnType}-vuln`] = vulnMetrics;
state.completedAgents.push(`${vulnType}-vuln`);
}
// Record exploit agent metrics (if it ran)
if (exploitMetrics) {
state.agentMetrics[`${vulnType}-exploit`] = exploitMetrics;
state.completedAgents.push(`${vulnType}-exploit`);
}
} else {
// Pipeline failed - log error but continue with others
const errorMsg =
result.reason instanceof Error
? result.reason.message
: String(result.reason);
failedPipelines.push(errorMsg);
}
}
// === Phase 4: Exploitation (Parallel) ===
// Log any pipeline failures (workflow continues despite failures)
if (failedPipelines.length > 0) {
console.log(
`⚠️ ${failedPipelines.length} pipeline(s) failed:`,
failedPipelines
);
}
// Update phase markers
state.currentPhase = 'exploitation';
state.currentAgent = 'exploit-agents';
const exploitResults = await Promise.all([
a.runInjectionExploitAgent(activityInput),
a.runXssExploitAgent(activityInput),
a.runAuthExploitAgent(activityInput),
a.runSsrfExploitAgent(activityInput),
a.runAuthzExploitAgent(activityInput),
]);
const exploitAgents = [
'injection-exploit',
'xss-exploit',
'auth-exploit',
'ssrf-exploit',
'authz-exploit',
] as const;
for (let i = 0; i < exploitAgents.length; i++) {
const agentName = exploitAgents[i];
const metrics = exploitResults[i];
if (agentName && metrics) {
state.agentMetrics[agentName] = metrics;
state.completedAgents.push(agentName);
}
}
state.currentAgent = null;
// === Phase 5: Reporting ===
state.currentPhase = 'reporting';
@@ -202,11 +274,13 @@ export async function pentestPipelineWorkflow(
state.status = 'completed';
state.currentPhase = null;
state.currentAgent = null;
state.summary = computeSummary(state);
return state;
} catch (error) {
state.status = 'failed';
state.failedAgent = state.currentAgent;
state.error = error instanceof Error ? error.message : String(error);
state.summary = computeSummary(state);
throw error;
}
}