refactor: modularize claude-executor and extract shared utilities

- Extract message handling into src/ai/message-handlers.ts with pure functions
- Extract output formatting into src/ai/output-formatters.ts
- Extract progress management into src/ai/progress-manager.ts
- Add audit-logger.ts with Null Object pattern for optional logging
- Add shared utilities: formatting.ts, file-io.ts, functional.ts
- Consolidate getPromptNameForAgent into src/types/agents.ts
This commit is contained in:
ajmallesh
2026-01-12 12:14:49 -08:00
parent bc52d67dd5
commit f84414d5ca
21 changed files with 1636 additions and 1107 deletions
+2 -3
View File
@@ -31,13 +31,12 @@ type UnlockFunction = () => void;
* }
* ```
*/
// Promise-based mutex with queue semantics - safe for parallel agents on same session
export class SessionMutex {
// Map of sessionId -> Promise (represents active lock)
private locks: Map<string, Promise<void>> = new Map();
/**
* Acquire lock for a session
*/
// Wait for existing lock, then acquire. Queue ensures FIFO ordering.
async lock(sessionId: string): Promise<UnlockFunction> {
if (this.locks.has(sessionId)) {
// Wait for existing lock to be released
+73
View File
@@ -0,0 +1,73 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* File I/O Utilities
*
* Core utility functions for file operations including atomic writes,
* directory creation, and JSON file handling.
*/
import fs from 'fs/promises';
/**
* Ensure directory exists (idempotent, race-safe)
*/
export async function ensureDirectory(dirPath: string): Promise<void> {
try {
await fs.mkdir(dirPath, { recursive: true });
} catch (error) {
// Ignore EEXIST errors (race condition safe)
if ((error as NodeJS.ErrnoException).code !== 'EEXIST') {
throw error;
}
}
}
/**
* Atomic write using temp file + rename pattern
* Guarantees no partial writes or corruption on crash
*/
export async function atomicWrite(filePath: string, data: object | string): Promise<void> {
const tempPath = `${filePath}.tmp`;
const content = typeof data === 'string' ? data : JSON.stringify(data, null, 2);
try {
// Write to temp file
await fs.writeFile(tempPath, content, 'utf8');
// Atomic rename (POSIX guarantee: atomic on same filesystem)
await fs.rename(tempPath, filePath);
} catch (error) {
// Clean up temp file on failure
try {
await fs.unlink(tempPath);
} catch {
// Ignore cleanup errors
}
throw error;
}
}
/**
* Read and parse JSON file
*/
export async function readJson<T = unknown>(filePath: string): Promise<T> {
const content = await fs.readFile(filePath, 'utf8');
return JSON.parse(content) as T;
}
/**
* Check if file exists
*/
export async function fileExists(filePath: string): Promise<boolean> {
try {
await fs.access(filePath);
return true;
} catch {
return false;
}
}
+60
View File
@@ -0,0 +1,60 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Formatting Utilities
*
* Generic formatting functions for durations, timestamps, and percentages.
*/
/**
* Format duration in milliseconds to human-readable string
*/
export function formatDuration(ms: number): string {
if (ms < 1000) {
return `${ms}ms`;
}
const seconds = ms / 1000;
if (seconds < 60) {
return `${seconds.toFixed(1)}s`;
}
const minutes = Math.floor(seconds / 60);
const remainingSeconds = Math.floor(seconds % 60);
return `${minutes}m ${remainingSeconds}s`;
}
/**
* Format timestamp to ISO 8601 string
*/
export function formatTimestamp(timestamp: number = Date.now()): string {
return new Date(timestamp).toISOString();
}
/**
* Calculate percentage
*/
export function calculatePercentage(part: number, total: number): number {
if (total === 0) return 0;
return (part / total) * 100;
}
/**
* Extract agent type from description string for display purposes
*/
export function extractAgentType(description: string): string {
if (description.includes('Pre-recon')) {
return 'pre-reconnaissance';
}
if (description.includes('Recon')) {
return 'reconnaissance';
}
if (description.includes('Report')) {
return 'report generation';
}
return 'analysis';
}
+29
View File
@@ -0,0 +1,29 @@
// Copyright (C) 2025 Keygraph, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License version 3
// as published by the Free Software Foundation.
/**
* Functional Programming Utilities
*
* Generic functional composition patterns for async operations.
*/
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type PipelineFunction = (x: any) => any | Promise<any>;
/**
* Async pipeline that passes result through a series of functions.
* Clearer than reduce-based pipe and easier to debug.
*/
export async function asyncPipe<TResult>(
initial: unknown,
...fns: PipelineFunction[]
): Promise<TResult> {
let result = initial;
for (const fn of fns) {
result = await fn(result);
}
return result as TResult;
}
+132 -144
View File
@@ -13,7 +13,57 @@ interface GitOperationResult {
error?: Error;
}
// Global git operations semaphore to prevent index.lock conflicts during parallel execution
/**
* Get list of changed files from git status --porcelain output
*/
async function getChangedFiles(
sourceDir: string,
operationDescription: string
): Promise<string[]> {
const status = await executeGitCommandWithRetry(
['git', 'status', '--porcelain'],
sourceDir,
operationDescription
);
return status.stdout
.trim()
.split('\n')
.filter((line) => line.length > 0);
}
/**
* Log a summary of changed files with truncation for long lists
*/
function logChangeSummary(
changes: string[],
messageWithChanges: string,
messageWithoutChanges: string,
color: typeof chalk.green,
maxToShow: number = 5
): void {
if (changes.length > 0) {
console.log(color(messageWithChanges.replace('{count}', String(changes.length))));
changes.slice(0, maxToShow).forEach((change) => console.log(chalk.gray(` ${change}`)));
if (changes.length > maxToShow) {
console.log(chalk.gray(` ... and ${changes.length - maxToShow} more files`));
}
} else {
console.log(color(messageWithoutChanges));
}
}
/**
* Convert unknown error to GitOperationResult
*/
function toErrorResult(error: unknown): GitOperationResult {
const errMsg = error instanceof Error ? error.message : String(error);
return {
success: false,
error: error instanceof Error ? error : new Error(errMsg),
};
}
// Serializes git operations to prevent index.lock conflicts during parallel agent execution
class GitSemaphore {
private queue: Array<() => void> = [];
private running: boolean = false;
@@ -41,33 +91,38 @@ class GitSemaphore {
const gitSemaphore = new GitSemaphore();
// Execute git commands with retry logic for index.lock conflicts
export const executeGitCommandWithRetry = async (
const GIT_LOCK_ERROR_PATTERNS = [
'index.lock',
'unable to lock',
'Another git process',
'fatal: Unable to create',
'fatal: index file',
];
function isGitLockError(errorMessage: string): boolean {
return GIT_LOCK_ERROR_PATTERNS.some((pattern) => errorMessage.includes(pattern));
}
// Retries git commands on lock conflicts with exponential backoff
export async function executeGitCommandWithRetry(
commandArgs: string[],
sourceDir: string,
description: string,
maxRetries: number = 5
): Promise<{ stdout: string; stderr: string }> => {
): Promise<{ stdout: string; stderr: string }> {
await gitSemaphore.acquire();
try {
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
// For arrays like ['git', 'status', '--porcelain'], execute parts separately
const [cmd, ...args] = commandArgs;
const result = await $`cd ${sourceDir} && ${cmd} ${args}`;
return result;
} catch (error) {
const errMsg = error instanceof Error ? error.message : String(error);
const isLockError =
errMsg.includes('index.lock') ||
errMsg.includes('unable to lock') ||
errMsg.includes('Another git process') ||
errMsg.includes('fatal: Unable to create') ||
errMsg.includes('fatal: index file');
if (isLockError && attempt < maxRetries) {
const delay = Math.pow(2, attempt - 1) * 1000; // Exponential backoff: 1s, 2s, 4s, 8s, 16s
if (isGitLockError(errMsg) && attempt < maxRetries) {
const delay = Math.pow(2, attempt - 1) * 1000;
console.log(
chalk.yellow(
` ⚠️ Git lock conflict during ${description} (attempt ${attempt}/${maxRetries}). Retrying in ${delay}ms...`
@@ -80,84 +135,69 @@ export const executeGitCommandWithRetry = async (
throw error;
}
}
// Should never reach here but TypeScript needs a return
throw new Error(`Git command failed after ${maxRetries} retries`);
} finally {
gitSemaphore.release();
}
};
}
// Pure functions for Git workspace management
const cleanWorkspace = async (
// Two-phase reset: hard reset (tracked files) + clean (untracked files)
export async function rollbackGitWorkspace(
sourceDir: string,
reason: string = 'clean start'
): Promise<GitOperationResult> => {
console.log(chalk.blue(` 🧹 Cleaning workspace for ${reason}`));
reason: string = 'retry preparation'
): Promise<GitOperationResult> {
console.log(chalk.yellow(` 🔄 Rolling back workspace for ${reason}`));
try {
// Check for uncommitted changes
const status = await $`cd ${sourceDir} && git status --porcelain`;
const hasChanges = status.stdout.trim().length > 0;
const changes = await getChangedFiles(sourceDir, 'status check for rollback');
if (hasChanges) {
// Show what we're about to remove
const changes = status.stdout
.trim()
.split('\n')
.filter((line) => line.length > 0);
console.log(chalk.yellow(` 🔄 Rolling back workspace for ${reason}`));
await executeGitCommandWithRetry(
['git', 'reset', '--hard', 'HEAD'],
sourceDir,
'hard reset for rollback'
);
await executeGitCommandWithRetry(
['git', 'clean', '-fd'],
sourceDir,
'cleaning untracked files for rollback'
);
await $`cd ${sourceDir} && git reset --hard HEAD`;
await $`cd ${sourceDir} && git clean -fd`;
console.log(
chalk.yellow(` ✅ Rollback completed - removed ${changes.length} contaminated changes:`)
);
changes.slice(0, 3).forEach((change) => console.log(chalk.gray(` ${change}`)));
if (changes.length > 3) {
console.log(chalk.gray(` ... and ${changes.length - 3} more files`));
}
} else {
console.log(chalk.blue(` ✅ Workspace already clean (no changes to remove)`));
}
return { success: true, hadChanges: hasChanges };
logChangeSummary(
changes,
' ✅ Rollback completed - removed {count} contaminated changes:',
' ✅ Rollback completed - no changes to remove',
chalk.yellow,
3
);
return { success: true };
} catch (error) {
const errMsg = error instanceof Error ? error.message : String(error);
console.log(chalk.yellow(` ⚠️ Workspace cleanup failed: ${errMsg}`));
return { success: false, error: error instanceof Error ? error : new Error(errMsg) };
const result = toErrorResult(error);
console.log(chalk.red(` ❌ Rollback failed after retries: ${result.error?.message}`));
return result;
}
};
}
export const createGitCheckpoint = async (
// Creates checkpoint before each attempt. First attempt preserves workspace; retries clean it.
export async function createGitCheckpoint(
sourceDir: string,
description: string,
attempt: number
): Promise<GitOperationResult> => {
): Promise<GitOperationResult> {
console.log(chalk.blue(` 📍 Creating checkpoint for ${description} (attempt ${attempt})`));
try {
// Only clean workspace on retry attempts (attempt > 1), not on first attempts
// This preserves deliverables between agents while still cleaning on actual retries
// First attempt: preserve existing deliverables. Retries: clean workspace to prevent pollution
if (attempt > 1) {
const cleanResult = await cleanWorkspace(sourceDir, `${description} (retry cleanup)`);
const cleanResult = await rollbackGitWorkspace(sourceDir, `${description} (retry cleanup)`);
if (!cleanResult.success) {
const errMsg = cleanResult.error?.message || 'Unknown error';
console.log(
chalk.yellow(` ⚠️ Workspace cleanup failed, continuing anyway: ${errMsg}`)
chalk.yellow(` ⚠️ Workspace cleanup failed, continuing anyway: ${cleanResult.error?.message}`)
);
}
}
// Check for uncommitted changes with retry logic
const status = await executeGitCommandWithRetry(
['git', 'status', '--porcelain'],
sourceDir,
'status check'
);
const hasChanges = status.stdout.trim().length > 0;
const changes = await getChangedFiles(sourceDir, 'status check');
const hasChanges = changes.length > 0;
// Stage changes with retry logic
await executeGitCommandWithRetry(['git', 'add', '-A'], sourceDir, 'staging changes');
// Create commit with retry logic
await executeGitCommandWithRetry(
['git', 'commit', '-m', `📍 Checkpoint: ${description} (attempt ${attempt})`, '--allow-empty'],
sourceDir,
@@ -171,106 +211,54 @@ export const createGitCheckpoint = async (
}
return { success: true };
} catch (error) {
const errMsg = error instanceof Error ? error.message : String(error);
console.log(chalk.yellow(` ⚠️ Checkpoint creation failed after retries: ${errMsg}`));
return { success: false, error: error instanceof Error ? error : new Error(errMsg) };
const result = toErrorResult(error);
console.log(chalk.yellow(` ⚠️ Checkpoint creation failed after retries: ${result.error?.message}`));
return result;
}
};
}
export const commitGitSuccess = async (
export async function commitGitSuccess(
sourceDir: string,
description: string
): Promise<GitOperationResult> => {
): Promise<GitOperationResult> {
console.log(chalk.green(` 💾 Committing successful results for ${description}`));
try {
// Check what we're about to commit with retry logic
const status = await executeGitCommandWithRetry(
['git', 'status', '--porcelain'],
sourceDir,
'status check for success commit'
);
const changes = status.stdout
.trim()
.split('\n')
.filter((line) => line.length > 0);
const changes = await getChangedFiles(sourceDir, 'status check for success commit');
// Stage changes with retry logic
await executeGitCommandWithRetry(
['git', 'add', '-A'],
sourceDir,
'staging changes for success commit'
);
// Create success commit with retry logic
await executeGitCommandWithRetry(
['git', 'commit', '-m', `${description}: completed successfully`, '--allow-empty'],
sourceDir,
'creating success commit'
);
if (changes.length > 0) {
console.log(chalk.green(` ✅ Success commit created with ${changes.length} file changes:`));
changes.slice(0, 5).forEach((change) => console.log(chalk.gray(` ${change}`)));
if (changes.length > 5) {
console.log(chalk.gray(` ... and ${changes.length - 5} more files`));
}
} else {
console.log(chalk.green(` ✅ Empty success commit created (agent made no file changes)`));
}
logChangeSummary(
changes,
' ✅ Success commit created with {count} file changes:',
' ✅ Empty success commit created (agent made no file changes)',
chalk.green,
5
);
return { success: true };
} catch (error) {
const errMsg = error instanceof Error ? error.message : String(error);
console.log(chalk.yellow(` ⚠️ Success commit failed after retries: ${errMsg}`));
return { success: false, error: error instanceof Error ? error : new Error(errMsg) };
const result = toErrorResult(error);
console.log(chalk.yellow(` ⚠️ Success commit failed after retries: ${result.error?.message}`));
return result;
}
};
}
export const rollbackGitWorkspace = async (
sourceDir: string,
reason: string = 'retry preparation'
): Promise<GitOperationResult> => {
console.log(chalk.yellow(` 🔄 Rolling back workspace for ${reason}`));
/**
* Get current git commit hash
*/
export async function getGitCommitHash(sourceDir: string): Promise<string | null> {
try {
// Show what we're about to remove with retry logic
const status = await executeGitCommandWithRetry(
['git', 'status', '--porcelain'],
sourceDir,
'status check for rollback'
);
const changes = status.stdout
.trim()
.split('\n')
.filter((line) => line.length > 0);
// Reset to HEAD with retry logic
await executeGitCommandWithRetry(
['git', 'reset', '--hard', 'HEAD'],
sourceDir,
'hard reset for rollback'
);
// Clean untracked files with retry logic
await executeGitCommandWithRetry(
['git', 'clean', '-fd'],
sourceDir,
'cleaning untracked files for rollback'
);
if (changes.length > 0) {
console.log(
chalk.yellow(` ✅ Rollback completed - removed ${changes.length} contaminated changes:`)
);
changes.slice(0, 3).forEach((change) => console.log(chalk.gray(` ${change}`)));
if (changes.length > 3) {
console.log(chalk.gray(` ... and ${changes.length - 3} more files`));
}
} else {
console.log(chalk.yellow(` ✅ Rollback completed - no changes to remove`));
}
return { success: true };
} catch (error) {
const errMsg = error instanceof Error ? error.message : String(error);
console.log(chalk.red(` ❌ Rollback failed after retries: ${errMsg}`));
return { success: false, error: error instanceof Error ? error : new Error(errMsg) };
const result = await $`cd ${sourceDir} && git rev-parse HEAD`;
return result.stdout.trim();
} catch {
return null;
}
};
}
+1 -1
View File
@@ -5,7 +5,7 @@
// as published by the Free Software Foundation.
import chalk from 'chalk';
import { formatDuration } from '../audit/utils.js';
import { formatDuration } from './formatting.js';
// Timing utilities