// proxy.ts
// Minimal OpenAI-compatible proxy with streaming & PostgreSQL logging.
// Node 18+ required.

import "dotenv/config";
import http, { IncomingMessage, ServerResponse } from "http";
import { TextDecoder } from "util";
import { Pool } from "pg";
import { calculateCost } from "./cost";

const PORT = Number(process.env.PORT || 3007);
// Trailing slashes are stripped so UPSTREAM_URL + path never contains "//".
const UPSTREAM_URL = (process.env.UPSTREAM_URL || "").replace(/\/+$/, "");
// Maximum number of characters of a streamed response kept for the log row.
const PREVIEW_LIMIT = 5000;

// --- PostgreSQL connection ---
const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

// --- Helper functions ---

/** Adds the permissive CORS headers that every response from this proxy carries. */
function setCors(res: ServerResponse): void {
  res.setHeader("Access-Control-Allow-Origin", "*");
  res.setHeader("Access-Control-Allow-Headers", "Authorization, Content-Type, X-Requested-With");
  res.setHeader("Access-Control-Allow-Methods", "GET,POST,OPTIONS");
}

/**
 * Collects the full request body.
 *
 * @returns A promise resolving to the concatenated body bytes; it rejects if
 *          the request stream emits "error".
 */
function readBody(req: IncomingMessage): Promise<Buffer> {
  return new Promise<Buffer>((resolve, reject) => {
    const chunks: Buffer[] = [];
    req.on("data", (c: Buffer) => chunks.push(c));
    req.on("end", () => resolve(Buffer.concat(chunks)));
    req.on("error", reject);
  });
}

/** Returns true when the path starts with one of the endpoints this proxy forwards. */
function okPath(path: string): boolean {
  return (
    path.startsWith("/chat/completions") ||
    path.startsWith("/completions") ||
    path.startsWith("/models")
  );
}

/** Sends a JSON body with the given status code and a JSON Content-Type header. */
function sendJson(res: ServerResponse, status: number, payload: unknown): void {
  res.statusCode = status;
  res.setHeader("Content-Type", "application/json");
  res.end(JSON.stringify(payload));
}

/**
 * Extracts the `usage` object from a single SSE line, if it has one.
 *
 * @returns The parsed `usage` value, or undefined for non-`data:` lines, the
 *          `[DONE]` sentinel, unparsable JSON, or events without usage.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- usage shape is defined by the upstream
function usageFromSseLine(line: string): any {
  if (!line.startsWith("data:")) return undefined;
  const payload = line.slice(5).trim();
  if (payload === "[DONE]") return undefined;
  try {
    return JSON.parse(payload)?.usage || undefined;
  } catch {
    // Not valid JSON (e.g. a keep-alive or malformed event) - nothing to extract.
    return undefined;
  }
}

// --- Logging to Postgres ---

/**
 * Inserts one row whose columns are the keys of `data`.
 *
 * Values are passed as query parameters. Column names are double-quoted with
 * embedded quotes escaped. The table name comes from DATABASE_TABLE (default
 * "llm_proxy") and is interpolated as-is, so it must be trusted configuration.
 */
async function logToPG(data: Record<string, unknown>): Promise<void> {
  const keys = Object.keys(data);
  if (keys.length === 0) return; // "INSERT ... () VALUES ()" is not valid SQL
  const cols = keys.map((k) => `"${k.replace(/"/g, '""')}"`).join(",");
  const vals = keys.map((_, i) => `$${i + 1}`).join(",");
  const values = Object.values(data);
  await pool.query(
    `INSERT INTO ${process.env.DATABASE_TABLE || "llm_proxy"} (${cols}) VALUES (${vals})`,
    values,
  );
}

// --- Streaming relay ---

/** Summary of a relayed event stream, stored as the logged response body. */
interface StreamCapture {
  streamed: true;
  preview: string;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- usage shape is defined by the upstream
  usage: any;
}

/**
 * Relays an upstream `text/event-stream` body to the client chunk by chunk and
 * ends the client response.
 *
 * While relaying it keeps the first PREVIEW_LIMIT characters of the stream and
 * the last `usage` object seen in a `data:` event.
 */
async function relayEventStream(upstreamRes: Response, res: ServerResponse): Promise<StreamCapture> {
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("Transfer-Encoding", "chunked");

  const reader = upstreamRes.body?.getReader();
  const decoder = new TextDecoder();
  let preview = "";
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- usage shape is defined by the upstream
  let usageFromStream: any = null;

  if (reader) {
    let pending = ""; // text after the last newline, i.e. a possibly incomplete line

    const consume = (text: string): void => {
      // Only the preview is logged, so stop accumulating once it is full.
      if (preview.length < PREVIEW_LIMIT) preview += text;
      pending += text;
      const lines = pending.split(/\r?\n/);
      pending = lines.pop() ?? "";
      for (const line of lines) {
        const found = usageFromSseLine(line);
        if (found) usageFromStream = found;
      }
    };

    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      if (value) {
        // Forward the raw bytes untouched; decoding is only for logging.
        res.write(value);
        // stream: true keeps a multi-byte character split across chunks intact.
        consume(decoder.decode(value, { stream: true }));
      }
    }

    // Flush any bytes the decoder was holding, then check the final line,
    // which may not have been newline-terminated.
    consume(decoder.decode());
    const tail = usageFromSseLine(pending);
    if (tail) usageFromStream = tail;
  }

  res.end();
  return {
    streamed: true,
    preview: preview.slice(0, PREVIEW_LIMIT),
    usage: usageFromStream,
  };
}

