feat: stream-json NDJSON parser for real-time E2E progress

Switch session-runner from buffered `--output-format json` to streaming
`--output-format stream-json --verbose`. Parses NDJSON line-by-line for
real-time tool-by-tool progress on stderr during 3-5 min E2E runs.

- Extract testable `parseNDJSON()` function (pure, no I/O)
- Count turns per assistant event (not per text block)
- Add `transcript: any[]` to SkillTestResult, remove dead `messages` field
- Reconstruct allText from transcript for browse error scanning
- 8 unit tests for parser (malformed lines, empty input, turn counting)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Garry Tan
2026-03-14 03:49:36 -05:00
parent 3d750d89af
commit e7347c2f8f
2 changed files with 235 additions and 58 deletions
+139 -58
View File
@@ -2,8 +2,8 @@
* Claude CLI subprocess runner for skill E2E testing.
*
* Spawns `claude -p` as a completely independent process (not via Agent SDK),
* so it works inside Claude Code sessions. Pipes prompt via stdin, collects
* JSON output, scans for browse errors.
* so it works inside Claude Code sessions. Pipes prompt via stdin, streams
* NDJSON output for real-time progress, scans for browse errors.
*/
import * as fs from 'fs';
@@ -18,13 +18,13 @@ export interface CostEstimate {
}
export interface SkillTestResult {
messages: any[];
toolCalls: Array<{ tool: string; input: any; output: string }>;
browseErrors: string[];
exitReason: string;
duration: number;
output: string;
costEstimate: CostEstimate;
transcript: any[];
}
const BROWSE_ERROR_PATTERNS = [
@@ -36,6 +36,63 @@ const BROWSE_ERROR_PATTERNS = [
/no such file or directory.*browse/i,
];
// --- Testable NDJSON parser ---
export interface ParsedNDJSON {
transcript: any[];
resultLine: any | null;
turnCount: number;
toolCallCount: number;
toolCalls: Array<{ tool: string; input: any; output: string }>;
}
/**
* Parse an array of NDJSON lines into structured transcript data.
* Pure function — no I/O, no side effects. Used by both the streaming
* reader and unit tests.
*/
export function parseNDJSON(lines: string[]): ParsedNDJSON {
const transcript: any[] = [];
let resultLine: any = null;
let turnCount = 0;
let toolCallCount = 0;
const toolCalls: ParsedNDJSON['toolCalls'] = [];
for (const line of lines) {
if (!line.trim()) continue;
try {
const event = JSON.parse(line);
transcript.push(event);
// Track turns and tool calls from assistant events
if (event.type === 'assistant') {
turnCount++;
const content = event.message?.content || [];
for (const item of content) {
if (item.type === 'tool_use') {
toolCallCount++;
toolCalls.push({
tool: item.name || 'unknown',
input: item.input || {},
output: '',
});
}
}
}
if (event.type === 'result') resultLine = event;
} catch { /* skip malformed lines */ }
}
return { transcript, resultLine, turnCount, toolCallCount, toolCalls };
}
function truncate(s: string, max: number): string {
return s.length > max ? s.slice(0, max) + '…' : s;
}
// --- Main runner ---
export async function runSkillTest(options: {
prompt: string;
workingDirectory: string;
@@ -53,12 +110,12 @@ export async function runSkillTest(options: {
const startTime = Date.now();
// Spawn claude -p with JSON output. Prompt piped via stdin to avoid
// shell escaping issues. Env is passed through (child claude strips
// its own parent-detection vars internally).
// Spawn claude -p with streaming NDJSON output. Prompt piped via stdin to
// avoid shell escaping issues. --verbose is required for stream-json mode.
const args = [
'-p',
'--output-format', 'json',
'--output-format', 'stream-json',
'--verbose',
'--dangerously-skip-permissions',
'--max-turns', String(maxTurns),
'--allowed-tools', ...allowedTools,
@@ -75,7 +132,6 @@ export async function runSkillTest(options: {
});
// Race against timeout
let stdout = '';
let stderr = '';
let exitReason = 'unknown';
let timedOut = false;
@@ -85,50 +141,76 @@ export async function runSkillTest(options: {
proc.kill();
}, timeout);
// Stream NDJSON from stdout for real-time progress
const collectedLines: string[] = [];
let liveTurnCount = 0;
let liveToolCount = 0;
const stderrPromise = new Response(proc.stderr).text();
const reader = proc.stdout.getReader();
const decoder = new TextDecoder();
let buf = '';
try {
const [outBuf, errBuf] = await Promise.all([
new Response(proc.stdout).text(),
new Response(proc.stderr).text(),
]);
stdout = outBuf;
stderr = errBuf;
while (true) {
const { done, value } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
const lines = buf.split('\n');
buf = lines.pop() || '';
for (const line of lines) {
if (!line.trim()) continue;
collectedLines.push(line);
const exitCode = await proc.exited;
clearTimeout(timeoutId);
if (timedOut) {
exitReason = 'timeout';
} else if (exitCode === 0) {
exitReason = 'success';
} else {
exitReason = `exit_code_${exitCode}`;
// Real-time progress to stderr
try {
const event = JSON.parse(line);
if (event.type === 'assistant') {
liveTurnCount++;
const content = event.message?.content || [];
for (const item of content) {
if (item.type === 'tool_use') {
liveToolCount++;
const elapsed = Math.round((Date.now() - startTime) / 1000);
process.stderr.write(
` [${elapsed}s] turn ${liveTurnCount} tool #${liveToolCount}: ${item.name}(${truncate(JSON.stringify(item.input || {}), 80)})\n`
);
}
}
}
} catch { /* skip — parseNDJSON will handle it later */ }
}
}
} catch (err: any) {
clearTimeout(timeoutId);
exitReason = timedOut ? 'timeout' : `error: ${err.message}`;
} finally {
try { fs.unlinkSync(promptFile); } catch { /* non-fatal */ }
} catch { /* stream read error — fall through to exit code handling */ }
// Flush remaining buffer
if (buf.trim()) {
collectedLines.push(buf);
}
stderr = await stderrPromise;
const exitCode = await proc.exited;
clearTimeout(timeoutId);
try { fs.unlinkSync(promptFile); } catch { /* non-fatal */ }
if (timedOut) {
exitReason = 'timeout';
} else if (exitCode === 0) {
exitReason = 'success';
} else {
exitReason = `exit_code_${exitCode}`;
}
const duration = Date.now() - startTime;
// Parse JSON output
let messages: any[] = [];
let toolCalls: SkillTestResult['toolCalls'] = [];
// Parse all collected NDJSON lines
const parsed = parseNDJSON(collectedLines);
const { transcript, resultLine, toolCalls } = parsed;
const browseErrors: string[] = [];
let result: any = null;
try {
// stdout may have stderr warnings prefixed (e.g., "[WARN] Fast mode...")
// Find the JSON object in the output
const jsonStart = stdout.indexOf('{');
if (jsonStart >= 0) {
result = JSON.parse(stdout.slice(jsonStart));
}
} catch { /* non-JSON output */ }
// Scan all output for browse errors
const allText = stdout + '\n' + stderr;
// Scan transcript + stderr for browse errors
const allText = transcript.map(e => JSON.stringify(e)).join('\n') + '\n' + stderr;
for (const pattern of BROWSE_ERROR_PATTERNS) {
const match = allText.match(pattern);
if (match) {
@@ -136,13 +218,12 @@ export async function runSkillTest(options: {
}
}
// If JSON parsed, use the structured result
if (result) {
// Check result type for success
if (result.type === 'result' && result.subtype === 'success') {
// Use resultLine for structured result data
if (resultLine) {
if (resultLine.subtype === 'success') {
exitReason = 'success';
} else if (result.type === 'result' && result.subtype) {
exitReason = result.subtype;
} else if (resultLine.subtype) {
exitReason = resultLine.subtype;
}
}
@@ -160,20 +241,20 @@ export async function runSkillTest(options: {
browseErrors,
duration,
stderr: stderr.slice(0, 2000),
result: result ? { type: result.type, subtype: result.subtype, result: result.result?.slice?.(0, 500) } : null,
result: resultLine ? { type: resultLine.type, subtype: resultLine.subtype, result: resultLine.result?.slice?.(0, 500) } : null,
}, null, 2),
);
} catch { /* non-fatal */ }
}
// Cost from JSON result (exact) or estimate from chars
const turnsUsed = result?.num_turns || 0;
const estimatedCost = result?.total_cost_usd || 0;
// Cost from result line (exact) or estimate from chars
const turnsUsed = resultLine?.num_turns || 0;
const estimatedCost = resultLine?.total_cost_usd || 0;
const inputChars = prompt.length;
const outputChars = (result?.result || stdout).length;
const estimatedTokens = (result?.usage?.input_tokens || 0)
+ (result?.usage?.output_tokens || 0)
+ (result?.usage?.cache_read_input_tokens || 0);
const outputChars = (resultLine?.result || '').length;
const estimatedTokens = (resultLine?.usage?.input_tokens || 0)
+ (resultLine?.usage?.output_tokens || 0)
+ (resultLine?.usage?.cache_read_input_tokens || 0);
const costEstimate: CostEstimate = {
inputChars,
@@ -183,5 +264,5 @@ export async function runSkillTest(options: {
turnsUsed,
};
return { messages, toolCalls, browseErrors, exitReason, duration, output: result?.result || stdout, costEstimate };
return { toolCalls, browseErrors, exitReason, duration, output: resultLine?.result || '', costEstimate, transcript };
}