diff --git a/lib/llm-summarize.ts b/lib/llm-summarize.ts
new file mode 100644
index 00000000..03c0d7fb
--- /dev/null
+++ b/lib/llm-summarize.ts
@@ -0,0 +1,125 @@
+/**
+ * LLM session summarization via raw fetch() to the Anthropic Messages API.
+ *
+ * No SDK dependency — matches the Supabase raw-fetch pattern.
+ * Uses eval-cache for SHA-based caching (reruns are instant).
+ *
+ * Retry strategy (per Anthropic docs):
+ *   429: read the retry-after header, wait that duration, max 2 retries
+ *   5xx: exponential backoff (1s, 2s), max 2 retries
+ *   all other errors: return null immediately
+ */
+
+import { computeCacheKey, cacheRead, cacheWrite } from './eval-cache';
+
+const ANTHROPIC_API_URL = 'https://api.anthropic.com/v1/messages';
+const MODEL = 'claude-haiku-4-5-20251001';
+const MAX_RETRIES = 2;
+const TIMEOUT_MS = 10_000;
+
+/**
+ * Generate a 1-sentence summary of a Claude Code session.
+ * Returns null if: no API key, API error, or malformed response.
+ */
+export async function summarizeSession(
+  messages: Array<{ display: string; timestamp: number }>,
+  toolsUsed: string[] | null,
+): Promise<string | null> {
+  const apiKey = process.env.ANTHROPIC_API_KEY;
+  if (!apiKey) return null;
+  if (messages.length === 0) return null;
+
+  // Build cache key from session content
+  const contentForHash = messages.map(m => m.display).join('\n').slice(0, 10_000);
+  const toolsStr = toolsUsed ? toolsUsed.join(',') : '';
+  const cacheKey = computeCacheKey([], `summary:${MODEL}:${contentForHash}:${toolsStr}`);
+
+  const cached = cacheRead('transcript-summaries', cacheKey);
+  if (cached !== null && typeof cached === 'string') return cached;
+
+  const promptLines = messages.slice(0, 50).map(m =>
+    m.display.length > 200 ? m.display.slice(0, 200) + '...' : m.display,
+  );
+  const toolInfo = toolsUsed && toolsUsed.length > 0
+    ? `\nTools used: ${toolsUsed.join(', ')}`
+    : '';
+
+  const userPrompt = `Summarize this Claude Code session in exactly one sentence. Focus on what the user accomplished, not the process. Be specific and concise.
+
+User prompts (${messages.length} turns):
+${promptLines.join('\n')}
+${toolInfo}
+
+Respond with ONLY the summary sentence, nothing else.`;
+
+  const body = JSON.stringify({
+    model: MODEL,
+    max_tokens: 150,
+    messages: [{ role: 'user', content: userPrompt }],
+  });
+
+  const summary = await fetchWithRetry(apiKey, body);
+  if (summary) {
+    cacheWrite('transcript-summaries', cacheKey, summary, { model: MODEL });
+  }
+  return summary;
+}
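+
+// Illustrative usage (the entry and output below are made up, not from a real
+// session):
+//
+//   const summary = await summarizeSession(
+//     [{ display: 'add retry logic to the sync client', timestamp: Date.now() }],
+//     ['Read', 'Edit', 'Bash'],
+//   );
+//   // → "Added retry logic to the sync client." — or null when no
+//   //   ANTHROPIC_API_KEY is set or the request ultimately fails.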
+
+async function fetchWithRetry(apiKey: string, body: string): Promise<string | null> {
+  for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
+    const controller = new AbortController();
+    const timeout = setTimeout(() => controller.abort(), TIMEOUT_MS);
+    try {
+      const res = await fetch(ANTHROPIC_API_URL, {
+        method: 'POST',
+        signal: controller.signal,
+        headers: {
+          'Content-Type': 'application/json',
+          'x-api-key': apiKey,
+          'anthropic-version': '2023-06-01',
+        },
+        body,
+      });
+
+      clearTimeout(timeout);
+
+      if (res.ok) {
+        const data = await res.json() as Record<string, unknown>;
+        const content = (data.content as any[])?.[0];
+        if (content?.type === 'text' && typeof content.text === 'string') {
+          return content.text.trim().slice(0, 500);
+        }
+        return null;
+      }
+
+      // 429: use retry-after header
+      if (res.status === 429 && attempt < MAX_RETRIES) {
+        const retryAfter = parseInt(res.headers.get('retry-after') || '2', 10);
+        await sleep(retryAfter * 1000);
+        continue;
+      }
+
+      // 5xx: exponential backoff
+      if (res.status >= 500 && attempt < MAX_RETRIES) {
+        await sleep(1000 * Math.pow(2, attempt));
+        continue;
+      }
+
+      // 4xx (not 429): don't retry
+      return null;
+    } catch {
+      // Network error, timeout, abort — clear the timer, then retry with backoff
+      clearTimeout(timeout);
+      if (attempt < MAX_RETRIES) {
+        await sleep(1000 * Math.pow(2, attempt));
+        continue;
+      }
+      return null;
+    }
+  }
+  return null;
+}
+
+function sleep(ms: number): Promise<void> {
+  return new Promise(resolve => setTimeout(resolve, ms));
+}
diff --git a/lib/sync.ts b/lib/sync.ts
index ca7f5c6b..72e63c2b 100644
--- a/lib/sync.ts
+++ b/lib/sync.ts
@@ -213,6 +213,11 @@ export function pushHeartbeat(): Promise<boolean> {
   return pushWithSync('sync_heartbeats', { hostname: os.hostname() }, { addRepoSlug: false });
 }
 
+/** Push a session transcript to Supabase. repo_slug is already in the data (from getRemoteSlugForPath). */
+export function pushTranscript(data: Record<string, unknown>): Promise<boolean> {
+  return pushWithSync('session_transcripts', data, { addRepoSlug: false });
+}
+
 // --- Pull operations ---
 
 /**
@@ -277,6 +282,18 @@ export async function pullRetros(opts?: { repoSlug?: string; limit?: number }): Promise<Record<string, unknown>[]> {
   return pullTable('retro_snapshots', parts.join('&'));
 }
 
+/** Pull team session transcripts. */
+export async function pullTranscripts(opts?: { repoSlug?: string; limit?: number }): Promise<Record<string, unknown>[]> {
+  const config = resolveSyncConfig();
+  if (!config) return [];
+
+  const parts = [`team_id=eq.${config.auth.team_id}`, 'order=started_at.desc'];
+  if (opts?.repoSlug) parts.push(`repo_slug=eq.${opts.repoSlug}`);
+  parts.push(`limit=${opts?.limit || 50}`);
+
+  return pullTable('session_transcripts', parts.join('&'));
+}
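+// For example, pullTranscripts({ repoSlug: 'org/repo', limit: 10 }) builds the
+// filter string below ('org/repo' is a placeholder, and this assumes pullTable
+// appends the string to the PostgREST endpoint for the table):
+//   team_id=eq.<team_id>&order=started_at.desc&repo_slug=eq.org/repo&limit=10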
+
 // --- Offline queue ---
 
 function enqueue(entry: QueueEntry): void {
diff --git a/lib/transcript-sync.ts b/lib/transcript-sync.ts
new file mode 100644
index 00000000..1ae5d37e
--- /dev/null
+++ b/lib/transcript-sync.ts
@@ -0,0 +1,395 @@
+/**
+ * Transcript sync — parse Claude Code session history, enrich it with
+ * tool usage and LLM summaries, and push it to Supabase.
+ *
+ * Data sources:
+ *   ~/.claude/history.jsonl — user prompts (always available)
+ *   ~/.claude/projects/{hash}/{sid}.jsonl — full transcript (when available, ~19%)
+ *
+ * Degradation cascade:
+ *   history.jsonl only  → user prompts, turn count, duration
+ *   + session file      → + tools_used, full turn count
+ *   + ANTHROPIC_API_KEY → + 1-sentence LLM summary
+ *
+ * All operations are non-fatal. If any step fails, we degrade gracefully.
+ */
+
+import * as fs from 'fs';
+import * as path from 'path';
+import * as os from 'os';
+import { spawnSync } from 'child_process';
+import { readJSON, atomicWriteJSON, GSTACK_STATE_DIR } from './util';
+import { resolveSyncConfig } from './sync-config';
+import { pushTranscript } from './sync';
+import { summarizeSession } from './llm-summarize';
+
+const HISTORY_FILE = path.join(os.homedir(), '.claude', 'history.jsonl');
+const CLAUDE_PROJECTS_DIR = path.join(os.homedir(), '.claude', 'projects');
+const MARKER_FILE = path.join(GSTACK_STATE_DIR, 'transcript-sync-marker.json');
+const MAX_HISTORY_SIZE = 50 * 1024 * 1024; // 50MB warn threshold
+const MAX_SESSION_FILE_SIZE = 10 * 1024 * 1024; // 10MB skip threshold
+const PUSH_CONCURRENCY = 10;
+const SUMMARY_CONCURRENCY = 5;
+
+// --- Types ---
+
+export interface HistoryEntry {
+  display: string;
+  pastedContents: Record<string, unknown>;
+  timestamp: number;
+  project: string;
+  sessionId: string;
+}
+
+export interface TranscriptSyncMarker {
+  pushed_sessions: Record<string, { turns_pushed: number; last_push: string }>;
+  last_file_size: number;
+  updated_at: string;
+}
+
+export interface SessionFileData {
+  tools_used: string[];
+  totalTurns: number;
+}
+
+export interface TranscriptData {
+  session_id: string;
+  repo_slug: string;
+  messages: Array<{ display: string; timestamp: number }>;
+  total_turns: number;
+  tools_used: string[] | null;
+  summary: string | null;
+  started_at: string;
+  ended_at: string;
+}
+
+// --- History parsing ---
+
+/**
+ * Parse ~/.claude/history.jsonl into HistoryEntry[].
+ * Returns [] on ENOENT, EBUSY, EACCES, or any other error. Skips malformed lines.
+ */
+export function parseHistoryFile(historyPath: string = HISTORY_FILE): HistoryEntry[] {
+  try {
+    const stat = fs.statSync(historyPath);
+    if (stat.size > MAX_HISTORY_SIZE) {
+      console.error(`Warning: history.jsonl is ${(stat.size / 1024 / 1024).toFixed(1)}MB — parsing may be slow.`);
+    }
+    const content = fs.readFileSync(historyPath, 'utf-8');
+    const entries: HistoryEntry[] = [];
+    for (const line of content.split('\n')) {
+      if (!line.trim()) continue;
+      try {
+        const d = JSON.parse(line);
+        if (d.sessionId && d.timestamp && d.project) {
+          entries.push({
+            display: typeof d.display === 'string' ? d.display : '',
+            pastedContents: d.pastedContents || {},
+            timestamp: d.timestamp,
+            project: d.project,
+            sessionId: d.sessionId,
+          });
+        }
+      } catch { /* skip malformed line */ }
+    }
+    return entries;
+  } catch {
+    return [];
+  }
+}
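+
+// A history.jsonl line looks roughly like this (shape inferred from the fields
+// read above; the values are made up):
+//   {"display":"fix the failing sync test","pastedContents":{},
+//    "timestamp":1730000000000,"project":"/Users/alice/dev/acme","sessionId":"3f2b..."}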
+
+/**
+ * Group history entries by sessionId.
+ */
+export function groupBySession(entries: HistoryEntry[]): Map<string, HistoryEntry[]> {
+  const map = new Map<string, HistoryEntry[]>();
+  for (const entry of entries) {
+    const group = map.get(entry.sessionId);
+    if (group) {
+      group.push(entry);
+    } else {
+      map.set(entry.sessionId, [entry]);
+    }
+  }
+  return map;
+}
+
+// --- Session file enrichment ---
+
+/**
+ * Find the rich session file for a given sessionId and project path.
+ * Returns the file path or null if not found.
+ *
+ * Claude Code stores session files at:
+ *   ~/.claude/projects/-{project.replaceAll('/', '-')}/{sessionId}.jsonl
+ */
+export function findSessionFile(sessionId: string, projectPath: string): string | null {
+  try {
+    const projectHash = '-' + projectPath.replace(/\//g, '-');
+    const sessionFile = path.join(CLAUDE_PROJECTS_DIR, projectHash, `${sessionId}.jsonl`);
+
+    // Security: validate the resolved path stays within ~/.claude/projects/
+    const resolved = path.resolve(sessionFile);
+    if (!resolved.startsWith(path.resolve(CLAUDE_PROJECTS_DIR) + path.sep)) return null;
+
+    if (!fs.existsSync(sessionFile)) return null;
+
+    const stat = fs.statSync(sessionFile);
+    if (stat.size > MAX_SESSION_FILE_SIZE) return null; // skip oversized files
+    if (stat.size === 0) return null;
+
+    return sessionFile;
+  } catch {
+    return null;
+  }
+}
+
+/**
+ * Parse a session JSONL file to extract tool usage and turn counts.
+ */
+export function parseSessionFile(sessionFilePath: string): SessionFileData | null {
+  try {
+    const content = fs.readFileSync(sessionFilePath, 'utf-8');
+    const toolSet = new Set<string>();
+    let totalTurns = 0;
+
+    for (const line of content.split('\n')) {
+      if (!line.trim()) continue;
+      try {
+        const d = JSON.parse(line);
+        const type = d.type;
+        if (type === 'user' || type === 'assistant') {
+          totalTurns++;
+        }
+        if (type === 'assistant') {
+          const blocks = d.message?.content;
+          if (Array.isArray(blocks)) {
+            for (const block of blocks) {
+              if (block?.type === 'tool_use' && typeof block.name === 'string') {
+                toolSet.add(block.name);
+              }
+            }
+          }
+        }
+      } catch { /* skip malformed line */ }
+    }
+
+    return {
+      tools_used: Array.from(toolSet).sort(),
+      totalTurns,
+    };
+  } catch {
+    return null;
+  }
+}
+
+// --- Repo slug resolution ---
+
+const slugCache = new Map<string, string>();
+
+/**
+ * Get the repo slug for a project path. Memoized.
+ * Runs `git remote get-url origin` with cwd set to the project path.
+ * Falls back to path.basename() if git fails.
+ */
+export function getRemoteSlugForPath(projectPath: string): string {
+  const cached = slugCache.get(projectPath);
+  if (cached) return cached;
+
+  let slug = path.basename(projectPath);
+  try {
+    if (fs.existsSync(projectPath)) {
+      const result = spawnSync('git', ['remote', 'get-url', 'origin'], {
+        cwd: projectPath,
+        stdio: 'pipe',
+        timeout: 3_000,
+      });
+      if (result.status === 0 && result.stdout) {
+        const url = result.stdout.toString().trim();
+        // Parse "git@github.com:org/repo.git" or "https://github.com/org/repo.git"
+        const match = url.match(/[/:]([\w.-]+\/[\w.-]+?)(?:\.git)?$/);
+        if (match) slug = match[1];
+      }
+    }
+  } catch { /* fall back to basename */ }
+
+  slugCache.set(projectPath, slug);
+  return slug;
+}
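+
+// Both remote styles resolve to the same slug (hypothetical remotes):
+//   git@github.com:acme/widgets.git      → 'acme/widgets'
+//   https://github.com/acme/widgets.git  → 'acme/widgets'
+// A directory with no usable origin remote falls back to its basename.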
+
+/** Clear the slug cache (for testing). */
+export function clearSlugCache(): void {
+  slugCache.clear();
+}
+
+// --- Transcript data assembly ---
+
+/**
+ * Convert a session's data into the shape expected by the session_transcripts table.
+ */
+export function sessionToTranscriptData(
+  sessionId: string,
+  historyEntries: HistoryEntry[],
+  sessionFileData: SessionFileData | null,
+  summary: string | null,
+): TranscriptData {
+  const messages = historyEntries.map(e => ({
+    display: e.display.length > 2000 ? e.display.slice(0, 2000) : e.display,
+    timestamp: e.timestamp,
+  }));
+
+  const timestamps = historyEntries.map(e => e.timestamp);
+  const startedAt = new Date(Math.min(...timestamps)).toISOString();
+  const endedAt = new Date(Math.max(...timestamps)).toISOString();
+
+  return {
+    session_id: sessionId,
+    repo_slug: getRemoteSlugForPath(historyEntries[0].project),
+    messages,
+    total_turns: sessionFileData?.totalTurns || historyEntries.length,
+    tools_used: sessionFileData?.tools_used || null,
+    summary,
+    started_at: startedAt,
+    ended_at: endedAt,
+  };
+}
+
+// --- Sync marker ---
+
+export function readSyncMarker(): TranscriptSyncMarker | null {
+  return readJSON(MARKER_FILE);
+}
+
+export function writeSyncMarker(marker: TranscriptSyncMarker): void {
+  try {
+    fs.mkdirSync(GSTACK_STATE_DIR, { recursive: true });
+    atomicWriteJSON(MARKER_FILE, marker);
+  } catch { /* non-fatal */ }
+}
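+
+// On disk the marker looks like this (illustrative values):
+//   {
+//     "pushed_sessions": { "3f2b...": { "turns_pushed": 12, "last_push": "2025-01-01T00:00:00.000Z" } },
+//     "last_file_size": 123456,
+//     "updated_at": "2025-01-01T00:00:00.000Z"
+//   }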
+
+// --- Orchestrator ---
+
+/**
+ * Main sync function. Parses history, enriches sessions, pushes to Supabase.
+ * Returns stats. All operations are non-fatal.
+ */
+export async function syncTranscripts(): Promise<{ pushed: number; skipped: number; errors: number }> {
+  const config = resolveSyncConfig();
+  if (!config || !config.syncTranscripts) {
+    return { pushed: 0, skipped: 0, errors: 0 };
+  }
+
+  // Quick check: file size unchanged = nothing new
+  let fileSize = 0;
+  try {
+    fileSize = fs.statSync(HISTORY_FILE).size;
+  } catch {
+    return { pushed: 0, skipped: 0, errors: 0 };
+  }
+
+  const marker = readSyncMarker() || {
+    pushed_sessions: {},
+    last_file_size: 0,
+    updated_at: '',
+  };
+
+  if (fileSize === marker.last_file_size) {
+    return { pushed: 0, skipped: 0, errors: 0 };
+  }
+
+  // Parse and group
+  const entries = parseHistoryFile();
+  if (entries.length === 0) return { pushed: 0, skipped: 0, errors: 0 };
+
+  const sessions = groupBySession(entries);
+
+  // Filter to sessions that need pushing
+  const toPush: Array<{ sessionId: string; entries: HistoryEntry[] }> = [];
+  let skipped = 0;
+  for (const [sessionId, sessionEntries] of sessions) {
+    const prev = marker.pushed_sessions[sessionId];
+    if (prev && prev.turns_pushed >= sessionEntries.length) {
+      skipped++;
+      continue;
+    }
+    toPush.push({ sessionId, entries: sessionEntries });
+  }
+
+  if (toPush.length === 0) {
+    // Update the file size even if nothing needs pushing (prevents re-parsing)
+    marker.last_file_size = fileSize;
+    marker.updated_at = new Date().toISOString();
+    writeSyncMarker(marker);
+    return { pushed: 0, skipped, errors: 0 };
+  }
+
+  // Enrich with session files
+  const enriched = toPush.map(({ sessionId, entries: sessionEntries }) => {
+    const sessionFile = findSessionFile(sessionId, sessionEntries[0].project);
+    const sessionFileData = sessionFile ? parseSessionFile(sessionFile) : null;
+    return { sessionId, entries: sessionEntries, sessionFileData };
+  });
+
+  // Summarize in batches of SUMMARY_CONCURRENCY
+  const withSummaries: Array<{
+    sessionId: string;
+    entries: HistoryEntry[];
+    sessionFileData: SessionFileData | null;
+    summary: string | null;
+  }> = [];
+
+  for (let i = 0; i < enriched.length; i += SUMMARY_CONCURRENCY) {
+    const batch = enriched.slice(i, i + SUMMARY_CONCURRENCY);
+    const summaries = await Promise.allSettled(
+      batch.map(({ entries: sessionEntries, sessionFileData }) => {
+        const messages = sessionEntries.map(e => ({
+          display: e.display.length > 200 ? e.display.slice(0, 200) : e.display,
+          timestamp: e.timestamp,
+        }));
+        return summarizeSession(messages, sessionFileData?.tools_used || null);
+      }),
+    );
+
+    batch.forEach((item, idx) => {
+      const result = summaries[idx];
+      withSummaries.push({
+        ...item,
+        summary: result.status === 'fulfilled' ? result.value : null,
+      });
+    });
+  }
+
+  // Push in batches of PUSH_CONCURRENCY
+  let pushed = 0;
+  let errors = 0;
+
+  for (let i = 0; i < withSummaries.length; i += PUSH_CONCURRENCY) {
+    const batch = withSummaries.slice(i, i + PUSH_CONCURRENCY);
+    const results = await Promise.allSettled(
+      batch.map(({ sessionId, entries: sessionEntries, sessionFileData, summary }) => {
+        const data = sessionToTranscriptData(sessionId, sessionEntries, sessionFileData, summary);
+        return pushTranscript(data as Record<string, unknown>);
+      }),
+    );
+
+    results.forEach((result, idx) => {
+      const item = batch[idx];
+      if (result.status === 'fulfilled' && result.value) {
+        pushed++;
+        marker.pushed_sessions[item.sessionId] = {
+          turns_pushed: item.entries.length,
+          last_push: new Date().toISOString(),
+        };
+      } else {
+        errors++;
+      }
+    });
+  }
+
+  // Update marker
+  marker.last_file_size = fileSize;
+  marker.updated_at = new Date().toISOString();
+  writeSyncMarker(marker);
+
+  return { pushed, skipped, errors };
+}
diff --git a/supabase/migrations/006_transcript_sync.sql b/supabase/migrations/006_transcript_sync.sql
new file mode 100644
index 00000000..1b8f9fa5
--- /dev/null
+++ b/supabase/migrations/006_transcript_sync.sql
@@ -0,0 +1,15 @@
+-- 006_transcript_sync.sql — Unique index for idempotent transcript upsert + RLS fix.
+
+-- Unique index on (team_id, session_id) for upsert via Prefer: resolution=merge-duplicates.
+-- session_id is a UUID from Claude Code — globally unique. user_id stays out of the key:
+-- it is nullable, and unique indexes treat NULLs as distinct, so rows would not dedup.
+create unique index if not exists idx_transcript_natural_key
+  on session_transcripts(team_id, session_id);
+
+-- Change transcript RLS from admin-only read to team-wide read.
+-- Matches the pattern used by eval_runs, retro_snapshots, qa_reports, ship_logs, greptile_triage.
+-- Transcript sync is opt-in and already requires user consent (sync_transcripts=true).
+drop policy if exists "admin_read" on session_transcripts;
+create policy "team_read" on session_transcripts for select using (
+  team_id in (select team_id from team_members where user_id = auth.uid())
+);
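+
+-- With this index in place, the client upserts by POSTing with merge-duplicates
+-- (a PostgREST sketch; the path and body are illustrative):
+--   POST /rest/v1/session_transcripts?on_conflict=team_id,session_id
+--   Prefer: resolution=merge-duplicates
+--   { "team_id": "...", "session_id": "...", "messages": [...], ... }
+-- Re-pushing a session with more turns then updates the existing row instead of
+-- inserting a duplicate.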