// --- Main proxy server ---

/**
 * Handles one client request: validates path and Authorization header,
 * forwards the request to UPSTREAM_URL, relays the response (streamed or
 * buffered) and logs the exchange to Postgres without awaiting the insert.
 *
 * Responds 404 for paths outside the allow-list, 401 without a Bearer token,
 * 400 for a body that is not valid JSON, and 502 when forwarding fails before
 * any response bytes were sent.
 */
async function handleRequest(req: IncomingMessage, res: ServerResponse): Promise<void> {
  const start = Date.now();
  setCors(res);

  if (req.method === "OPTIONS") {
    res.statusCode = 204;
    res.end();
    return;
  }

  try {
    const url = new URL(req.url || "/", `http://${req.headers.host}`);
    const path = url.pathname;
    const method = req.method || "GET";

    if (!okPath(path)) {
      sendJson(res, 404, { error: "Not found" });
      return;
    }

    const auth = req.headers["authorization"];
    if (typeof auth !== "string" || !auth.startsWith("Bearer ")) {
      sendJson(res, 401, { error: "Missing or invalid Authorization header" });
      return;
    }

    const bodyBuf = method === "POST" ? await readBody(req) : Buffer.from("");

    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- arbitrary client JSON, only `.model` is read
    let requestJson: any = null;
    if (bodyBuf.length) {
      try {
        requestJson = JSON.parse(bodyBuf.toString());
      } catch {
        // A malformed body is the client's fault, not an upstream failure.
        sendJson(res, 400, { error: "Invalid JSON body" });
        return;
      }
    }

    const upstreamRes = await fetch(UPSTREAM_URL + path + url.search, {
      method,
      headers: {
        "Content-Type": (req.headers["content-type"] as string) || "application/json",
        Authorization: auth,
      },
      // @ts-ignore
      duplex: "half",
      body: method === "POST" ? bodyBuf.toString() : undefined,
    });

    const contentType = upstreamRes.headers.get("content-type") || "application/json";
    res.statusCode = upstreamRes.status;
    res.setHeader("Content-Type", contentType);

    // --- Streaming or non-streaming response handling ---
    // eslint-disable-next-line @typescript-eslint/no-explicit-any -- parsed upstream JSON, raw text, or a StreamCapture
    let responseBody: any = null;
    if (contentType.includes("text/event-stream")) {
      responseBody = await relayEventStream(upstreamRes, res);
    } else {
      const text = await upstreamRes.text();
      res.end(text);
      try {
        responseBody = JSON.parse(text);
      } catch {
        // Non-JSON upstream body: log it verbatim.
        responseBody = text;
      }
    }

    // --- Token usage and metadata ---
    const usage = responseBody?.usage || {};
    const model = requestJson?.model || "default";
    const totalCost = calculateCost(model, usage);

    const logData = {
      timestamp: new Date(),
      request_method: method,
      request_path: path,
      model,
      // `??` rather than `||` so a genuine count of 0 is stored as 0, not null.
      completion_tokens: usage.completion_tokens ?? null,
      prompt_tokens: usage.prompt_tokens ?? null,
      total_tokens: usage.total_tokens ?? null,
      // OpenAI-style payloads report cached tokens under prompt_tokens_details.
      cached_tokens: usage.cached_tokens ?? usage.prompt_tokens_details?.cached_tokens ?? null,
      total_cost: totalCost,
      response_time: Date.now() - start,
      request_body: requestJson,
      response_body: responseBody,
      response_status: upstreamRes.status,
      provider_url: UPSTREAM_URL,
      provider: "Upstream",
    };

    // Fire-and-forget: a logging failure must not affect the client response.
    void logToPG(logData).catch((err) => console.error("PG log error:", err));
  } catch (err: unknown) {
    console.error("Proxy error:", err);

    if (res.headersSent) {
      // Part (or all) of a response is already on the wire; appending a JSON
      // error would corrupt it. Abort the connection if it is still open.
      if (!res.writableEnded) res.destroy();
      return;
    }

    const message = err instanceof Error ? err.message : String(err);
    sendJson(res, 502, { error: "Proxy error", message });
  }
}

const server = http.createServer((req, res) => {
  // handleRequest catches its own errors, so the promise never rejects.
  void handleRequest(req, res);
});

server.listen(PORT, () => {
  console.log(`✅ Proxy running at http://localhost:${PORT}`);
});