redis cache

This commit is contained in:
tdurieux
2026-05-07 15:55:28 +03:00
parent b37a814f3a
commit 369fd8edb2
3 changed files with 98 additions and 51 deletions
+2 -2
View File
@@ -995,7 +995,7 @@ angular
getQueues();
getMetrics();
}
}, 5000);
}, 15000);
$scope.$on("$destroy", () => $interval.cancel(stop));
$scope.refreshNow = function () { getQueues(); getMetrics(); };
@@ -1732,7 +1732,7 @@ angular
load();
const stop = $interval(() => {
if ($scope.query.autoRefresh) load();
}, 5000);
}, 15000);
$scope.$on("$destroy", () => $interval.cancel(stop));
$scope.$watch("query.search", recompute);
+16 -6
View File
@@ -69,10 +69,18 @@ export interface MetricPoint {
avgMs: number;
}
const METRIC_FIELDS = ["c", "f", "ms"];
const metricsCache = new Map<string, { data: MetricPoint[]; ts: number }>();
const METRICS_CACHE_TTL = 30_000;
export async function queryMetrics(
queue: string,
rangeMinutes: number
): Promise<MetricPoint[]> {
const cacheKey = `${queue}:${rangeMinutes}`;
const cached = metricsCache.get(cacheKey);
if (cached && Date.now() - cached.ts < METRICS_CACHE_TTL) return cached.data;
const c = getClient();
if (!c || !c.isOpen) return [];
@@ -88,14 +96,14 @@ export async function queryMetrics(
try {
const pipe = c.multi();
for (const k of keys) pipe.hGetAll(k);
for (const k of keys) pipe.hmGet(k, METRIC_FIELDS);
const results = await pipe.exec();
return timestamps.map((ts, i) => {
const h = (results[i] as unknown as Record<string, string>) || {};
const completed = parseInt(h.c || "0", 10) || 0;
const failed = parseInt(h.f || "0", 10) || 0;
const totalMs = parseInt(h.ms || "0", 10) || 0;
const points = timestamps.map((ts, i) => {
const vals = (results[i] as unknown as (string | null)[]) || [];
const completed = parseInt(vals[0] || "0", 10) || 0;
const failed = parseInt(vals[1] || "0", 10) || 0;
const totalMs = parseInt(vals[2] || "0", 10) || 0;
const total = completed + failed;
return {
ts,
@@ -104,6 +112,8 @@ export async function queryMetrics(
avgMs: total > 0 ? Math.round(totalMs / total) : 0,
};
});
metricsCache.set(cacheKey, { data: points, ts: Date.now() });
return points;
} catch {
return [];
}
+80 -43
View File
@@ -28,6 +28,9 @@ import config from "../../config";
const logger = createLogger("admin");
let errorLogClient: RedisClientType | null = null;
let errorStatsCache: { data: unknown; ts: number } | null = null;
const ERROR_STATS_CACHE_TTL = 30_000;
async function getErrorLogClient(): Promise<RedisClientType | null> {
if (errorLogClient && errorLogClient.isOpen) return errorLogClient;
try {
@@ -69,6 +72,15 @@ router.use(
router.use("/tokens", adminTokensRouter);
function dashboardCache(
_req: express.Request,
res: express.Response,
next: express.NextFunction
) {
res.set("Cache-Control", "private, max-age=10, must-revalidate");
next();
}
const QUEUE_STATES = [
"waiting",
"active",
@@ -256,7 +268,16 @@ router.post("/queues/pause-all", async (_req, res) => {
}
});
const queueStatsCache = new Map<
string,
{ data: Record<string, unknown>; ts: number }
>();
const QUEUE_STATS_CACHE_TTL = 15_000;
async function queueStats(queueKey: string, queue: Queue) {
const cached = queueStatsCache.get(queueKey);
if (cached && Date.now() - cached.ts < QUEUE_STATS_CACHE_TTL) return cached.data;
const [counts, workers, paused, metrics24h] =
await Promise.all([
queue.getJobCounts(...QUEUE_STATES),
@@ -275,7 +296,7 @@ async function queueStats(queueKey: string, queue: Queue) {
failed24h += p.failed;
}
return {
const data = {
counts,
paused,
workers: workerCount,
@@ -283,6 +304,8 @@ async function queueStats(queueKey: string, queue: Queue) {
completed24h,
failed24h,
};
queueStatsCache.set(queueKey, { data, ts: Date.now() });
return data;
}
const RANGE_MINUTES: Record<string, number> = {
@@ -292,7 +315,7 @@ const RANGE_MINUTES: Record<string, number> = {
"7d": 10080,
};
router.get("/queues/metrics", async (req, res) => {
router.get("/queues/metrics", dashboardCache, async (req, res) => {
const queueName = String(req.query.queue || "download");
if (!pickQueue(queueName)) return res.status(404).json({ error: "queue_not_found" });
const range = String(req.query.range || "1h");
@@ -305,9 +328,17 @@ router.get("/queues/metrics", async (req, res) => {
}
});
router.get("/queues", async (req, res) => {
const queuesCache = new Map<string, { data: unknown; ts: number }>();
const QUEUES_CACHE_TTL = 10_000;
router.get("/queues", dashboardCache, async (req, res) => {
const search = req.query.search ? String(req.query.search).toLowerCase() : "";
const queueName = req.query.queue ? String(req.query.queue) : "";
const cacheKey = `${queueName}|${search}`;
const cached = queuesCache.get(cacheKey);
if (cached && Date.now() - cached.ts < QUEUES_CACHE_TTL) {
return res.json(cached.data);
}
const allQueues: { key: string; label: string; queue: Queue }[] = [
{ key: "download", label: "Download", queue: downloadQueue },
@@ -315,19 +346,31 @@ router.get("/queues", async (req, res) => {
{ key: "cache", label: "Cache cleanup", queue: cacheQueue },
];
const statsResults = await Promise.all(
allQueues.map(async (q) => ({
key: q.key,
label: q.label,
...(await queueStats(q.key, q.queue)),
}))
);
const target = queueName
? allQueues.find((q) => q.key === queueName)
: allQueues[0];
const targetQueue = target ? target.queue : downloadQueue;
const [statsResults, ...jobsByState] = await Promise.all([
Promise.all(
allQueues.map(async (q) => ({
key: q.key,
label: q.label,
...(await queueStats(q.key, q.queue)),
}))
),
...QUEUE_STATES.map(async (state) => {
const jobs = await targetQueue.getJobs([state], 0, 199);
return jobs.map((j) => {
const json: Record<string, unknown> = { ...j.asJSON(), _state: state };
if (state === "delayed" && j.delay > 0) {
json.delayUntil = j.timestamp + j.delay;
}
return json;
});
}),
]);
const matches = (job: { id?: string | undefined; name?: string }) => {
if (!search) return true;
return (
@@ -335,33 +378,20 @@ router.get("/queues", async (req, res) => {
(job.name || "").toLowerCase().includes(search)
);
};
const allJobs = (jobsByState as Record<string, unknown>[][]).flat().filter(matches);
// Fetch all states in parallel, tag each job with its state
const jobsByState = await Promise.all(
QUEUE_STATES.map(async (state) => {
const jobs = await targetQueue.getJobs([state]);
return jobs.map((j) => {
const json: Record<string, unknown> = { ...j.asJSON(), _state: state };
if (state === "delayed" && j.delay > 0) {
json.delayUntil = j.timestamp + j.delay;
}
return json;
});
})
);
const allJobs = jobsByState.flat().filter(matches);
// Sort: active first, then waiting, delayed, failed, completed
const stateOrder: Record<string, number> = {
active: 0, waiting: 1, delayed: 2, failed: 3, completed: 4,
};
allJobs.sort((a, b) => (stateOrder[a._state as string] ?? 9) - (stateOrder[b._state as string] ?? 9));
res.json({
const data = {
queues: statsResults,
selectedQueue: target?.key || "download",
jobs: allJobs,
});
};
queuesCache.set(cacheKey, { data, ts: Date.now() });
res.json(data);
});
// Errors captured by the logger sink. Server-paginated to avoid pulling
@@ -412,11 +442,18 @@ router.get("/errors", async (req, res) => {
// Aggregated stats from the precomputed hourly counters (HINCRBY on each
// persistError). No JSON parsing of stored entries — O(48 small HGETALLs).
router.get("/errors/stats", async (req, res) => {
router.get("/errors/stats", dashboardCache, async (req, res) => {
try {
if (
errorStatsCache &&
Date.now() - errorStatsCache.ts < ERROR_STATS_CACHE_TTL
) {
return res.json(errorStatsCache.data);
}
const client = await getErrorLogClient();
if (!client) {
return res.json({
const data = {
available: false,
last24h: 0,
prev24h: 0,
@@ -424,10 +461,10 @@ router.get("/errors/stats", async (req, res) => {
unique: { error: 0, warn: 0, info: 0 },
buckets: [],
dropped: getInProcessDropped(),
});
};
return res.json(data);
}
const now = new Date();
// Build the 48 hour keys to fetch (24 for current window + 24 for prev).
function hourKey(d: Date) {
const y = d.getUTCFullYear();
const m = String(d.getUTCMonth() + 1).padStart(2, "0");
@@ -440,8 +477,6 @@ router.get("/errors/stats", async (req, res) => {
const bucketHourTs: number[] = [];
for (let i = 23; i >= 0; i--) {
const d = new Date(now.getTime() - i * 3600 * 1000);
// Anchor each bar at the end of its hour so a "9s ago" event lands in
// the rightmost bar.
const anchor = new Date(
Date.UTC(
d.getUTCFullYear(),
@@ -467,17 +502,17 @@ router.get("/errors/stats", async (req, res) => {
}
const pipe = client.multi();
for (const k of currentKeys) pipe.hGetAll(k);
for (const k of prevKeys) pipe.hGetAll(k);
for (const k of prevKeys) pipe.hmGet(k, ["total"]);
pipe.get(ERROR_LOG_DROPPED_KEY);
const results = (await pipe.exec()) as unknown[];
const currentHashes = results.slice(0, currentKeys.length) as Record<
string,
string
>[];
const prevHashes = results.slice(
const prevTotals = results.slice(
currentKeys.length,
currentKeys.length + prevKeys.length
) as Record<string, string>[];
) as (string | null)[][];
const droppedRedis =
parseInt(String(results[results.length - 1] || "0"), 10) || 0;
@@ -504,7 +539,6 @@ router.get("/errors/stats", async (req, res) => {
sev.warn += w;
sev.info += inf;
last24h += parseInt(flat.total || "0", 10) || 0;
// cb:<bucket>:<code> fields → unique code sets.
for (const k of Object.keys(flat)) {
if (!k.startsWith("cb:")) continue;
const sep = k.indexOf(":", 3);
@@ -515,11 +549,11 @@ router.get("/errors/stats", async (req, res) => {
}
});
let prev24h = 0;
for (const h of prevHashes) {
prev24h += parseInt((h || {}).total || "0", 10) || 0;
for (const row of prevTotals) {
prev24h += parseInt((row && row[0]) || "0", 10) || 0;
}
res.json({
const data = {
available: true,
last24h,
prev24h,
@@ -531,7 +565,9 @@ router.get("/errors/stats", async (req, res) => {
},
buckets,
dropped: droppedRedis + getInProcessDropped(),
});
};
errorStatsCache = { data, ts: Date.now() };
res.json(data);
} catch (error) {
handleError(error, res, req);
}
@@ -559,6 +595,7 @@ router.delete("/errors", async (req, res) => {
pipe.del(ERROR_LOG_DROPPED_KEY);
if (hourlyKeys.length) pipe.del(hourlyKeys);
await pipe.exec();
errorStatsCache = null;
res.json({ ok: true, cleared: len, hourlyCleared: hourlyKeys.length });
} catch (error) {
handleError(error, res, req);