From 575465a7414177fb761b81a201c246799e6cfcd2 Mon Sep 17 00:00:00 2001 From: ezl-keygraph Date: Mon, 15 Jun 2026 16:16:46 +0530 Subject: [PATCH] feat(worker): pi-event-driven output formatting --- apps/worker/src/ai/output-formatters.ts | 225 +++++++----------------- apps/worker/src/ai/pi-executor.ts | 25 ++- apps/worker/src/ai/types.ts | 10 -- 3 files changed, 86 insertions(+), 174 deletions(-) diff --git a/apps/worker/src/ai/output-formatters.ts b/apps/worker/src/ai/output-formatters.ts index db351bb..0268cc1 100644 --- a/apps/worker/src/ai/output-formatters.ts +++ b/apps/worker/src/ai/output-formatters.ts @@ -4,36 +4,31 @@ // it under the terms of the GNU Affero General Public License version 3 // as published by the Free Software Foundation. +/** + * Human-readable console formatting for the agent executor. + * + * Driven by the pi harness event stream: `turn_end` (assistant text) and + * `tool_execution_start` (structured tool calls). Unlike the previous harness — + * where tool calls were tool_use JSON embedded in assistant text and had to be + * parsed out — pi delivers tool name + args as discrete events, so formatting is + * a direct mapping. + */ + import { AGENTS } from '../session-manager.js'; import { extractAgentType, formatDuration } from '../utils/formatting.js'; -import type { ExecutionContext, ResultData } from './types.js'; +import type { ExecutionContext } from './types.js'; interface ToolCallInput { url?: string; - element?: string; - key?: string; - fields?: unknown[]; - text?: string; - action?: string; - description?: string; command?: string; - todos?: Array<{ - status: string; - content: string; - }>; + description?: string; + path?: string; + todos?: Array<{ status: string; content: string }>; [key: string]: unknown; } -interface ToolCall { - name: string; - input?: ToolCallInput; -} - -/** - * Get agent prefix for parallel execution - */ +/** Agent prefix used to attribute output when parallel agents interleave on one stream. */ export function getAgentPrefix(description: string): string { - // Map agent names to their prefixes const agentPrefixes: Record = { 'injection-vuln': '[Injection]', 'xss-vuln': '[XSS]', @@ -47,7 +42,6 @@ export function getAgentPrefix(description: string): string { 'ssrf-exploit': '[SSRF]', }; - // First try to match by agent name directly for (const [agentName, prefix] of Object.entries(agentPrefixes)) { const agent = AGENTS[agentName as keyof typeof AGENTS]; if (agent && description.includes(agent.displayName)) { @@ -55,7 +49,6 @@ export function getAgentPrefix(description: string): string { } } - // Fallback to partial matches for backwards compatibility if (description.includes('injection')) return '[Injection]'; if (description.includes('xss')) return '[XSS]'; if (description.includes('authz')) return '[Authz]'; // Check authz before auth @@ -65,9 +58,7 @@ export function getAgentPrefix(description: string): string { return '[Agent]'; } -/** - * Extract domain from URL for display - */ +/** Extract domain from URL for display. */ function extractDomain(url: string): string { try { const urlObj = new URL(url); @@ -77,11 +68,8 @@ function extractDomain(url: string): string { } } -/** - * Format playwright-cli commands into clean progress indicators - */ +/** Format a playwright-cli command (run via the bash tool) into a clean progress indicator. */ function formatBrowserAction(command: string): string | null { - // Extract subcommand after optional session flag (e.g., "playwright-cli -s=session1 navigate https://example.com") const match = command.match(/playwright-cli\s+(?:-s=\S+\s+)?(\S+)(?:\s+(.*))?/); if (!match) return null; @@ -151,26 +139,19 @@ function formatBrowserAction(command: string): string | null { } } -/** - * Summarize TodoWrite updates into clean progress indicators - */ +/** Summarize a todo_write update into a clean progress indicator. */ function summarizeTodoUpdate(input: ToolCallInput | undefined): string | null { if (!input?.todos || !Array.isArray(input.todos)) { return null; } const todos = input.todos; - const completed = todos.filter((t) => t.status === 'completed'); - const inProgress = todos.filter((t) => t.status === 'in_progress'); - - // Show recently completed tasks - const recent = completed.at(-1); + const recent = todos.filter((t) => t.status === 'completed').at(-1); if (recent) { return `✅ ${recent.content}`; } - // Show current in-progress task - const current = inProgress.at(0); + const current = todos.filter((t) => t.status === 'in_progress').at(0); if (current) { return `🔄 ${current.content}`; } @@ -178,69 +159,6 @@ function summarizeTodoUpdate(input: ToolCallInput | undefined): string | null { return null; } -/** - * Filter out JSON tool calls from content, with special handling for Task calls - */ -export function filterJsonToolCalls(content: string | null | undefined): string { - if (!content || typeof content !== 'string') { - return content || ''; - } - - const lines = content.split('\n'); - const processedLines: string[] = []; - - for (const line of lines) { - const trimmed = line.trim(); - - // Skip empty lines - if (trimmed === '') { - continue; - } - - // Check if this is a JSON tool call - if (trimmed.startsWith('{"type":"tool_use"')) { - try { - const toolCall = JSON.parse(trimmed) as ToolCall; - - // Special handling for Task tool calls - if (toolCall.name === 'Task') { - const description = toolCall.input?.description || 'analysis agent'; - processedLines.push(`🚀 Launching ${description}`); - continue; - } - - // Special handling for TodoWrite tool calls - if (toolCall.name === 'TodoWrite') { - const summary = summarizeTodoUpdate(toolCall.input); - if (summary) { - processedLines.push(summary); - } - continue; - } - - // Special handling for browser tool calls (playwright-cli via Bash) - if (toolCall.name === 'Bash') { - const command = toolCall.input?.command || ''; - if (command.includes('playwright-cli')) { - const browserAction = formatBrowserAction(command); - if (browserAction) { - processedLines.push(browserAction); - } - } - } - } catch { - // If JSON parsing fails, treat as regular text - processedLines.push(line); - } - } else { - // Keep non-JSON lines (assistant text) - processedLines.push(line); - } - } - - return processedLines.join('\n'); -} - export function detectExecutionContext(description: string): ExecutionContext { const isParallelExecution = description.includes('vuln agent') || description.includes('exploit agent'); @@ -252,62 +170,69 @@ export function detectExecutionContext(description: string): ExecutionContext { description.includes('exploit agent'); const agentType = extractAgentType(description); - const agentKey = description.toLowerCase().replace(/\s+/g, '-'); return { isParallelExecution, useCleanOutput, agentType, agentKey }; } +/** Format assistant turn text (from a pi `turn_end` event). */ export function formatAssistantOutput( - cleanedContent: string, + text: string, context: ExecutionContext, turnCount: number, description: string, ): string[] { - if (!cleanedContent.trim()) { + if (!text.trim()) { return []; } - const lines: string[] = []; - if (context.isParallelExecution) { - // Compact output for parallel agents with prefixes - const prefix = getAgentPrefix(description); - lines.push(`${prefix} ${cleanedContent}`); - } else { - // Full turn output for sequential agents - lines.push(`\n Turn ${turnCount} (${description}):`); - lines.push(` ${cleanedContent}`); + // Compact, attributed output for interleaved parallel agents. + return [`${getAgentPrefix(description)} ${text}`]; } - - return lines; + // Full turn output for sequential agents. + return [`\n Turn ${turnCount} (${description}):`, ` ${text}`]; } -export function formatResultOutput(data: ResultData, showFullResult: boolean): string[] { - const lines: string[] = []; +/** + * Format a pi `tool_execution_start` event into a clean one-line progress indicator. + * + * Maps the common tool surfaces — `task` (sub-agent delegation), `todo_write` + * (plan updates), `bash` (incl. playwright-cli browser actions), read-only file + * tools, and the structured collector/submit tools — to friendly lines. Returns + * `[]` when there's nothing worth surfacing (e.g. a todo update with no active item). + */ +export function formatToolCall( + toolName: string, + args: Record | undefined, + context: ExecutionContext, + description: string, +): string[] { + const input = (args ?? {}) as ToolCallInput; + let line: string | null; - lines.push(`\n COMPLETED:`); - lines.push(` Duration: ${(data.duration_ms / 1000).toFixed(1)}s, Cost: $${data.cost.toFixed(4)}`); - - if (data.subtype === 'error_max_turns') { - lines.push(` Stopped: Hit maximum turns limit`); - } else if (data.subtype === 'error_during_execution') { - lines.push(` Stopped: Execution error`); + if (toolName === 'task') { + line = `🚀 Launching ${input.description ?? 'sub-agent'}`; + } else if (toolName === 'todo_write') { + line = summarizeTodoUpdate(input); + } else if (toolName === 'bash') { + const command = typeof input.command === 'string' ? input.command : ''; + line = command.includes('playwright-cli') ? formatBrowserAction(command) : `💻 ${command.slice(0, 60)}`; + } else if (toolName === 'read' || toolName === 'grep' || toolName === 'find' || toolName === 'ls') { + const path = typeof input.path === 'string' ? ` ${input.path.slice(0, 60)}` : ''; + line = `📖 ${toolName}${path}`; + } else if (toolName.startsWith('set_') || toolName.startsWith('add_') || toolName.startsWith('submit_')) { + line = `📊 ${toolName.replace(/_/g, ' ')}`; + } else { + line = `🔧 ${toolName}`; } - if (data.permissionDenials > 0) { - lines.push(` ${data.permissionDenials} permission denials`); - } + if (!line) return []; - if (showFullResult && data.result && typeof data.result === 'string') { - if (data.result.length > 1000) { - lines.push(` ${data.result.slice(0, 1000)}... [${data.result.length} total chars]`); - } else { - lines.push(` ${data.result}`); - } + if (context.isParallelExecution) { + return [`${getAgentPrefix(description)} ${line}`]; } - - return lines; + return [` ${line}`]; } export function formatErrorOutput( @@ -321,8 +246,7 @@ export function formatErrorOutput( const lines: string[] = []; if (context.isParallelExecution) { - const prefix = getAgentPrefix(description); - lines.push(`${prefix} Failed (${formatDuration(duration)})`); + lines.push(`${getAgentPrefix(description)} Failed (${formatDuration(duration)})`); } else if (context.useCleanOutput) { lines.push(`${context.agentType} failed (${formatDuration(duration)})`); } else { @@ -352,8 +276,7 @@ export function formatCompletionMessage( duration: number, ): string { if (context.isParallelExecution) { - const prefix = getAgentPrefix(description); - return `${prefix} Complete (${turnCount} turns, ${formatDuration(duration)})`; + return `${getAgentPrefix(description)} Complete (${turnCount} turns, ${formatDuration(duration)})`; } if (context.useCleanOutput) { @@ -362,25 +285,3 @@ export function formatCompletionMessage( return ` pi agent completed: ${description} (${turnCount} turns) in ${formatDuration(duration)}`; } - -export function formatToolUseOutput(toolName: string, input: Record | undefined): string[] { - const lines: string[] = []; - - lines.push(`\n Using Tool: ${toolName}`); - if (input && Object.keys(input).length > 0) { - lines.push(` Input: ${JSON.stringify(input, null, 2)}`); - } - - return lines; -} - -export function formatToolResultOutput(displayContent: string): string[] { - const lines: string[] = []; - - lines.push(` Tool Result:`); - if (displayContent) { - lines.push(` ${displayContent}`); - } - - return lines; -} diff --git a/apps/worker/src/ai/pi-executor.ts b/apps/worker/src/ai/pi-executor.ts index 809b126..b8ea197 100644 --- a/apps/worker/src/ai/pi-executor.ts +++ b/apps/worker/src/ai/pi-executor.ts @@ -31,7 +31,13 @@ import { formatTimestamp } from '../utils/formatting.js'; import { Timer } from '../utils/metrics.js'; import { createAuditLogger } from './audit-logger.js'; import { type ModelTier, resolveModelSelection } from './models.js'; -import { detectExecutionContext, formatCompletionMessage, formatErrorOutput } from './output-formatters.js'; +import { + detectExecutionContext, + formatAssistantOutput, + formatCompletionMessage, + formatErrorOutput, + formatToolCall, +} from './output-formatters.js'; import { createProgressManager } from './progress-manager.js'; import { permissionConfigPath } from './settings-writer.js'; import { createTaskTool, createTodoWriteTool } from './tools.js'; @@ -278,6 +284,9 @@ export async function runPiPrompt( const text = extractAssistantText(msg); if (text.trim()) { void auditLogger.logLlmResponse(turnCount, text); + progress.stop(); + outputLines(formatAssistantOutput(text, execContext, turnCount, description)); + progress.start(); const billing = classifyErrorText(text); if (billing) pendingError = billing; } @@ -290,9 +299,21 @@ export async function runPiPrompt( } break; } - case 'tool_execution_start': + case 'tool_execution_start': { void auditLogger.logToolStart(event.toolName, event.args); + const toolLines = formatToolCall( + event.toolName, + event.args as Record, + execContext, + description, + ); + if (toolLines.length > 0) { + progress.stop(); + outputLines(toolLines); + progress.start(); + } break; + } case 'tool_execution_end': void auditLogger.logToolEnd(event.result); break; diff --git a/apps/worker/src/ai/types.ts b/apps/worker/src/ai/types.ts index 0bebc44..99321f7 100644 --- a/apps/worker/src/ai/types.ts +++ b/apps/worker/src/ai/types.ts @@ -12,13 +12,3 @@ export interface ExecutionContext { agentType: string; agentKey: string; } - -export interface ResultData { - result: string | null; - cost: number; - duration_ms: number; - subtype?: string; - stop_reason?: string | null; - permissionDenials: number; - structuredOutput?: unknown; -}