Merge pull request #140 from KeygraphHQ/feat/resume-workspace

feat: add named workspaces with resume support
This commit is contained in:
ezl-keygraph
2026-02-17 00:23:23 +05:30
committed by GitHub
13 changed files with 1059 additions and 298 deletions
+8 -4
View File
@@ -18,9 +18,14 @@ git clone https://github.com/org/repo.git ./repos/my-repo
./shannon start URL=<url> REPO=my-repo
./shannon start URL=<url> REPO=my-repo CONFIG=./configs/my-config.yaml
# Workspaces & Resume
./shannon start URL=<url> REPO=my-repo WORKSPACE=my-audit # New named workspace
./shannon start URL=<url> REPO=my-repo WORKSPACE=my-audit # Resume (same command)
./shannon start URL=<url> REPO=my-repo WORKSPACE=<auto-name> # Resume auto-named run
./shannon workspaces # List all workspaces
# Monitor
./shannon logs # Real-time worker logs
./shannon query ID=<workflow-id> # Query workflow progress
# Temporal Web UI: http://localhost:8233
# Stop
@@ -31,7 +36,7 @@ git clone https://github.com/org/repo.git ./repos/my-repo
npm run build
```
**Options:** `CONFIG=<file>` (YAML config), `OUTPUT=<path>` (default: `./audit-logs/`), `PIPELINE_TESTING=true` (minimal prompts, 10s retries), `REBUILD=true` (force Docker rebuild), `ROUTER=true` (multi-model routing via [claude-code-router](https://github.com/musistudio/claude-code-router))
**Options:** `CONFIG=<file>` (YAML config), `OUTPUT=<path>` (default: `./audit-logs/`), `WORKSPACE=<name>` (named workspace; auto-resumes if exists), `PIPELINE_TESTING=true` (minimal prompts, 10s retries), `REBUILD=true` (force Docker rebuild), `ROUTER=true` (multi-model routing via [claude-code-router](https://github.com/musistudio/claude-code-router))
## Architecture
@@ -51,8 +56,6 @@ Durable workflow orchestration with crash recovery, queryable progress, intellig
- `src/temporal/worker.ts` — Worker entry point
- `src/temporal/client.ts` — CLI client for starting workflows
- `src/temporal/shared.ts` — Types, interfaces, query definitions
- `src/temporal/query.ts` — Query tool for progress inspection
### Five-Phase Pipeline
1. **Pre-Recon** (`pre-recon`) — External scans (nmap, subfinder, whatweb) + source code analysis
@@ -67,6 +70,7 @@ Durable workflow orchestration with crash recovery, queryable progress, intellig
- **SDK Integration** — Uses `@anthropic-ai/claude-agent-sdk` with `maxTurns: 10_000` and `bypassPermissions` mode. Playwright MCP for browser automation, TOTP generation via MCP tool. Login flow template at `prompts/shared/login-instructions.txt` supports form, SSO, API, and basic auth
- **Audit System** — Crash-safe append-only logging in `audit-logs/{hostname}_{sessionId}/`. Tracks session metrics, per-agent logs, prompts, and deliverables
- **Deliverables** — Saved to `deliverables/` in the target repo via the `save_deliverable` MCP tool
- **Workspaces & Resume** — Named workspaces via `WORKSPACE=<name>` or auto-named from URL+timestamp. Resume passes `--workspace` to the Temporal client (`src/temporal/client.ts`), which reads the workspace's `session.json` to validate the URL and terminate any still-running workflows for that workspace. In `src/temporal/activities.ts`, `loadResumeState()` determines which agents completed (status `success` and deliverable present) and selects the latest git checkpoint; `restoreGitCheckpoint()` resets the repo to that checkpoint and cleans up deliverables from incomplete agents. Workspace listing via `src/temporal/workspaces.ts`
## Development Notes
+34
View File
@@ -85,6 +85,7 @@ Shannon is available in two editions:
- [Monitoring Progress](#monitoring-progress)
- [Stopping Shannon](#stopping-shannon)
- [Usage Examples](#usage-examples)
- [Workspaces and Resuming](#workspaces-and-resuming)
- [Configuration (Optional)](#configuration-optional)
- [[EXPERIMENTAL - UNSUPPORTED] Router Mode (Alternative Providers)](#experimental---unsupported-router-mode-alternative-providers)
- [Output and Results](#output-and-results)
@@ -167,8 +168,41 @@ open http://localhost:8233
# Custom output directory
./shannon start URL=https://example.com REPO=repo-name OUTPUT=./my-reports
# Named workspace
./shannon start URL=https://example.com REPO=repo-name WORKSPACE=q1-audit
# List all workspaces
./shannon workspaces
```
### Workspaces and Resuming
Shannon supports **workspaces** that allow you to resume interrupted or failed runs without re-running completed agents.
**How it works:**
- Every run creates a workspace in `audit-logs/` (auto-named by default, e.g. `example-com_shannon-1771007534808`)
- Use `WORKSPACE=<name>` to give your run a custom name for easier reference
- To resume any run, pass its workspace name via `WORKSPACE=` — Shannon detects which agents completed successfully and picks up where it left off
- Each agent's progress is checkpointed via git commits, so resumed runs start from a clean, validated state
```bash
# Start with a named workspace
./shannon start URL=https://example.com REPO=repo-name WORKSPACE=my-audit
# Resume the same workspace (skips completed agents)
./shannon start URL=https://example.com REPO=repo-name WORKSPACE=my-audit
# Resume an auto-named workspace from a previous run
./shannon start URL=https://example.com REPO=repo-name WORKSPACE=example-com_shannon-1771007534808
# List all workspaces and their status
./shannon workspaces
```
> [!NOTE]
> The `URL` must match the original workspace URL when resuming. Shannon will reject mismatched URLs to prevent cross-target contamination.
### Prepare Your Repository
Shannon expects target repositories to be placed under the `./repos/` directory at the project root. The `REPO` flag refers to a folder name inside `./repos/`. Copy the repository you want to scan into `./repos/`, or clone it directly there:
+1 -2
View File
@@ -7,8 +7,7 @@
"temporal:server": "docker compose -f docker/docker-compose.temporal.yml up temporal -d",
"temporal:server:stop": "docker compose -f docker/docker-compose.temporal.yml down",
"temporal:worker": "node dist/temporal/worker.js",
"temporal:start": "node dist/temporal/client.js",
"temporal:query": "node dist/temporal/query.js"
"temporal:start": "node dist/temporal/client.js"
},
"dependencies": {
"@anthropic-ai/claude-agent-sdk": "^0.2.38",
+33 -18
View File
@@ -41,8 +41,8 @@ show_help() {
Usage:
./shannon start URL=<url> REPO=<name> Start a pentest workflow
./shannon workspaces List all workspaces
./shannon logs ID=<workflow-id> Tail logs for a specific workflow
./shannon query ID=<workflow-id> Query workflow progress
./shannon stop Stop all containers
./shannon help Show this help message
@@ -50,6 +50,7 @@ Options for 'start':
REPO=<name> Folder name under ./repos/ (e.g. REPO=repo-name)
CONFIG=<path> Configuration file (YAML)
OUTPUT=<path> Output directory for reports (default: ./audit-logs/)
WORKSPACE=<name> Named workspace (auto-resumes if exists, creates if new)
PIPELINE_TESTING=true Use minimal prompts for fast testing
ROUTER=true Route requests through claude-code-router (multi-model support)
@@ -58,10 +59,11 @@ Options for 'stop':
Examples:
./shannon start URL=https://example.com REPO=repo-name
./shannon start URL=https://example.com REPO=repo-name WORKSPACE=q1-audit
./shannon start URL=https://example.com REPO=repo-name CONFIG=./config.yaml
./shannon start URL=https://example.com REPO=repo-name OUTPUT=./my-reports
./shannon workspaces
./shannon logs ID=example.com_shannon-1234567890
./shannon query ID=shannon-1234567890
./shannon stop CLEAN=true
Monitor workflows at http://localhost:8233
@@ -81,6 +83,7 @@ parse_args() {
PIPELINE_TESTING=*) PIPELINE_TESTING="${arg#PIPELINE_TESTING=}" ;;
REBUILD=*) REBUILD="${arg#REBUILD=}" ;;
ROUTER=*) ROUTER="${arg#ROUTER=}" ;;
WORKSPACE=*) WORKSPACE="${arg#WORKSPACE=}" ;;
esac
done
}
@@ -229,6 +232,7 @@ cmd_start() {
fi
[ "$PIPELINE_TESTING" = "true" ] && ARGS="$ARGS --pipeline-testing"
[ -n "$WORKSPACE" ] && ARGS="$ARGS --workspace $WORKSPACE"
# Run the client to submit workflow
docker compose -f "$COMPOSE_FILE" $COMPOSE_OVERRIDE exec -T worker \
@@ -253,10 +257,26 @@ cmd_logs() {
if [ -f "./audit-logs/${ID}/workflow.log" ]; then
WORKFLOW_LOG="./audit-logs/${ID}/workflow.log"
else
# Search for the workflow directory (handles custom OUTPUT paths)
FOUND=$(find . -maxdepth 3 -path "*/${ID}/workflow.log" -type f 2>/dev/null | head -1)
if [ -n "$FOUND" ]; then
WORKFLOW_LOG="$FOUND"
# For resume workflow IDs (e.g. workspace_resume_123), check the original workspace
WORKSPACE_ID="${ID%%_resume_*}"
if [ "$WORKSPACE_ID" != "$ID" ] && [ -f "./audit-logs/${WORKSPACE_ID}/workflow.log" ]; then
WORKFLOW_LOG="./audit-logs/${WORKSPACE_ID}/workflow.log"
fi
# For named workspace IDs (e.g. workspace_shannon-123), check the workspace name
if [ -z "$WORKFLOW_LOG" ]; then
WORKSPACE_ID="${ID%%_shannon-*}"
if [ "$WORKSPACE_ID" != "$ID" ] && [ -f "./audit-logs/${WORKSPACE_ID}/workflow.log" ]; then
WORKFLOW_LOG="./audit-logs/${WORKSPACE_ID}/workflow.log"
fi
fi
if [ -z "$WORKFLOW_LOG" ]; then
# Search for the workflow directory (handles custom OUTPUT paths)
FOUND=$(find . -maxdepth 3 -path "*/${ID}/workflow.log" -type f 2>/dev/null | head -1)
if [ -n "$FOUND" ]; then
WORKFLOW_LOG="$FOUND"
fi
fi
fi
@@ -270,22 +290,17 @@ cmd_logs() {
echo " - Workflow hasn't started yet"
echo " - Workflow ID is incorrect"
echo ""
echo "Check: ./shannon query ID=$ID for workflow details"
echo "Check the Temporal Web UI at http://localhost:8233 for workflow details"
exit 1
fi
}
cmd_query() {
parse_args "$@"
if [ -z "$ID" ]; then
echo "ERROR: ID is required"
echo "Usage: ./shannon query ID=<workflow-id>"
exit 1
fi
cmd_workspaces() {
# Ensure containers are running (need worker to execute node)
ensure_containers
docker compose -f "$COMPOSE_FILE" $COMPOSE_OVERRIDE exec -T worker \
node dist/temporal/query.js "$ID"
node dist/temporal/workspaces.js
}
cmd_stop() {
@@ -308,9 +323,9 @@ case "${1:-help}" in
shift
cmd_logs "$@"
;;
query)
workspaces)
shift
cmd_query "$@"
cmd_workspaces
;;
stop)
shift
+28 -2
View File
@@ -64,8 +64,10 @@ export class AuditSession {
/**
* Initialize audit session (creates directories, session.json)
* Idempotent and race-safe
*
* @param workflowId - Optional workflow ID for tracking original or resume workflows
*/
async initialize(): Promise<void> {
async initialize(workflowId?: string): Promise<void> {
if (this.initialized) {
return; // Already initialized
}
@@ -74,7 +76,7 @@ export class AuditSession {
await initializeAuditStructure(this.sessionMetadata);
// Initialize metrics tracker (loads or creates session.json)
await this.metricsTracker.initialize();
await this.metricsTracker.initialize(workflowId);
// Initialize workflow logger
await this.workflowLogger.initialize();
@@ -252,4 +254,28 @@ export class AuditSession {
await this.ensureInitialized();
await this.workflowLogger.logWorkflowComplete(summary);
}
/**
* Add a resume attempt to the session
* Call this when a workflow is resuming from an existing workspace
*
* Delegates to the metrics tracker while holding the session mutex keyed by
* this session's ID; the lock is always released, even if recording fails.
*
* @param workflowId - The new workflow ID for this resume attempt
* @param terminatedWorkflows - IDs of workflows that were terminated
* @param checkpointHash - Git checkpoint hash that was restored
*/
async addResumeAttempt(
workflowId: string,
terminatedWorkflows: string[],
checkpointHash?: string
): Promise<void> {
await this.ensureInitialized();
// Hold the per-session lock across reload + append so the two steps run as one unit
const unlock = await sessionMutex.lock(this.sessionId);
try {
// NOTE(review): reload() presumably re-reads session.json so the append
// builds on the latest persisted state — confirm against MetricsTracker
await this.metricsTracker.reload();
await this.metricsTracker.addResumeAttempt(workflowId, terminatedWorkflows, checkpointHash);
} finally {
unlock();
}
}
}
+68 -3
View File
@@ -46,6 +46,13 @@ interface PhaseMetrics {
agent_count: number;
}
/**
* One entry in session.resumeAttempts, appended each time a workspace is resumed.
*/
export interface ResumeAttempt {
// Workflow ID started for this resume attempt
workflowId: string;
// Time the attempt was recorded (produced by formatTimestamp())
timestamp: string;
// Comma-joined IDs of the workflows terminated for this resume; omitted when none were terminated
terminatedPrevious?: string;
// Git checkpoint hash the repo was restored to; omitted when no hash was supplied
resumedFromCheckpoint?: string;
}
interface SessionData {
session: {
id: string;
@@ -54,6 +61,8 @@ interface SessionData {
status: 'in-progress' | 'completed' | 'failed';
createdAt: string;
completedAt?: string;
originalWorkflowId?: string; // First workflow that created this workspace
resumeAttempts?: ResumeAttempt[]; // Track all resume attempts
};
metrics: {
total_duration_ms: number;
@@ -95,8 +104,10 @@ export class MetricsTracker {
/**
* Initialize session.json (idempotent)
*
* @param workflowId - Optional workflow ID to set as originalWorkflowId for new sessions
*/
async initialize(): Promise<void> {
async initialize(workflowId?: string): Promise<void> {
// Check if session.json already exists
const exists = await fileExists(this.sessionJsonPath);
@@ -105,21 +116,24 @@ export class MetricsTracker {
this.data = await readJson<SessionData>(this.sessionJsonPath);
} else {
// Create new session.json
this.data = this.createInitialData();
this.data = this.createInitialData(workflowId);
await this.save();
}
}
/**
* Create initial session.json structure
*
* @param workflowId - Optional workflow ID to set as originalWorkflowId
*/
private createInitialData(): SessionData {
private createInitialData(workflowId?: string): SessionData {
const sessionData: SessionData = {
session: {
id: this.sessionMetadata.id,
webUrl: this.sessionMetadata.webUrl,
status: 'in-progress',
createdAt: (this.sessionMetadata as { createdAt?: string }).createdAt || formatTimestamp(),
resumeAttempts: [],
},
metrics: {
total_duration_ms: 0,
@@ -128,6 +142,12 @@ export class MetricsTracker {
agents: {}, // Agent-level metrics
},
};
// Set originalWorkflowId if provided (for new workspaces)
if (workflowId) {
sessionData.session.originalWorkflowId = workflowId;
}
// Only add repoPath if it exists
if (this.sessionMetadata.repoPath) {
sessionData.session.repoPath = this.sessionMetadata.repoPath;
@@ -229,6 +249,51 @@ export class MetricsTracker {
await this.save();
}
/**
 * Append a resume attempt to the in-memory session data and persist it.
 *
 * Also backfills `originalWorkflowId` (from the session id) and the
 * `resumeAttempts` array when they are absent, e.g. in older sessions.
 *
 * @param workflowId - The new workflow ID for this resume attempt
 * @param terminatedWorkflows - IDs of workflows that were terminated
 * @param checkpointHash - Git checkpoint hash that was restored
 * @throws Error if the tracker has no loaded session data
 */
async addResumeAttempt(
  workflowId: string,
  terminatedWorkflows: string[],
  checkpointHash?: string
): Promise<void> {
  const loaded = this.data;
  if (!loaded) {
    throw new Error('MetricsTracker not initialized');
  }

  const { session } = loaded;

  // Older sessions carry no originalWorkflowId; fall back to the session id
  session.originalWorkflowId = session.originalWorkflowId || session.id;

  // Reuse the existing history, creating it on first resume
  const history = session.resumeAttempts || (session.resumeAttempts = []);

  // Optional fields are only written when they carry information
  const entry: ResumeAttempt = {
    workflowId,
    timestamp: formatTimestamp(),
    ...(terminatedWorkflows.length > 0 && {
      terminatedPrevious: terminatedWorkflows.join(','),
    }),
    ...(checkpointHash && { resumedFromCheckpoint: checkpointHash }),
  };

  history.push(entry);
  await this.save();
}
/**
* Recalculate aggregations (total duration, total cost, phases)
*/
+284 -26
View File
@@ -72,9 +72,14 @@ import { getPromptNameForAgent } from '../types/agents.js';
import { AuditSession } from '../audit/index.js';
import type { WorkflowSummary } from '../audit/workflow-logger.js';
import type { AgentName } from '../types/agents.js';
import type { AgentMetrics } from './shared.js';
import { getDeliverablePath, ALL_AGENTS } from '../types/agents.js';
import type { AgentMetrics, ResumeState } from './shared.js';
import type { DistributedConfig } from '../types/config.js';
import { copyDeliverablesToAudit, type SessionMetadata } from '../audit/utils.js';
import { copyDeliverablesToAudit, type SessionMetadata, readJson, fileExists } from '../audit/utils.js';
import type { ResumeAttempt } from '../audit/metrics-tracker.js';
import { executeGitCommandWithRetry } from '../utils/git-manager.js';
import path from 'path';
import fs from 'fs/promises';
const HEARTBEAT_INTERVAL_MS = 2000; // Must be < heartbeatTimeout (10min production, 5min testing)
@@ -89,6 +94,7 @@ export interface ActivityInput {
outputPath?: string;
pipelineTestingMode?: boolean;
workflowId: string;
sessionId: string; // Workspace name (for resume) or workflowId (for new runs)
}
/**
@@ -142,8 +148,9 @@ async function runAgentActivity(
}
// 2. Build session metadata for audit
// Use sessionId (workspace name) for directory, workflowId for tracking
const sessionMetadata: SessionMetadata = {
id: workflowId,
id: input.sessionId,
webUrl,
repoPath,
...(outputPath && { outputPath }),
@@ -151,7 +158,7 @@ async function runAgentActivity(
// 3. Initialize audit session (idempotent, safe across retries)
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize();
await auditSession.initialize(workflowId);
// 4. Load prompt
const promptName = getPromptNameForAgent(agentName);
@@ -239,7 +246,8 @@ async function runAgentActivity(
throw new Error(`Agent ${agentName} failed output validation`);
}
// 9. Success - commit and log
// 9. Success - commit deliverables, then capture checkpoint hash
await commitGitSuccess(repoPath, agentName);
const commitHash = await getGitCommitHash(repoPath);
await auditSession.endAgent(agentName, {
attemptNumber,
@@ -249,14 +257,6 @@ async function runAgentActivity(
model: result.model,
...(commitHash && { checkpoint: commitHash }),
});
await commitGitSuccess(repoPath, agentName);
// 9.5. Copy deliverables to audit-logs (non-fatal)
try {
await copyDeliverablesToAudit(sessionMetadata, repoPath);
} catch (copyErr) {
console.error(`Failed to copy deliverables to audit-logs for ${agentName}:`, copyErr);
}
// 10. Return metrics
return {
@@ -386,13 +386,12 @@ export async function assembleReportActivity(input: ActivityInput): Promise<void
* This must be called AFTER runReportAgent to add the model information to the Executive Summary.
*/
export async function injectReportMetadataActivity(input: ActivityInput): Promise<void> {
const { repoPath, outputPath } = input;
if (!outputPath) {
console.log(chalk.yellow('⚠️ No output path provided, skipping model injection'));
return;
}
const { repoPath, sessionId, outputPath } = input;
const effectiveOutputPath = outputPath
? path.join(outputPath, sessionId)
: path.join('./audit-logs', sessionId);
try {
await injectModelIntoReport(repoPath, outputPath);
await injectModelIntoReport(repoPath, effectiveOutputPath);
} catch (error) {
const err = error as Error;
console.log(chalk.yellow(`⚠️ Error injecting model into report: ${err.message}`));
@@ -449,6 +448,227 @@ export async function checkExploitationQueue(
};
}
// === Resume Activities ===
/**
* Session.json structure for resume state loading
*
* Only the subset of session.json that the resume activities read is declared.
*/
interface SessionJson {
session: {
// Session identifier; used as the original workflow ID when originalWorkflowId is absent
id: string;
// Target URL recorded for the workspace; must equal the URL supplied on resume
webUrl: string;
repoPath?: string;
// Workflow ID that first created the workspace (may be absent)
originalWorkflowId?: string;
// Resume attempts recorded so far for this workspace
resumeAttempts?: ResumeAttempt[];
};
metrics: {
// Per-agent records keyed by agent name
agents: Record<string, {
// Only 'success' (with the deliverable present on disk) counts as completed
status: 'in-progress' | 'success' | 'failed';
// Git commit hash stored for the agent; candidates for the resume checkpoint
checkpoint?: string;
}>;
};
}
/**
 * Structural guard for a parsed session.json.
 *
 * Checks only the fields the resume logic dereferences unconditionally:
 * `session.webUrl` (string) and `metrics.agents` (object).
 */
function hasResumeSessionShape(value: unknown): value is SessionJson {
  if (typeof value !== 'object' || value === null) {
    return false;
  }
  const { session, metrics } = value as { session?: unknown; metrics?: unknown };
  if (typeof session !== 'object' || session === null) {
    return false;
  }
  if (typeof metrics !== 'object' || metrics === null) {
    return false;
  }
  const { webUrl } = session as { webUrl?: unknown };
  const { agents } = metrics as { agents?: unknown };
  return typeof webUrl === 'string' && typeof agents === 'object' && agents !== null;
}

/**
 * Load resume state from an existing workspace.
 *
 * Validates that the workspace exists, that its session.json is readable and
 * well-formed, and that the recorded URL matches. An agent counts as completed
 * only when its status is 'success' AND its deliverable file exists under
 * `expectedRepoPath`. The resume checkpoint is the latest commit among the
 * checkpoints recorded for those completed agents.
 *
 * NOTE(review): the workspace is always looked up under ./audit-logs; a custom
 * output directory is not consulted here — confirm this is intended.
 *
 * @param workspaceName - Workspace directory name under ./audit-logs
 * @param expectedUrl - URL supplied for this run; must equal the workspace URL
 * @param expectedRepoPath - Repository path used to locate deliverables and commits
 * @returns Workspace name, original URL, completed agents, checkpoint hash and
 *          the original workflow ID (falls back to the session id)
 * @throws ApplicationFailure.nonRetryable for a missing workspace, an unreadable
 *         or malformed session.json, a URL mismatch, or no usable checkpoint
 */
export async function loadResumeState(
  workspaceName: string,
  expectedUrl: string,
  expectedRepoPath: string
): Promise<ResumeState> {
  const sessionPath = path.join('./audit-logs', workspaceName, 'session.json');

  // Validate workspace exists
  const exists = await fileExists(sessionPath);
  if (!exists) {
    throw ApplicationFailure.nonRetryable(
      `Workspace not found: ${workspaceName}\nExpected path: ${sessionPath}`,
      'WorkspaceNotFoundError'
    );
  }

  // Load session.json
  let parsed: unknown;
  try {
    parsed = await readJson<SessionJson>(sessionPath);
  } catch (error) {
    const errorMsg = error instanceof Error ? error.message : String(error);
    throw ApplicationFailure.nonRetryable(
      `Corrupted session.json in workspace ${workspaceName}: ${errorMsg}`,
      'CorruptedSessionError'
    );
  }

  // Valid JSON is not necessarily a valid session. Without this check a missing
  // `session`/`metrics` key would surface as a plain TypeError, which is
  // retryable, and the activity would be retried against the same broken file.
  if (!hasResumeSessionShape(parsed)) {
    throw ApplicationFailure.nonRetryable(
      `Corrupted session.json in workspace ${workspaceName}: missing session.webUrl or metrics.agents`,
      'CorruptedSessionError'
    );
  }
  const session = parsed;

  // Validate URL matches
  if (session.session.webUrl !== expectedUrl) {
    throw ApplicationFailure.nonRetryable(
      `URL mismatch with workspace\n  Workspace URL: ${session.session.webUrl}\n  Provided URL: ${expectedUrl}`,
      'URLMismatchError'
    );
  }

  // Find completed agents (status === 'success' AND deliverable exists)
  const completedAgents: string[] = [];
  const agents = session.metrics.agents;

  for (const agentName of ALL_AGENTS) {
    const agentData = agents[agentName];

    // Skip if agent never ran or didn't succeed
    if (!agentData || agentData.status !== 'success') {
      continue;
    }

    // Validate deliverable exists
    const deliverablePath = getDeliverablePath(agentName, expectedRepoPath);
    const deliverableExists = await fileExists(deliverablePath);
    if (!deliverableExists) {
      console.log(
        chalk.yellow(`Agent ${agentName} shows success but deliverable missing, will re-run`)
      );
      continue;
    }

    // Agent completed successfully and deliverable exists
    completedAgents.push(agentName);
  }

  // Collect checkpoints from completed agents; empty strings are not usable hashes
  const checkpoints = completedAgents
    .map((name) => agents[name]?.checkpoint)
    .filter((hash): hash is string => typeof hash === 'string' && hash.length > 0);

  if (checkpoints.length === 0) {
    const successAgents = Object.entries(agents)
      .filter(([, data]) => data?.status === 'success')
      .map(([name]) => name);

    // Report the actual cause: missing checkpoints, missing deliverables, or nothing completed
    let reason: string;
    if (completedAgents.length > 0) {
      reason =
        `${completedAgents.length} agent(s) completed (${completedAgents.join(', ')}) ` +
        `but no git checkpoint was recorded for them in session.json. ` +
        `Start a fresh run instead.`;
    } else if (successAgents.length > 0) {
      reason =
        `${successAgents.length} agent(s) show success in session.json (${successAgents.join(', ')}) ` +
        `but their deliverable files are missing from disk. ` +
        `Start a fresh run instead.`;
    } else {
      reason = `No agents completed successfully. Start a fresh run instead.`;
    }

    throw ApplicationFailure.nonRetryable(
      `Cannot resume workspace ${workspaceName}: ` + reason,
      'NoCheckpointsError'
    );
  }

  // Find most recent commit among checkpoints
  const checkpointHash = await findLatestCommit(expectedRepoPath, checkpoints);
  const originalWorkflowId = session.session.originalWorkflowId || session.session.id;

  console.log(chalk.cyan(`=== RESUME STATE ===`));
  console.log(`Workspace: ${workspaceName}`);
  console.log(`Completed agents: ${completedAgents.length}`);
  console.log(`Checkpoint: ${checkpointHash}`);

  return {
    workspaceName,
    originalUrl: session.session.webUrl,
    completedAgents,
    checkpointHash,
    originalWorkflowId,
  };
}
/**
 * Find the most recent commit among a list of commit hashes.
 *
 * Uses `git rev-list --topo-order --max-count=1`. Topological ordering never
 * lists a parent before its children, so the first commit printed is a tip of
 * the supplied set. Plain date ordering can pick an ancestor when several
 * checkpoint commits share the same committer timestamp.
 *
 * @param repoPath - Repository in which the hashes are resolved
 * @param commitHashes - Candidate commit hashes (duplicates are ignored)
 * @returns The hash of the latest commit
 * @throws Error if no usable hash is supplied or git prints nothing
 */
async function findLatestCommit(repoPath: string, commitHashes: string[]): Promise<string> {
  // Drop duplicates and empty entries so git is only asked when there is a real choice
  const candidates = [...new Set(commitHashes)].filter((hash) => hash.length > 0);

  const [only] = candidates;
  if (only === undefined) {
    throw new Error('No commit hashes provided');
  }
  if (candidates.length === 1) {
    return only;
  }

  const result = await executeGitCommandWithRetry(
    ['git', 'rev-list', '--topo-order', '--max-count=1', ...candidates],
    repoPath,
    'find latest commit'
  );

  const latest = result.stdout.trim();
  if (!latest) {
    // An empty string would otherwise be handed on as the checkpoint hash
    throw new Error('git rev-list returned no commit for the supplied checkpoints');
  }
  return latest;
}
/**
 * Bring the repository back to a checkpoint commit and remove leftovers.
 *
 * Runs a hard reset to `checkpointHash`, removes untracked files, then deletes
 * any deliverable file still present for agents that did not complete.
 * Deliverable deletion is best-effort: failures are logged, not thrown.
 *
 * @param repoPath - Repository path
 * @param checkpointHash - Git commit hash to reset to
 * @param incompleteAgents - Agents that didn't complete (will have deliverables cleaned up)
 */
export async function restoreGitCheckpoint(
  repoPath: string,
  checkpointHash: string,
  incompleteAgents: AgentName[]
): Promise<void> {
  console.log(chalk.blue(`Restoring git workspace to ${checkpointHash}...`));

  // The checkpoint is the success commit (taken after commitGitSuccess), so the
  // hard reset keeps every completed agent's deliverables; the clean then drops
  // whatever untracked files remain.
  const gitSteps: Array<[string[], string]> = [
    [['git', 'reset', '--hard', checkpointHash], 'reset to checkpoint for resume'],
    [['git', 'clean', '-fd'], 'clean untracked files for resume'],
  ];
  for (const [command, description] of gitSteps) {
    await executeGitCommandWithRetry(command, repoPath, description);
  }

  // Remove partial deliverables left behind by agents that never finished
  for (const agentName of incompleteAgents) {
    const target = getDeliverablePath(agentName, repoPath);
    try {
      if (!(await fileExists(target))) {
        continue;
      }
      console.log(chalk.yellow(`Cleaning partial deliverable: ${agentName}`));
      await fs.unlink(target);
    } catch (error) {
      console.log(chalk.gray(`Note: Failed to delete ${target}: ${error}`));
    }
  }

  console.log(chalk.green('Workspace restored to clean state'));
}
/**
 * Persist a resume attempt for the workspace identified by `input.sessionId`.
 *
 * Opens the audit session for the workspace and records the new workflow ID
 * together with the workflows that were terminated and the restored checkpoint.
 *
 * @param input - Activity input; `sessionId` selects the workspace, `workflowId` is the resuming workflow
 * @param terminatedWorkflows - IDs of workflows terminated before this resume
 * @param checkpointHash - Git checkpoint hash the repository was restored to
 */
export async function recordResumeAttempt(
  input: ActivityInput,
  terminatedWorkflows: string[],
  checkpointHash: string
): Promise<void> {
  // The audit directory is keyed by the workspace name (sessionId), not the workflow ID
  const metadata: SessionMetadata = {
    id: input.sessionId,
    webUrl: input.webUrl,
    repoPath: input.repoPath,
    ...(input.outputPath && { outputPath: input.outputPath }),
  };

  const session = new AuditSession(metadata);
  await session.initialize();
  await session.addResumeAttempt(input.workflowId, terminatedWorkflows, checkpointHash);
}
/**
* Log phase transition to the unified workflow log.
* Called at phase boundaries for per-workflow logging.
@@ -458,17 +678,17 @@ export async function logPhaseTransition(
phase: string,
event: 'start' | 'complete'
): Promise<void> {
const { webUrl, repoPath, outputPath, workflowId } = input;
const { webUrl, repoPath, outputPath, sessionId, workflowId } = input;
const sessionMetadata: SessionMetadata = {
id: workflowId,
id: sessionId,
webUrl,
repoPath,
...(outputPath && { outputPath }),
};
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize();
await auditSession.initialize(workflowId);
if (event === 'start') {
await auditSession.logPhaseStart(phase);
@@ -485,16 +705,54 @@ export async function logWorkflowComplete(
input: ActivityInput,
summary: WorkflowSummary
): Promise<void> {
const { webUrl, repoPath, outputPath, workflowId } = input;
const { webUrl, repoPath, outputPath, sessionId, workflowId } = input;
const sessionMetadata: SessionMetadata = {
id: workflowId,
id: sessionId,
webUrl,
repoPath,
...(outputPath && { outputPath }),
};
const auditSession = new AuditSession(sessionMetadata);
await auditSession.initialize();
await auditSession.logWorkflowComplete(summary);
await auditSession.initialize(workflowId);
await auditSession.updateSessionStatus(summary.status);
// Use cumulative metrics from session.json (includes all resume attempts)
const sessionData = await auditSession.getMetrics() as {
metrics: {
total_duration_ms: number;
total_cost_usd: number;
agents: Record<string, { final_duration_ms: number; total_cost_usd: number }>;
};
};
// Fill in metrics for skipped agents (completed in previous runs)
const agentMetrics = { ...summary.agentMetrics };
for (const agentName of summary.completedAgents) {
if (!agentMetrics[agentName]) {
const agentData = sessionData.metrics.agents[agentName];
if (agentData) {
agentMetrics[agentName] = {
durationMs: agentData.final_duration_ms,
costUsd: agentData.total_cost_usd,
};
}
}
}
const cumulativeSummary: WorkflowSummary = {
...summary,
totalDurationMs: sessionData.metrics.total_duration_ms,
totalCostUsd: sessionData.metrics.total_cost_usd,
agentMetrics,
};
await auditSession.logWorkflowComplete(cumulativeSummary);
// Copy all deliverables to audit-logs once at workflow end (non-fatal)
try {
await copyDeliverablesToAudit(sessionMetadata, repoPath);
} catch (copyErr) {
console.error('Failed to copy deliverables to audit-logs:', copyErr);
}
}
+165 -7
View File
@@ -26,19 +26,97 @@
* TEMPORAL_ADDRESS - Temporal server address (default: localhost:7233)
*/
import { Connection, Client } from '@temporalio/client';
import { Connection, Client, WorkflowNotFoundError } from '@temporalio/client';
import dotenv from 'dotenv';
import chalk from 'chalk';
import { displaySplashScreen } from '../splash-screen.js';
import { sanitizeHostname } from '../audit/utils.js';
import { readJson, fileExists } from '../audit/utils.js';
import path from 'path';
// Import types only - these don't pull in workflow runtime code
import type { PipelineInput, PipelineState, PipelineProgress } from './shared.js';
/**
* Session.json structure for resume validation
*
* Declares only the parts of session.json the client reads when resuming.
*/
interface SessionJson {
session: {
// Session identifier; used as the workflow ID when originalWorkflowId is absent
id: string;
// Target URL recorded for the workspace; compared with the URL given on the command line
webUrl: string;
// Workflow ID that first created the workspace (may be absent)
originalWorkflowId?: string;
// Workflow IDs of earlier resume attempts for this workspace
resumeAttempts?: Array<{ workflowId: string }>;
};
metrics: {
// NOTE(review): not read in the code visible here — presumably cumulative cost; confirm usage
total_cost_usd: number;
};
}
dotenv.config();
// Query name must match the one defined in workflows.ts
const PROGRESS_QUERY = 'getProgress';
/**
 * Stop every still-running workflow that belongs to a workspace.
 *
 * Workflow IDs are taken from the workspace's session.json: the original
 * workflow (or the session id when none is recorded) plus all recorded resume
 * attempts. Failures to terminate an individual workflow are logged and do not
 * abort the resume.
 *
 * @param client - Connected Temporal client
 * @param workspaceName - Workspace directory name under ./audit-logs
 * @returns IDs of the workflows that were actually terminated
 * @throws Error if the workspace has no session.json
 */
async function terminateExistingWorkflows(
  client: Client,
  workspaceName: string
): Promise<string[]> {
  const sessionPath = path.join('./audit-logs', workspaceName, 'session.json');

  const workspacePresent = await fileExists(sessionPath);
  if (!workspacePresent) {
    throw new Error(
      `Workspace not found: ${workspaceName}\n` +
      `Expected path: ${sessionPath}`
    );
  }

  const { session: recorded } = await readJson<SessionJson>(sessionPath);

  // Original workflow first, then every resume attempt, in recorded order
  const resumeIds = (recorded.resumeAttempts ?? []).map((attempt) => attempt.workflowId);
  const workflowIds = [recorded.originalWorkflowId || recorded.id, ...resumeIds].filter(
    (candidate): candidate is string => candidate != null
  );

  const terminated: string[] = [];

  for (const wfId of workflowIds) {
    try {
      const handle = client.workflow.getHandle(wfId);
      const { status } = await handle.describe();

      if (status.name !== 'RUNNING') {
        console.log(chalk.gray(`Workflow already ${status.name}: ${wfId}`));
        continue;
      }

      console.log(chalk.yellow(`Terminating running workflow: ${wfId}`));
      await handle.terminate('Superseded by resume workflow');
      terminated.push(wfId);
      console.log(chalk.green(`Terminated: ${wfId}`));
    } catch (error) {
      if (error instanceof WorkflowNotFoundError) {
        console.log(chalk.gray(`Workflow not found (already cleaned up): ${wfId}`));
        continue;
      }
      // Keep going: a failed termination must not block the resume
      console.log(chalk.red(`Failed to terminate ${wfId}: ${error}`));
    }
  }

  return terminated;
}
/**
 * Check whether a string is acceptable as a workspace name.
 *
 * Accepted names are 1-128 characters long, begin with a letter or digit, and
 * otherwise contain only letters, digits, hyphens and underscores.
 *
 * @param name - Candidate workspace name
 * @returns true when the name satisfies the rules above
 */
function isValidWorkspaceName(name: string): boolean {
  // One leading alphanumeric plus up to 127 further allowed characters
  const workspaceNamePattern = /^[a-zA-Z0-9][a-zA-Z0-9_-]{0,127}$/;
  return workspaceNamePattern.test(name);
}
function showUsage(): void {
console.log(chalk.cyan.bold('\nShannon Temporal Client'));
console.log(chalk.gray('Start a pentest pipeline workflow\n'));
@@ -50,6 +128,7 @@ function showUsage(): void {
console.log(' --config <path> Configuration file path');
console.log(' --output <path> Output directory for audit logs');
console.log(' --pipeline-testing Use minimal prompts for fast testing');
console.log(' --workspace <name> Resume from existing workspace');
console.log(
' --workflow-id <id> Custom workflow ID (default: shannon-<timestamp>)'
);
@@ -78,6 +157,7 @@ async function startPipeline(): Promise<void> {
let pipelineTestingMode = false;
let customWorkflowId: string | undefined;
let waitForCompletion = false;
let resumeFromWorkspace: string | undefined;
for (let i = 0; i < args.length; i++) {
const arg = args[i];
@@ -107,6 +187,12 @@ async function startPipeline(): Promise<void> {
}
} else if (arg === '--pipeline-testing') {
pipelineTestingMode = true;
} else if (arg === '--workspace') {
const nextArg = args[i + 1];
if (nextArg && !nextArg.startsWith('-')) {
resumeFromWorkspace = nextArg;
i++;
}
} else if (arg === '--wait') {
waitForCompletion = true;
} else if (arg && !arg.startsWith('-')) {
@@ -134,26 +220,87 @@ async function startPipeline(): Promise<void> {
const client = new Client({ connection });
try {
const hostname = sanitizeHostname(webUrl);
const workflowId = customWorkflowId || `${hostname}_shannon-${Date.now()}`;
let terminatedWorkflows: string[] = [];
let workflowId: string;
let sessionId: string; // Workspace name (persistent directory)
let isResume = false;
if (resumeFromWorkspace) {
const sessionPath = path.join('./audit-logs', resumeFromWorkspace, 'session.json');
const workspaceExists = await fileExists(sessionPath);
if (workspaceExists) {
// === Resume Mode: existing workspace ===
isResume = true;
console.log(chalk.cyan('=== RESUME MODE ==='));
console.log(`Workspace: ${resumeFromWorkspace}\n`);
// Terminate any running workflows for this workspace
terminatedWorkflows = await terminateExistingWorkflows(client, resumeFromWorkspace);
if (terminatedWorkflows.length > 0) {
console.log(chalk.yellow(`Terminated ${terminatedWorkflows.length} previous workflow(s)\n`));
}
// Validate URL matches workspace
const session = await readJson<SessionJson>(sessionPath);
if (session.session.webUrl !== webUrl) {
console.error(chalk.red('ERROR: URL mismatch with workspace'));
console.error(` Workspace URL: ${session.session.webUrl}`);
console.error(` Provided URL: ${webUrl}`);
process.exit(1);
}
// Generate resume workflow ID
workflowId = `${resumeFromWorkspace}_resume_${Date.now()}`;
sessionId = resumeFromWorkspace;
} else {
// === New Named Workspace ===
if (!isValidWorkspaceName(resumeFromWorkspace)) {
console.error(chalk.red(`ERROR: Invalid workspace name: "${resumeFromWorkspace}"`));
console.error(chalk.gray(' Must be 1-128 characters, alphanumeric/hyphens/underscores, starting with alphanumeric'));
process.exit(1);
}
console.log(chalk.cyan('=== NEW NAMED WORKSPACE ==='));
console.log(`Workspace: ${resumeFromWorkspace}\n`);
workflowId = `${resumeFromWorkspace}_shannon-${Date.now()}`;
sessionId = resumeFromWorkspace;
}
} else {
// === New Auto-Named Workflow ===
const hostname = sanitizeHostname(webUrl);
workflowId = customWorkflowId || `${hostname}_shannon-${Date.now()}`;
sessionId = workflowId;
}
const input: PipelineInput = {
webUrl,
repoPath,
workflowId, // Add for audit correlation
sessionId, // Workspace directory name
...(configPath && { configPath }),
...(outputPath && { outputPath }),
...(pipelineTestingMode && { pipelineTestingMode }),
...(isResume && resumeFromWorkspace && { resumeFromWorkspace }),
...(terminatedWorkflows.length > 0 && { terminatedWorkflows }),
};
// Determine output directory for display
// Determine output directory for display (use sessionId for persistent directory)
// Use displayOutputPath (host path) if provided, otherwise fall back to outputPath or default
const effectiveDisplayPath = displayOutputPath || outputPath || './audit-logs';
const outputDir = `${effectiveDisplayPath}/${workflowId}`;
const outputDir = `${effectiveDisplayPath}/${sessionId}`;
console.log(chalk.green.bold(`✓ Workflow started: ${workflowId}`));
if (isResume) {
console.log(chalk.gray(` (Resuming workspace: ${sessionId})`));
}
console.log();
console.log(chalk.white(' Target: ') + chalk.cyan(webUrl));
console.log(chalk.white(' Repository: ') + chalk.cyan(repoPath));
console.log(chalk.white(' Workspace: ') + chalk.cyan(sessionId));
if (configPath) {
console.log(chalk.white(' Config: ') + chalk.cyan(configPath));
}
@@ -179,7 +326,6 @@ async function startPipeline(): Promise<void> {
console.log(chalk.bold('Monitor progress:'));
console.log(chalk.white(' Web UI: ') + chalk.blue(`http://localhost:8233/namespaces/default/workflows/${workflowId}`));
console.log(chalk.white(' Logs: ') + chalk.gray(`./shannon logs ID=${workflowId}`));
console.log(chalk.white(' Query: ') + chalk.gray(`./shannon query ID=${workflowId}`));
console.log();
console.log(chalk.bold('Output:'));
console.log(chalk.white(' Reports: ') + chalk.cyan(outputDir));
@@ -212,7 +358,19 @@ async function startPipeline(): Promise<void> {
console.log(chalk.gray(`Duration: ${Math.floor(result.summary.totalDurationMs / 1000)}s`));
console.log(chalk.gray(`Agents completed: ${result.summary.agentCount}`));
console.log(chalk.gray(`Total turns: ${result.summary.totalTurns}`));
console.log(chalk.gray(`Total cost: $${result.summary.totalCostUsd.toFixed(4)}`));
console.log(chalk.gray(`Run cost: $${result.summary.totalCostUsd.toFixed(4)}`));
// Show cumulative cost from session.json (includes all resume attempts)
if (isResume) {
try {
const session = await readJson<SessionJson>(
path.join('./audit-logs', sessionId, 'session.json')
);
console.log(chalk.gray(`Cumulative cost: $${session.metrics.total_cost_usd.toFixed(4)}`));
} catch {
// Non-fatal, skip cumulative cost display
}
}
}
} catch (error) {
clearInterval(progressInterval);
-158
View File
@@ -1,158 +0,0 @@
#!/usr/bin/env node
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Temporal query tool for inspecting Shannon workflow progress.
*
* Queries a running or completed workflow and displays its state.
*
* Usage:
* npm run temporal:query -- <workflowId>
* # or
* node dist/temporal/query.js <workflowId>
*
* Environment:
* TEMPORAL_ADDRESS - Temporal server address (default: localhost:7233)
*/
import { Connection, Client } from '@temporalio/client';
import dotenv from 'dotenv';
import chalk from 'chalk';
dotenv.config();
// Query name must match the one defined in workflows.ts
const PROGRESS_QUERY = 'getProgress';
// Types duplicated from shared.ts to avoid importing workflow APIs
/** Metrics for a single agent, as carried in the progress query result. */
interface AgentMetrics {
// Agent run time in milliseconds; rendered with formatDuration.
durationMs: number;
// Token counts; not displayed by this tool.
inputTokens: number | null;
outputTokens: number | null;
// Cost in US dollars (printed with a `$` prefix and four decimals), or null.
costUsd: number | null;
// Not displayed by this tool.
numTurns: number | null;
// Model identifier; printed in square brackets next to the agent name.
model?: string | undefined;
}
/** Result shape of the workflow's `getProgress` query, as consumed by this tool. */
interface PipelineProgress {
// Overall workflow state; selects the colour of the status line.
status: 'running' | 'completed' | 'failed';
// Shown as 'none' when null.
currentPhase: string | null;
// Shown as 'none' when null.
currentAgent: string | null;
// Names of finished agents; each is also used as a key into agentMetrics.
completedAgents: string[];
// Printed alongside `error` when an error is present.
failedAgent: string | null;
// Error message; the error section is printed only when this is truthy.
error: string | null;
// NOTE(review): presumably an epoch timestamp — not read by this tool; confirm against shared.ts.
startTime: number;
// Per-agent metrics keyed by agent name.
agentMetrics: Record<string, AgentMetrics>;
workflowId: string;
// Elapsed time in milliseconds; rendered with formatDuration.
elapsedMs: number;
}
/** Prints the command-line help text for the query tool to stdout. */
function showUsage(): void {
  const helpLines: string[] = [
    chalk.cyan.bold('\nShannon Temporal Query Tool'),
    chalk.gray('Query progress of a running workflow\n'),
    chalk.yellow('Usage:'),
    ' node dist/temporal/query.js <workflowId>\n',
    chalk.yellow('Examples:'),
    ' node dist/temporal/query.js shannon-1704672000000\n',
  ];

  // One console.log per line keeps the output identical to separate calls.
  for (const line of helpLines) {
    console.log(line);
  }
}
/**
 * Colours a workflow status for terminal output.
 *
 * @param status - Status text to colour.
 * @returns The status in yellow (running), green (completed) or red (failed);
 *   any other value is returned unchanged.
 */
function getStatusColor(status: string): string {
  if (status === 'running') {
    return chalk.yellow(status);
  }
  if (status === 'completed') {
    return chalk.green(status);
  }
  if (status === 'failed') {
    return chalk.red(status);
  }
  return status;
}
/**
 * Renders a millisecond duration as a short human-readable string.
 *
 * Shows the two most significant units: `Xh Ym` from one hour up,
 * `Xm Ys` from one minute up, otherwise `Xs`. Partial units are floored.
 *
 * @param ms - Duration in milliseconds.
 * @returns The formatted duration.
 */
function formatDuration(ms: number): string {
  const totalSeconds = Math.floor(ms / 1000);
  const totalMinutes = Math.floor(totalSeconds / 60);
  const totalHours = Math.floor(totalMinutes / 60);

  if (totalHours > 0) {
    return `${totalHours}h ${totalMinutes % 60}m`;
  }
  return totalMinutes > 0
    ? `${totalMinutes}m ${totalSeconds % 60}s`
    : `${totalSeconds}s`;
}
/**
 * Queries a workflow's progress and prints a human-readable summary.
 *
 * Reads the workflow ID from `process.argv[2]`. With no argument, `--help`
 * or `-h`, prints usage and exits (status 1 when the argument is missing,
 * 0 for an explicit help request). Connects to the Temporal server at
 * `TEMPORAL_ADDRESS` (default `localhost:7233`), runs the progress query and
 * prints status, current phase/agent, elapsed time and per-agent metrics.
 *
 * On query failure the process exits with status 1.
 */
async function queryWorkflow(): Promise<void> {
  const workflowId = process.argv[2];

  if (!workflowId || workflowId === '--help' || workflowId === '-h') {
    showUsage();
    // An explicit help flag is a success; a missing ID is a usage error.
    process.exit(workflowId ? 0 : 1);
  }

  const address = process.env.TEMPORAL_ADDRESS || 'localhost:7233';
  const connection = await Connection.connect({ address });
  const client = new Client({ connection });

  try {
    const handle = client.workflow.getHandle(workflowId);
    const progress = await handle.query<PipelineProgress>(PROGRESS_QUERY);

    console.log(chalk.cyan.bold('\nWorkflow Progress'));
    console.log(chalk.gray('\u2500'.repeat(40)));
    console.log(`${chalk.white('Workflow ID:')} ${progress.workflowId}`);
    console.log(`${chalk.white('Status:')} ${getStatusColor(progress.status)}`);
    console.log(
      `${chalk.white('Current Phase:')} ${progress.currentPhase || 'none'}`
    );
    console.log(
      `${chalk.white('Current Agent:')} ${progress.currentAgent || 'none'}`
    );
    console.log(`${chalk.white('Elapsed:')} ${formatDuration(progress.elapsedMs)}`);
    console.log(
      `${chalk.white('Completed:')} ${progress.completedAgents.length}/13 agents`
    );

    if (progress.completedAgents.length > 0) {
      console.log(chalk.gray('\nCompleted agents:'));
      for (const agent of progress.completedAgents) {
        const metrics = progress.agentMetrics[agent];
        const duration = metrics ? formatDuration(metrics.durationMs) : 'unknown';
        // Compare against null/undefined rather than truthiness so that a
        // recorded cost of exactly 0 is still shown.
        const costUsd = metrics?.costUsd;
        const cost = costUsd != null ? `$${costUsd.toFixed(4)}` : '';
        const model = metrics?.model ? ` [${metrics.model}]` : '';
        console.log(
          chalk.green(` - ${agent}`) +
            chalk.blue(model) +
            chalk.gray(` (${duration}${cost ? ', ' + cost : ''})`)
        );
      }
    }

    if (progress.error) {
      console.log(chalk.red(`\nError: ${progress.error}`));
      console.log(chalk.red(`Failed agent: ${progress.failedAgent}`));
    }

    console.log();
  } catch (error) {
    // Narrow instead of asserting: a thrown non-Error value still yields a
    // printable message rather than `undefined`.
    const message = error instanceof Error ? error.message : String(error);
    if (message.includes('not found')) {
      console.log(chalk.red(`Workflow not found: ${workflowId}`));
    } else {
      console.error(chalk.red('Query failed:'), message);
    }
    // process.exit ends the process immediately, so the finally block below
    // only runs on the success path.
    process.exit(1);
  } finally {
    await connection.close();
  }
}
// Entry point: run the query; any rejection that escapes queryWorkflow is
// reported and turned into a non-zero exit status.
queryWorkflow().catch((failure) => {
  console.error(chalk.red('Query error:'), failure);
  process.exit(1);
});
+11
View File
@@ -9,6 +9,17 @@ export interface PipelineInput {
outputPath?: string;
pipelineTestingMode?: boolean;
workflowId?: string; // Added by client, used for audit correlation
sessionId?: string; // Workspace directory name (distinct from workflowId for named workspaces)
resumeFromWorkspace?: string; // Workspace name to resume from
terminatedWorkflows?: string[]; // Workflows terminated during resume
}
/**
 * State loaded from an existing workspace when a run is resumed.
 * Produced by the `loadResumeState` activity and consumed by the workflow.
 */
export interface ResumeState {
// NOTE(review): presumably the workspace directory name — confirm against loadResumeState.
workspaceName: string;
// NOTE(review): presumably the target URL recorded by the original run — confirm.
originalUrl: string;
// Agents that already finished; the workflow skips these on resume.
completedAgents: string[];
// Git checkpoint identifier handed to restoreGitCheckpoint and recordResumeAttempt.
checkpointHash: string;
// NOTE(review): presumably the workflow ID of the first run in this workspace — confirm.
originalWorkflowId: string;
}
export interface AgentMetrics {
+192 -64
View File
@@ -38,8 +38,11 @@ import {
type PipelineSummary,
type VulnExploitPipelineResult,
type AgentMetrics,
type ResumeState,
} from './shared.js';
import type { VulnType } from '../queue-validation.js';
import type { AgentName } from '../types/agents.js';
import { ALL_AGENTS } from '../types/agents.js';
// Retry configuration for production (long intervals for billing recovery)
const PRODUCTION_RETRY = {
@@ -127,10 +130,14 @@ export async function pentestPipelineWorkflow(
// Build ActivityInput with required workflowId for audit correlation
// Activities require workflowId (non-optional), PipelineInput has it optional
// Use spread to conditionally include optional properties (exactOptionalPropertyTypes)
// sessionId is workspace name for resume, or workflowId for new runs
const sessionId = input.sessionId || input.resumeFromWorkspace || workflowId;
const activityInput: ActivityInput = {
webUrl: input.webUrl,
repoPath: input.repoPath,
workflowId,
sessionId,
...(input.configPath !== undefined && { configPath: input.configPath }),
...(input.outputPath !== undefined && { outputPath: input.outputPath }),
...(input.pipelineTestingMode !== undefined && {
@@ -138,23 +145,79 @@ export async function pentestPipelineWorkflow(
}),
};
// === RESUME LOGIC ===
let resumeState: ResumeState | null = null;
if (input.resumeFromWorkspace) {
// Load resume state from existing workspace
resumeState = await a.loadResumeState(
input.resumeFromWorkspace,
input.webUrl,
input.repoPath
);
// Restore git checkpoint and clean up partial deliverables
const incompleteAgents = ALL_AGENTS.filter(
(agentName) => !resumeState!.completedAgents.includes(agentName)
) as AgentName[];
await a.restoreGitCheckpoint(
input.repoPath,
resumeState.checkpointHash,
incompleteAgents
);
// Check if all agents are already complete
if (resumeState.completedAgents.length === ALL_AGENTS.length) {
console.log(`All ${ALL_AGENTS.length} agents already completed. Nothing to resume.`);
state.status = 'completed';
state.completedAgents = [...resumeState.completedAgents];
state.summary = computeSummary(state);
return state;
}
// Record resume attempt in session.json
await a.recordResumeAttempt(
activityInput,
input.terminatedWorkflows || [],
resumeState.checkpointHash
);
console.log('Resume state loaded and workspace restored');
}
// Helper to check if an agent should be skipped
const shouldSkip = (agentName: string): boolean => {
return resumeState?.completedAgents.includes(agentName) ?? false;
};
try {
// === Phase 1: Pre-Reconnaissance ===
state.currentPhase = 'pre-recon';
state.currentAgent = 'pre-recon';
await a.logPhaseTransition(activityInput, 'pre-recon', 'start');
state.agentMetrics['pre-recon'] =
await a.runPreReconAgent(activityInput);
state.completedAgents.push('pre-recon');
await a.logPhaseTransition(activityInput, 'pre-recon', 'complete');
if (!shouldSkip('pre-recon')) {
state.currentPhase = 'pre-recon';
state.currentAgent = 'pre-recon';
await a.logPhaseTransition(activityInput, 'pre-recon', 'start');
state.agentMetrics['pre-recon'] =
await a.runPreReconAgent(activityInput);
state.completedAgents.push('pre-recon');
await a.logPhaseTransition(activityInput, 'pre-recon', 'complete');
} else {
console.log('Skipping pre-recon (already complete)');
state.completedAgents.push('pre-recon');
}
// === Phase 2: Reconnaissance ===
state.currentPhase = 'recon';
state.currentAgent = 'recon';
await a.logPhaseTransition(activityInput, 'recon', 'start');
state.agentMetrics['recon'] = await a.runReconAgent(activityInput);
state.completedAgents.push('recon');
await a.logPhaseTransition(activityInput, 'recon', 'complete');
if (!shouldSkip('recon')) {
state.currentPhase = 'recon';
state.currentAgent = 'recon';
await a.logPhaseTransition(activityInput, 'recon', 'start');
state.agentMetrics['recon'] = await a.runReconAgent(activityInput);
state.completedAgents.push('recon');
await a.logPhaseTransition(activityInput, 'recon', 'complete');
} else {
console.log('Skipping recon (already complete)');
state.completedAgents.push('recon');
}
// === Phases 3-4: Vulnerability Analysis + Exploitation (Pipelined) ===
// Each vuln type runs as an independent pipeline:
@@ -165,22 +228,34 @@ export async function pentestPipelineWorkflow(
state.currentAgent = 'pipelines';
await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'start');
// Helper: Run a single vuln→exploit pipeline
// Helper: Run a single vuln→exploit pipeline with skip logic
async function runVulnExploitPipeline(
vulnType: VulnType,
runVulnAgent: () => Promise<AgentMetrics>,
runExploitAgent: () => Promise<AgentMetrics>
): Promise<VulnExploitPipelineResult> {
// Step 1: Run vulnerability agent
const vulnMetrics = await runVulnAgent();
const vulnAgentName = `${vulnType}-vuln`;
const exploitAgentName = `${vulnType}-exploit`;
// Step 2: Check exploitation queue (starts immediately after vuln)
// Step 1: Run vulnerability agent (or skip if completed)
let vulnMetrics: AgentMetrics | null = null;
if (!shouldSkip(vulnAgentName)) {
vulnMetrics = await runVulnAgent();
} else {
console.log(`Skipping ${vulnAgentName} (already complete)`);
}
// Step 2: Check exploitation queue (only if vuln agent ran or completed previously)
const decision = await a.checkExploitationQueue(activityInput, vulnType);
// Step 3: Conditionally run exploit agent
// Step 3: Conditionally run exploit agent (skip if already completed)
let exploitMetrics: AgentMetrics | null = null;
if (decision.shouldExploit) {
exploitMetrics = await runExploitAgent();
if (!shouldSkip(exploitAgentName)) {
exploitMetrics = await runExploitAgent();
} else {
console.log(`Skipping ${exploitAgentName} (already complete)`);
}
}
return {
@@ -195,35 +270,75 @@ export async function pentestPipelineWorkflow(
};
}
// Run all 5 pipelines in parallel with graceful failure handling
// Determine which pipelines to run (skip if both vuln and exploit completed)
const pipelinesToRun: Array<Promise<VulnExploitPipelineResult>> = [];
// Only run pipeline if at least one agent (vuln or exploit) is incomplete
const pipelineConfigs: Array<{
vulnType: VulnType;
vulnAgent: string;
exploitAgent: string;
runVuln: () => Promise<AgentMetrics>;
runExploit: () => Promise<AgentMetrics>;
}> = [
{
vulnType: 'injection',
vulnAgent: 'injection-vuln',
exploitAgent: 'injection-exploit',
runVuln: () => a.runInjectionVulnAgent(activityInput),
runExploit: () => a.runInjectionExploitAgent(activityInput),
},
{
vulnType: 'xss',
vulnAgent: 'xss-vuln',
exploitAgent: 'xss-exploit',
runVuln: () => a.runXssVulnAgent(activityInput),
runExploit: () => a.runXssExploitAgent(activityInput),
},
{
vulnType: 'auth',
vulnAgent: 'auth-vuln',
exploitAgent: 'auth-exploit',
runVuln: () => a.runAuthVulnAgent(activityInput),
runExploit: () => a.runAuthExploitAgent(activityInput),
},
{
vulnType: 'ssrf',
vulnAgent: 'ssrf-vuln',
exploitAgent: 'ssrf-exploit',
runVuln: () => a.runSsrfVulnAgent(activityInput),
runExploit: () => a.runSsrfExploitAgent(activityInput),
},
{
vulnType: 'authz',
vulnAgent: 'authz-vuln',
exploitAgent: 'authz-exploit',
runVuln: () => a.runAuthzVulnAgent(activityInput),
runExploit: () => a.runAuthzExploitAgent(activityInput),
},
];
for (const config of pipelineConfigs) {
const vulnComplete = shouldSkip(config.vulnAgent);
const exploitComplete = shouldSkip(config.exploitAgent);
// Only run pipeline if at least one agent needs to run
if (!vulnComplete || !exploitComplete) {
pipelinesToRun.push(
runVulnExploitPipeline(config.vulnType, config.runVuln, config.runExploit)
);
} else {
console.log(
`Skipping entire ${config.vulnType} pipeline (both agents complete)`
);
// Still need to mark them as completed in state
state.completedAgents.push(config.vulnAgent, config.exploitAgent);
}
}
// Run pipelines in parallel with graceful failure handling
// Promise.allSettled ensures other pipelines continue if one fails
const pipelineResults = await Promise.allSettled([
runVulnExploitPipeline(
'injection',
() => a.runInjectionVulnAgent(activityInput),
() => a.runInjectionExploitAgent(activityInput)
),
runVulnExploitPipeline(
'xss',
() => a.runXssVulnAgent(activityInput),
() => a.runXssExploitAgent(activityInput)
),
runVulnExploitPipeline(
'auth',
() => a.runAuthVulnAgent(activityInput),
() => a.runAuthExploitAgent(activityInput)
),
runVulnExploitPipeline(
'ssrf',
() => a.runSsrfVulnAgent(activityInput),
() => a.runSsrfExploitAgent(activityInput)
),
runVulnExploitPipeline(
'authz',
() => a.runAuthzVulnAgent(activityInput),
() => a.runAuthzExploitAgent(activityInput)
),
]);
const pipelineResults = await Promise.allSettled(pipelinesToRun);
// Aggregate results from all pipelines
const failedPipelines: string[] = [];
@@ -231,16 +346,24 @@ export async function pentestPipelineWorkflow(
if (result.status === 'fulfilled') {
const { vulnType, vulnMetrics, exploitMetrics } = result.value;
// Record vuln agent metrics
// Record vuln agent
const vulnAgentName = `${vulnType}-vuln`;
if (vulnMetrics) {
state.agentMetrics[`${vulnType}-vuln`] = vulnMetrics;
state.completedAgents.push(`${vulnType}-vuln`);
state.agentMetrics[vulnAgentName] = vulnMetrics;
state.completedAgents.push(vulnAgentName);
} else if (shouldSkip(vulnAgentName)) {
// Agent was skipped because already complete
state.completedAgents.push(vulnAgentName);
}
// Record exploit agent metrics (if it ran)
// Record exploit agent (if it ran)
const exploitAgentName = `${vulnType}-exploit`;
if (exploitMetrics) {
state.agentMetrics[`${vulnType}-exploit`] = exploitMetrics;
state.completedAgents.push(`${vulnType}-exploit`);
state.agentMetrics[exploitAgentName] = exploitMetrics;
state.completedAgents.push(exploitAgentName);
} else if (shouldSkip(exploitAgentName)) {
// Agent was skipped because already complete
state.completedAgents.push(exploitAgentName);
}
} else {
// Pipeline failed - log error but continue with others
@@ -266,21 +389,26 @@ export async function pentestPipelineWorkflow(
await a.logPhaseTransition(activityInput, 'vulnerability-exploitation', 'complete');
// === Phase 5: Reporting ===
state.currentPhase = 'reporting';
state.currentAgent = 'report';
await a.logPhaseTransition(activityInput, 'reporting', 'start');
if (!shouldSkip('report')) {
state.currentPhase = 'reporting';
state.currentAgent = 'report';
await a.logPhaseTransition(activityInput, 'reporting', 'start');
// First, assemble the concatenated report from exploitation evidence files
await a.assembleReportActivity(activityInput);
// First, assemble the concatenated report from exploitation evidence files
await a.assembleReportActivity(activityInput);
// Then run the report agent to add executive summary and clean up
state.agentMetrics['report'] = await a.runReportAgent(activityInput);
state.completedAgents.push('report');
// Then run the report agent to add executive summary and clean up
state.agentMetrics['report'] = await a.runReportAgent(activityInput);
state.completedAgents.push('report');
// Inject model metadata into the final report
await a.injectReportMetadataActivity(activityInput);
// Inject model metadata into the final report
await a.injectReportMetadataActivity(activityInput);
await a.logPhaseTransition(activityInput, 'reporting', 'complete');
await a.logPhaseTransition(activityInput, 'reporting', 'complete');
} else {
console.log('Skipping report (already complete)');
state.completedAgents.push('report');
}
// === Complete ===
state.status = 'completed';
+185
View File
@@ -0,0 +1,185 @@
#!/usr/bin/env node
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Workspace listing tool for Shannon.
*
* Reads audit-logs/ directories, parses session.json files, and displays
* a formatted table of all workspaces with status, duration, and cost.
*
* Usage:
* node dist/temporal/workspaces.js
*
* Environment:
* AUDIT_LOGS_DIR - Override audit-logs directory (default: ./audit-logs)
*/
import fs from 'fs/promises';
import path from 'path';
import chalk from 'chalk';
/** The fields of a workspace's `session.json` that this tool reads. */
interface SessionJson {
session: {
id: string;
// Target URL; shown in the URL column.
webUrl: string;
// Any status other than 'completed' is listed as resumable.
status: 'in-progress' | 'completed' | 'failed';
// Date string parsed with `new Date(...)`; used for sorting and duration.
createdAt: string;
// Date string parsed with `new Date(...)`; when absent, duration is measured up to now.
completedAt?: string;
};
metrics: {
// Shown in the COST column with two decimals.
total_cost_usd: number;
};
}
/** One row of the workspace table, derived from a directory's `session.json`. */
interface WorkspaceInfo {
// Name of the directory entry under the audit-logs directory.
name: string;
// Copied from `session.webUrl`.
url: string;
status: 'in-progress' | 'completed' | 'failed';
createdAt: Date;
// null when session.json has no `completedAt`.
completedAt: Date | null;
// Copied from `metrics.total_cost_usd`.
costUsd: number;
}
/**
 * Renders a millisecond duration compactly for the workspace table.
 *
 * Produces `Xh Ym` from one hour up, `Xm` from one minute up (seconds are
 * dropped), otherwise `Xs`. Partial units are floored.
 *
 * @param ms - Duration in milliseconds.
 * @returns The formatted duration.
 */
function formatDuration(ms: number): string {
  const wholeSeconds = Math.floor(ms / 1000);
  const wholeMinutes = Math.floor(wholeSeconds / 60);
  const wholeHours = Math.floor(wholeMinutes / 60);

  if (wholeHours > 0) {
    return `${wholeHours}h ${wholeMinutes % 60}m`;
  }
  return wholeMinutes > 0 ? `${wholeMinutes}m` : `${wholeSeconds}s`;
}
/**
 * Colours a workspace status for terminal output.
 *
 * @param status - Status text to colour.
 * @returns The status in green (completed), yellow (in-progress) or red
 *   (failed); any other value is returned unchanged.
 */
function getStatusDisplay(status: string): string {
  if (status === 'completed') {
    return chalk.green(status);
  }
  if (status === 'in-progress') {
    return chalk.yellow(status);
  }
  if (status === 'failed') {
    return chalk.red(status);
  }
  return status;
}
/**
 * Shortens a string to at most `maxLen` UTF-16 code units, replacing the
 * cut-off tail with a single ellipsis character.
 *
 * @param str - Text to shorten.
 * @param maxLen - Maximum length of the result, ellipsis included.
 * @returns `str` unchanged when it already fits; otherwise its first
 *   `maxLen - 1` code units followed by an ellipsis. A non-positive `maxLen`
 *   yields an empty string.
 */
function truncate(str: string, maxLen: number): string {
  if (str.length <= maxLen) return str;
  // Without this guard, slice(0, maxLen - 1) would get a negative end index
  // and return nearly the whole string, exceeding the requested limit.
  if (maxLen <= 0) return '';
  return str.slice(0, maxLen - 1) + '\u2026';
}
/**
 * Prints a table of all workspaces found under the audit-logs directory.
 *
 * The directory is `AUDIT_LOGS_DIR` or `./audit-logs`. Every entry whose
 * `session.json` can be read, parsed and contains the fields the table needs
 * becomes one row; all other entries are skipped. Rows are sorted newest
 * first, and any workspace whose status is not `completed` is tagged as
 * resumable.
 */
async function listWorkspaces(): Promise<void> {
  const auditDir = process.env.AUDIT_LOGS_DIR || './audit-logs';

  let entries: string[];
  try {
    entries = await fs.readdir(auditDir);
  } catch {
    console.log(chalk.yellow('No audit-logs directory found.'));
    console.log(chalk.gray(`Expected: ${auditDir}`));
    return;
  }

  const workspaces: WorkspaceInfo[] = [];

  for (const entry of entries) {
    const sessionPath = path.join(auditDir, entry, 'session.json');
    try {
      const content = await fs.readFile(sessionPath, 'utf8');
      // JSON.parse output is untrusted: check every field the table uses so a
      // malformed session.json skips one row instead of crashing the listing.
      const data = JSON.parse(content) as Partial<SessionJson> | null;
      const session = data?.session;
      const costUsd = data?.metrics?.total_cost_usd;
      if (
        !session ||
        typeof session.webUrl !== 'string' ||
        typeof session.status !== 'string' ||
        typeof costUsd !== 'number' ||
        !Number.isFinite(costUsd)
      ) {
        continue;
      }

      const createdAt = new Date(session.createdAt);
      if (Number.isNaN(createdAt.getTime())) {
        // An unparseable creation date would poison both sorting and duration.
        continue;
      }

      workspaces.push({
        name: entry,
        url: session.webUrl,
        status: session.status,
        createdAt,
        completedAt: session.completedAt ? new Date(session.completedAt) : null,
        costUsd,
      });
    } catch {
      // Skip directories without valid session.json
    }
  }

  if (workspaces.length === 0) {
    console.log(chalk.yellow('\nNo workspaces found.'));
    console.log(chalk.gray('Run a pipeline first: ./shannon start URL=<url> REPO=<repo>'));
    return;
  }

  // Sort by creation date (most recent first)
  workspaces.sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime());

  console.log(chalk.cyan.bold('\n=== Shannon Workspaces ===\n'));

  // Column widths
  const nameWidth = 30;
  const urlWidth = 30;
  const statusWidth = 14;
  const durationWidth = 10;
  const costWidth = 10;

  // Header
  console.log(
    chalk.gray(
      ' ' +
        'WORKSPACE'.padEnd(nameWidth) +
        'URL'.padEnd(urlWidth) +
        'STATUS'.padEnd(statusWidth) +
        'DURATION'.padEnd(durationWidth) +
        'COST'.padEnd(costWidth)
    )
  );
  console.log(chalk.gray(' ' + '\u2500'.repeat(nameWidth + urlWidth + statusWidth + durationWidth + costWidth)));

  // A single reference time keeps in-progress durations consistent across rows.
  const now = new Date();
  let resumableCount = 0;

  for (const ws of workspaces) {
    const endTime = ws.completedAt || now;
    const durationMs = endTime.getTime() - ws.createdAt.getTime();
    const duration = formatDuration(durationMs);
    const cost = `$${ws.costUsd.toFixed(2)}`;
    const isResumable = ws.status !== 'completed';

    if (isResumable) {
      resumableCount++;
    }

    const resumeTag = isResumable ? chalk.cyan(' (resumable)') : '';

    // Pad from the visible text length, not the coloured string's length, so
    // the column lines up whether or not chalk emits ANSI escape codes.
    const statusCell =
      getStatusDisplay(ws.status) +
      ' '.repeat(Math.max(0, statusWidth - ws.status.length));

    console.log(
      ' ' +
        chalk.white(truncate(ws.name, nameWidth - 2).padEnd(nameWidth)) +
        chalk.gray(truncate(ws.url, urlWidth - 2).padEnd(urlWidth)) +
        statusCell +
        chalk.gray(duration.padEnd(durationWidth)) +
        chalk.gray(cost.padEnd(costWidth)) +
        resumeTag
    );
  }

  console.log();
  const summary = `${workspaces.length} workspace${workspaces.length === 1 ? '' : 's'} found`;
  const resumeSummary = resumableCount > 0 ? ` (${resumableCount} resumable)` : '';
  console.log(chalk.gray(`${summary}${resumeSummary}`));

  if (resumableCount > 0) {
    console.log(chalk.gray('\nResume with: ./shannon start URL=<url> REPO=<repo> WORKSPACE=<name>'));
  }

  console.log();
}
// Entry point: list workspaces; any rejection that escapes listWorkspaces is
// reported and turned into a non-zero exit status.
listWorkspaces().catch((failure) => {
  console.error(chalk.red('Error listing workspaces:'), failure);
  process.exit(1);
});
+50 -14
View File
@@ -8,20 +8,31 @@
* Agent type definitions
*/
export type AgentName =
| 'pre-recon'
| 'recon'
| 'injection-vuln'
| 'xss-vuln'
| 'auth-vuln'
| 'ssrf-vuln'
| 'authz-vuln'
| 'injection-exploit'
| 'xss-exploit'
| 'auth-exploit'
| 'ssrf-exploit'
| 'authz-exploit'
| 'report';
/**
* List of all agents in execution order.
* Used for iteration during resume state checking.
*/
export const ALL_AGENTS = [
'pre-recon',
'recon',
'injection-vuln',
'xss-vuln',
'auth-vuln',
'ssrf-vuln',
'authz-vuln',
'injection-exploit',
'xss-exploit',
'auth-exploit',
'ssrf-exploit',
'authz-exploit',
'report',
] as const;
/**
* Agent name type derived from ALL_AGENTS.
* This ensures type safety and prevents drift between type and array.
*/
export type AgentName = typeof ALL_AGENTS[number];
export type PromptName =
| 'pre-recon-code'
@@ -82,3 +93,28 @@ export function getPromptNameForAgent(agentName: AgentName): PromptName {
return mappings[agentName];
}
/**
 * Returns the deliverable file path for an agent, located in the
 * `deliverables/` directory directly under `repoPath`.
 * Filenames must match mcp-server/src/types/deliverables.ts:DELIVERABLE_FILENAMES
 *
 * @param agentName - Agent whose deliverable is wanted.
 * @param repoPath - Repository root; joined with a plain `/`, not normalised.
 * @returns `<repoPath>/deliverables/<filename>`.
 */
export function getDeliverablePath(agentName: AgentName, repoPath: string): string {
  const filenamesByAgent: Record<AgentName, string> = {
    'pre-recon': 'code_analysis_deliverable.md',
    'recon': 'recon_deliverable.md',
    'injection-vuln': 'injection_analysis_deliverable.md',
    'xss-vuln': 'xss_analysis_deliverable.md',
    'auth-vuln': 'auth_analysis_deliverable.md',
    'ssrf-vuln': 'ssrf_analysis_deliverable.md',
    'authz-vuln': 'authz_analysis_deliverable.md',
    'injection-exploit': 'injection_exploitation_evidence.md',
    'xss-exploit': 'xss_exploitation_evidence.md',
    'auth-exploit': 'auth_exploitation_evidence.md',
    'ssrf-exploit': 'ssrf_exploitation_evidence.md',
    'authz-exploit': 'authz_exploitation_evidence.md',
    'report': 'comprehensive_security_assessment_report.md',
  };

  return repoPath + '/deliverables/' + filenamesByAgent[agentName];
}