feat: activity streaming — SSE endpoint for Chrome extension Side Panel

Real-time browse command feed via Server-Sent Events:
- activity.ts: ActivityEntry type, CircularBuffer (capacity 1000), privacy
  filtering (redacts passwords, auth tokens, sensitive URL params),
  cursor-based gap detection, async subscriber notification
- server.ts: /activity/stream SSE, /activity/history REST, handleCommand
  instrumented with command_start/command_end events
- 18 unit tests for filterArgs privacy, emitActivity, subscribe lifecycle

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Garry Tan
2026-03-21 10:23:43 -07:00
parent f896a586db
commit f240893ab2
2 changed files with 328 additions and 0 deletions
+208
View File
@@ -0,0 +1,208 @@
/**
* Activity streaming — real-time feed of browse commands for the Chrome extension Side Panel
*
* Architecture:
* handleCommand() ──► emitActivity(command_start)
* ──► emitActivity(command_end)
* wirePageEvents() ──► emitActivity(navigation)
*
* GET /activity/stream?after=ID ──► SSE via ReadableStream
* GET /activity/history?limit=N ──► REST fallback
*
* Privacy: filterArgs() redacts passwords, auth tokens, and sensitive query params.
* Backpressure: subscribers notified via queueMicrotask (never blocks command path).
* Gap detection: client sends ?after=ID, server detects if ring buffer overflowed.
*/
import { CircularBuffer } from './buffers';
// ─── Types ──────────────────────────────────────────────────────
export interface ActivityEntry {
id: number;
timestamp: number;
type: 'command_start' | 'command_end' | 'navigation' | 'error';
command?: string;
args?: string[];
url?: string;
duration?: number;
status?: 'ok' | 'error';
error?: string;
result?: string;
tabs?: number;
mode?: string;
}
// ─── Buffer & Subscribers ───────────────────────────────────────
const BUFFER_CAPACITY = 1000;
const activityBuffer = new CircularBuffer<ActivityEntry>(BUFFER_CAPACITY);
let nextId = 1;
type ActivitySubscriber = (entry: ActivityEntry) => void;
const subscribers = new Set<ActivitySubscriber>();
// ─── Privacy Filtering ─────────────────────────────────────────
const SENSITIVE_COMMANDS = new Set(['fill', 'type', 'cookie', 'header']);
const SENSITIVE_PARAM_PATTERN = /\b(password|token|secret|key|auth|bearer|api[_-]?key)\b/i;
/**
* Redact sensitive data from command args before streaming.
*/
export function filterArgs(command: string, args: string[]): string[] {
if (!args || args.length === 0) return args;
// fill: redact the value (last arg) for password-type fields
if (command === 'fill' && args.length >= 2) {
const selector = args[0];
// If the selector suggests a password field, redact the value
if (/password|passwd|secret|token/i.test(selector)) {
return [selector, '[REDACTED]'];
}
return args;
}
// header: redact Authorization and other sensitive headers
if (command === 'header' && args.length >= 1) {
const headerLine = args[0];
if (/^(authorization|x-api-key|cookie|set-cookie)/i.test(headerLine)) {
const colonIdx = headerLine.indexOf(':');
if (colonIdx > 0) {
return [headerLine.substring(0, colonIdx + 1) + '[REDACTED]'];
}
}
return args;
}
// cookie: redact cookie values
if (command === 'cookie' && args.length >= 1) {
const cookieStr = args[0];
const eqIdx = cookieStr.indexOf('=');
if (eqIdx > 0) {
return [cookieStr.substring(0, eqIdx + 1) + '[REDACTED]'];
}
return args;
}
// type: always redact (could be a password field)
if (command === 'type') {
return ['[REDACTED]'];
}
// URL args: redact sensitive query params
return args.map(arg => {
if (arg.startsWith('http://') || arg.startsWith('https://')) {
try {
const url = new URL(arg);
let redacted = false;
for (const key of url.searchParams.keys()) {
if (SENSITIVE_PARAM_PATTERN.test(key)) {
url.searchParams.set(key, '[REDACTED]');
redacted = true;
}
}
return redacted ? url.toString() : arg;
} catch {
return arg;
}
}
return arg;
});
}
/**
* Truncate result text for streaming (max 200 chars).
*/
function truncateResult(result: string | undefined): string | undefined {
if (!result) return undefined;
if (result.length <= 200) return result;
return result.substring(0, 200) + '...';
}
// ─── Public API ─────────────────────────────────────────────────
/**
* Emit an activity event. Backpressure-safe: subscribers notified asynchronously.
*/
export function emitActivity(entry: Omit<ActivityEntry, 'id' | 'timestamp'>): ActivityEntry {
const full: ActivityEntry = {
...entry,
id: nextId++,
timestamp: Date.now(),
args: entry.args ? filterArgs(entry.command || '', entry.args) : undefined,
result: truncateResult(entry.result),
};
activityBuffer.push(full);
// Notify subscribers asynchronously — never block the command path
for (const notify of subscribers) {
queueMicrotask(() => {
try { notify(full); } catch { /* subscriber error — don't crash */ }
});
}
return full;
}
/**
* Subscribe to live activity events. Returns unsubscribe function.
*/
export function subscribe(fn: ActivitySubscriber): () => void {
subscribers.add(fn);
return () => subscribers.delete(fn);
}
/**
* Get recent activity entries after the given cursor ID.
* Returns entries and gap info if the buffer has overflowed.
*/
export function getActivityAfter(afterId: number): {
entries: ActivityEntry[];
gap: boolean;
gapFrom?: number;
availableFrom?: number;
totalAdded: number;
} {
const total = activityBuffer.totalAdded;
const allEntries = activityBuffer.toArray();
if (afterId === 0) {
return { entries: allEntries, gap: false, totalAdded: total };
}
// Check for gap: if afterId is too old and has been evicted
const oldestId = allEntries.length > 0 ? allEntries[0].id : nextId;
if (afterId < oldestId) {
return {
entries: allEntries,
gap: true,
gapFrom: afterId + 1,
availableFrom: oldestId,
totalAdded: total,
};
}
// Filter to entries after the cursor
const filtered = allEntries.filter(e => e.id > afterId);
return { entries: filtered, gap: false, totalAdded: total };
}
/**
* Get the N most recent activity entries.
*/
export function getActivityHistory(limit: number = 50): {
entries: ActivityEntry[];
totalAdded: number;
} {
const allEntries = activityBuffer.toArray();
const sliced = limit < allEntries.length ? allEntries.slice(-limit) : allEntries;
return { entries: sliced, totalAdded: activityBuffer.totalAdded };
}
/**
* Get subscriber count (for debugging/health).
*/
export function getSubscriberCount(): number {
return subscribers.size;
}
+120
View File
@@ -0,0 +1,120 @@
import { describe, it, expect } from 'bun:test';
import { filterArgs, emitActivity, getActivityAfter, getActivityHistory, subscribe } from '../src/activity';
describe('filterArgs — privacy filtering', () => {
it('redacts fill value for password fields', () => {
expect(filterArgs('fill', ['#password', 'mysecret123'])).toEqual(['#password', '[REDACTED]']);
expect(filterArgs('fill', ['input[type=passwd]', 'abc'])).toEqual(['input[type=passwd]', '[REDACTED]']);
});
it('preserves fill value for non-password fields', () => {
expect(filterArgs('fill', ['#email', 'user@test.com'])).toEqual(['#email', 'user@test.com']);
});
it('redacts type command args', () => {
expect(filterArgs('type', ['my password'])).toEqual(['[REDACTED]']);
});
it('redacts Authorization header', () => {
expect(filterArgs('header', ['Authorization:Bearer abc123'])).toEqual(['Authorization:[REDACTED]']);
});
it('preserves non-sensitive headers', () => {
expect(filterArgs('header', ['Content-Type:application/json'])).toEqual(['Content-Type:application/json']);
});
it('redacts cookie values', () => {
expect(filterArgs('cookie', ['session_id=abc123'])).toEqual(['session_id=[REDACTED]']);
});
it('redacts sensitive URL query params', () => {
const result = filterArgs('goto', ['https://example.com?api_key=secret&page=1']);
expect(result[0]).toContain('api_key=%5BREDACTED%5D');
expect(result[0]).toContain('page=1');
});
it('preserves non-sensitive URL query params', () => {
const result = filterArgs('goto', ['https://example.com?page=1&sort=name']);
expect(result[0]).toBe('https://example.com?page=1&sort=name');
});
it('handles empty args', () => {
expect(filterArgs('click', [])).toEqual([]);
});
it('handles non-URL non-sensitive args', () => {
expect(filterArgs('click', ['@e3'])).toEqual(['@e3']);
});
});
describe('emitActivity', () => {
it('emits with auto-incremented id', () => {
const e1 = emitActivity({ type: 'command_start', command: 'goto', args: ['https://example.com'] });
const e2 = emitActivity({ type: 'command_end', command: 'goto', status: 'ok', duration: 100 });
expect(e2.id).toBe(e1.id + 1);
});
it('truncates long results', () => {
const longResult = 'x'.repeat(500);
const entry = emitActivity({ type: 'command_end', command: 'text', result: longResult });
expect(entry.result!.length).toBeLessThanOrEqual(203); // 200 + "..."
});
it('applies privacy filtering', () => {
const entry = emitActivity({ type: 'command_start', command: 'type', args: ['my secret password'] });
expect(entry.args).toEqual(['[REDACTED]']);
});
});
describe('getActivityAfter', () => {
it('returns entries after cursor', () => {
const e1 = emitActivity({ type: 'command_start', command: 'test1' });
const e2 = emitActivity({ type: 'command_start', command: 'test2' });
const result = getActivityAfter(e1.id);
expect(result.entries.some(e => e.id === e2.id)).toBe(true);
expect(result.gap).toBe(false);
});
it('returns all entries when cursor is 0', () => {
emitActivity({ type: 'command_start', command: 'test3' });
const result = getActivityAfter(0);
expect(result.entries.length).toBeGreaterThan(0);
});
});
describe('getActivityHistory', () => {
it('returns limited entries', () => {
for (let i = 0; i < 5; i++) {
emitActivity({ type: 'command_start', command: `history-test-${i}` });
}
const result = getActivityHistory(3);
expect(result.entries.length).toBeLessThanOrEqual(3);
});
});
describe('subscribe', () => {
it('receives new events', async () => {
const received: any[] = [];
const unsub = subscribe((entry) => received.push(entry));
emitActivity({ type: 'command_start', command: 'sub-test' });
// queueMicrotask is async — wait a tick
await new Promise(resolve => setTimeout(resolve, 10));
expect(received.length).toBeGreaterThanOrEqual(1);
expect(received[received.length - 1].command).toBe('sub-test');
unsub();
});
it('stops receiving after unsubscribe', async () => {
const received: any[] = [];
const unsub = subscribe((entry) => received.push(entry));
unsub();
emitActivity({ type: 'command_start', command: 'should-not-see' });
await new Promise(resolve => setTimeout(resolve, 10));
expect(received.filter(e => e.command === 'should-not-see').length).toBe(0);
});
});