mirror of
https://github.com/tdurieux/anonymous_github.git
synced 2026-06-29 18:50:00 +02:00
improve queue
This commit is contained in:
+161
-3
@@ -1,5 +1,6 @@
|
||||
import { Octokit } from "@octokit/rest";
|
||||
import { throttling } from "@octokit/plugin-throttling";
|
||||
import { createClient, RedisClientType } from "redis";
|
||||
|
||||
import AnonymousError from "./AnonymousError";
|
||||
import Repository from "./Repository";
|
||||
@@ -53,6 +54,155 @@ function rateLimitDetail(err: OctokitRequestErrorLike): string {
|
||||
|
||||
const ThrottledOctokit = Octokit.plugin(throttling);
|
||||
|
||||
/**
|
||||
* Per-token gate that blocks all callers when a rate limit is active.
|
||||
* When any request for a given token hits a rate limit, the gate records
|
||||
* the reset time and makes every subsequent caller wait until then —
|
||||
* preventing a stampede of doomed requests.
|
||||
*/
|
||||
const tokenGates = new Map<string, { resetAt: number }>();
|
||||
|
||||
function setTokenGate(token: string, retryAfterSec: number) {
|
||||
const key = token.slice(-8);
|
||||
const resetAt = Date.now() + retryAfterSec * 1000;
|
||||
const existing = tokenGates.get(key);
|
||||
if (!existing || resetAt > existing.resetAt) {
|
||||
tokenGates.set(key, { resetAt });
|
||||
logger.warn("rate limit gate set", {
|
||||
code: "rate_limit_gate",
|
||||
retryAfterSec,
|
||||
resetAt: new Date(resetAt).toISOString(),
|
||||
});
|
||||
setRedisGate(retryAfterSec).catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
export class RateLimitDelayError extends Error {
|
||||
resetAt: number;
|
||||
constructor(resetAt: number) {
|
||||
const delaySec = Math.ceil((resetAt - Date.now()) / 1000);
|
||||
super(`github_rate_limit_delay:${delaySec}s`);
|
||||
this.name = "RateLimitDelayError";
|
||||
this.resetAt = resetAt;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a rate limit gate is active for a token.
|
||||
* Returns the reset timestamp, or 0 if no gate is active.
|
||||
*/
|
||||
export function getTokenGateResetAt(token: string): number {
|
||||
const key = token.slice(-8);
|
||||
const gate = tokenGates.get(key);
|
||||
if (!gate) return 0;
|
||||
if (gate.resetAt <= Date.now()) {
|
||||
tokenGates.delete(key);
|
||||
return 0;
|
||||
}
|
||||
return gate.resetAt;
|
||||
}
|
||||
|
||||
async function waitForTokenGate(token: string): Promise<void> {
|
||||
const key = token.slice(-8);
|
||||
const localGate = tokenGates.get(key);
|
||||
let waitMs = 0;
|
||||
let resetAt = 0;
|
||||
|
||||
if (localGate && localGate.resetAt > Date.now()) {
|
||||
resetAt = localGate.resetAt;
|
||||
waitMs = resetAt - Date.now();
|
||||
}
|
||||
|
||||
const redisResetAt = await getRedisGateResetAt();
|
||||
if (redisResetAt > resetAt) {
|
||||
resetAt = redisResetAt;
|
||||
waitMs = resetAt - Date.now();
|
||||
}
|
||||
|
||||
if (waitMs <= 0) {
|
||||
if (localGate) tokenGates.delete(key);
|
||||
return;
|
||||
}
|
||||
|
||||
logger.info("waiting for rate limit gate", {
|
||||
code: "rate_limit_gate_wait",
|
||||
waitMs,
|
||||
resetAt: new Date(resetAt).toISOString(),
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, waitMs));
|
||||
if (localGate) tokenGates.delete(key);
|
||||
}
|
||||
|
||||
const REDIS_GATE_PREFIX = "gh_rate_gate:";
|
||||
|
||||
let redisGateDisabled = false;
|
||||
let redisGateReady: Promise<RedisClientType | null> | null = null;
|
||||
|
||||
function ensureRedisGateClient(): Promise<RedisClientType | null> {
|
||||
if (redisGateDisabled) return Promise.resolve(null);
|
||||
if (redisGateReady) return redisGateReady;
|
||||
redisGateReady = (async () => {
|
||||
try {
|
||||
const c = createClient({
|
||||
socket: {
|
||||
host: config.REDIS_HOSTNAME,
|
||||
port: config.REDIS_PORT,
|
||||
reconnectStrategy: () => false as any,
|
||||
},
|
||||
}) as RedisClientType;
|
||||
c.on("error", () => {
|
||||
redisGateDisabled = true;
|
||||
c.disconnect().catch(() => {});
|
||||
redisGateReady = null;
|
||||
});
|
||||
await c.connect();
|
||||
return c;
|
||||
} catch {
|
||||
redisGateDisabled = true;
|
||||
redisGateReady = null;
|
||||
return null;
|
||||
}
|
||||
})();
|
||||
return redisGateReady;
|
||||
}
|
||||
|
||||
async function setRedisGate(retryAfterSec: number): Promise<void> {
|
||||
const c = await ensureRedisGateClient();
|
||||
if (!c || !c.isOpen) return;
|
||||
const resetAt = Date.now() + retryAfterSec * 1000;
|
||||
const ttl = Math.ceil(retryAfterSec) + 10;
|
||||
try {
|
||||
await c.set(REDIS_GATE_PREFIX + "global", String(resetAt), { EX: ttl });
|
||||
logger.info("redis rate limit gate written", {
|
||||
code: "redis_gate_set",
|
||||
resetAt: new Date(resetAt).toISOString(),
|
||||
ttl,
|
||||
});
|
||||
} catch {
|
||||
// non-critical
|
||||
}
|
||||
}
|
||||
|
||||
export async function setRedisGateFromWorker(resetAt: number): Promise<void> {
|
||||
const retryAfterSec = Math.max(0, (resetAt - Date.now()) / 1000);
|
||||
if (retryAfterSec <= 0) return;
|
||||
await setRedisGate(retryAfterSec);
|
||||
}
|
||||
|
||||
export async function getRedisGateResetAt(): Promise<number> {
|
||||
const c = await ensureRedisGateClient();
|
||||
if (!c || !c.isOpen) return 0;
|
||||
try {
|
||||
const val = await c.get(REDIS_GATE_PREFIX + "global");
|
||||
if (!val) return 0;
|
||||
const resetAt = parseInt(val, 10);
|
||||
if (isNaN(resetAt) || resetAt <= Date.now()) return 0;
|
||||
return resetAt;
|
||||
} catch {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
export function octokit(token: string) {
|
||||
const oct = new ThrottledOctokit({
|
||||
auth: token,
|
||||
@@ -69,8 +219,7 @@ export function octokit(token: string) {
|
||||
retryAfter,
|
||||
retryCount,
|
||||
});
|
||||
// Retry once; if GitHub is still throttling after that, surface the
|
||||
// error to the caller so the UI shows github_rate_limit_exceeded.
|
||||
setTokenGate(token, retryAfter);
|
||||
return retryCount < 1;
|
||||
},
|
||||
onSecondaryRateLimit: (retryAfter, options, _o, retryCount) => {
|
||||
@@ -82,6 +231,7 @@ export function octokit(token: string) {
|
||||
retryAfter,
|
||||
retryCount,
|
||||
});
|
||||
setTokenGate(token, retryAfter);
|
||||
return retryCount < 1;
|
||||
},
|
||||
},
|
||||
@@ -99,12 +249,20 @@ export function octokit(token: string) {
|
||||
return oct;
|
||||
}
|
||||
|
||||
export { waitForTokenGate };
|
||||
|
||||
export async function checkToken(token: string) {
|
||||
const oct = octokit(token);
|
||||
try {
|
||||
await oct.users.getAuthenticated();
|
||||
return true;
|
||||
} catch {
|
||||
} catch (err) {
|
||||
if (
|
||||
err instanceof AnonymousError &&
|
||||
err.message === "github_rate_limit_exceeded"
|
||||
) {
|
||||
throw err;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
+17
-1
@@ -19,7 +19,7 @@ import {
|
||||
getRepositoryFromGitHub,
|
||||
GitHubRepository,
|
||||
} from "./source/GitHubRepository";
|
||||
import { getToken } from "./GitHubUtils";
|
||||
import { getToken, getRedisGateResetAt } from "./GitHubUtils";
|
||||
import config from "../config";
|
||||
import FileModel from "./model/files/files.model";
|
||||
import AnonymizedRepositoryModel from "./model/anonymizedRepositories/anonymizedRepositories.model";
|
||||
@@ -234,14 +234,30 @@ export default class Repository {
|
||||
httpStatus: 410,
|
||||
});
|
||||
}
|
||||
const redisGateReset = await getRedisGateResetAt();
|
||||
if (redisGateReset > 0) {
|
||||
throw new AnonymousError("rate_limited", {
|
||||
httpStatus: 425,
|
||||
object: { resetAt: redisGateReset },
|
||||
});
|
||||
}
|
||||
|
||||
const fiveMinuteAgo = new Date();
|
||||
fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5);
|
||||
|
||||
if (
|
||||
this.status == RepositoryStatus.PREPARING ||
|
||||
this.status == RepositoryStatus.QUEUE ||
|
||||
(this.status == RepositoryStatus.DOWNLOAD &&
|
||||
this._model.statusDate > fiveMinuteAgo)
|
||||
) {
|
||||
const rlMatch = (this._model.statusMessage || "").match(/^rate_limited:(\d+)$/);
|
||||
if (rlMatch) {
|
||||
throw new AnonymousError("rate_limited", {
|
||||
httpStatus: 425,
|
||||
object: { resetAt: parseInt(rlMatch[1], 10) },
|
||||
});
|
||||
}
|
||||
throw new AnonymousError("repository_not_ready", {
|
||||
object: this,
|
||||
httpStatus: 425,
|
||||
|
||||
@@ -11,7 +11,7 @@ import { basename, dirname } from "path";
|
||||
import * as stream from "stream";
|
||||
import AnonymousError from "../AnonymousError";
|
||||
import { FILE_TYPE } from "../storage/Storage";
|
||||
import { octokit } from "../GitHubUtils";
|
||||
import { octokit, waitForTokenGate } from "../GitHubUtils";
|
||||
import FileModel from "../model/files/files.model";
|
||||
import { IFile } from "../model/files/files.types";
|
||||
import { createLogger, serializeError } from "../logger";
|
||||
@@ -20,6 +20,27 @@ import config from "../../config";
|
||||
|
||||
const logger = createLogger("gh-stream");
|
||||
|
||||
const GH_API_CONCURRENCY = 6;
|
||||
|
||||
async function pMap<T, R>(
|
||||
items: T[],
|
||||
fn: (item: T, index: number) => Promise<R>,
|
||||
concurrency: number
|
||||
): Promise<R[]> {
|
||||
const results: R[] = new Array(items.length);
|
||||
let next = 0;
|
||||
async function worker() {
|
||||
while (next < items.length) {
|
||||
const i = next++;
|
||||
results[i] = await fn(items[i], i);
|
||||
}
|
||||
}
|
||||
await Promise.all(
|
||||
Array.from({ length: Math.min(concurrency, items.length) }, () => worker())
|
||||
);
|
||||
return results;
|
||||
}
|
||||
|
||||
export function githubRawFileUrl(
|
||||
owner: string,
|
||||
repo: string,
|
||||
@@ -354,11 +375,13 @@ export default class GitHubStream extends GitHubBase {
|
||||
}
|
||||
|
||||
private async getGHTree(
|
||||
oct: ReturnType<typeof octokit>,
|
||||
token: string,
|
||||
sha: string,
|
||||
count = { request: 0, file: 0 },
|
||||
opt = { recursive: true, callback: () => {} }
|
||||
) {
|
||||
const oct = octokit(await this.data.getToken());
|
||||
await waitForTokenGate(token);
|
||||
const ghRes = await oct.git.getTree({
|
||||
owner: this.data.organization,
|
||||
repo: this.data.repoName,
|
||||
@@ -378,6 +401,8 @@ export default class GitHubStream extends GitHubBase {
|
||||
progress?: (status: string) => void,
|
||||
parentPath: string = ""
|
||||
) {
|
||||
const token = await this.data.getToken();
|
||||
const oct = octokit(token);
|
||||
const count = {
|
||||
request: 0,
|
||||
file: 0,
|
||||
@@ -385,7 +410,7 @@ export default class GitHubStream extends GitHubBase {
|
||||
const output: IFile[] = [];
|
||||
let data;
|
||||
try {
|
||||
data = await this.getGHTree(sha, count, {
|
||||
data = await this.getGHTree(oct, token, sha, count, {
|
||||
recursive: false,
|
||||
callback: () => {
|
||||
if (progress) {
|
||||
@@ -423,29 +448,33 @@ export default class GitHubStream extends GitHubBase {
|
||||
cause: error as Error,
|
||||
});
|
||||
}
|
||||
const promises: ReturnType<GitHubStream["getGHTree"]>[] = [];
|
||||
const parentPaths: string[] = [];
|
||||
const subtrees: { sha: string; parentPath: string }[] = [];
|
||||
for (const file of data.tree) {
|
||||
if (file.type == "tree" && file.path && file.sha) {
|
||||
const elementPath = path.join(parentPath, file.path);
|
||||
parentPaths.push(elementPath);
|
||||
promises.push(
|
||||
this.getGHTree(file.sha, count, {
|
||||
recursive: true,
|
||||
callback: () => {
|
||||
if (progress) {
|
||||
progress("List file: " + count.file);
|
||||
}
|
||||
},
|
||||
})
|
||||
);
|
||||
subtrees.push({
|
||||
sha: file.sha,
|
||||
parentPath: path.join(parentPath, file.path),
|
||||
});
|
||||
}
|
||||
}
|
||||
(await Promise.all(promises)).forEach((data, i) => {
|
||||
const results = await pMap(
|
||||
subtrees,
|
||||
async (entry) =>
|
||||
this.getGHTree(oct, token, entry.sha, count, {
|
||||
recursive: true,
|
||||
callback: () => {
|
||||
if (progress) {
|
||||
progress("List file: " + count.file);
|
||||
}
|
||||
},
|
||||
}),
|
||||
GH_API_CONCURRENCY
|
||||
);
|
||||
results.forEach((data, i) => {
|
||||
if (data.truncated) {
|
||||
this._truncatedFolders.push(parentPaths[i]);
|
||||
this._truncatedFolders.push(subtrees[i].parentPath);
|
||||
}
|
||||
output.push(...this.tree2Tree(data.tree, parentPaths[i]));
|
||||
output.push(...this.tree2Tree(data.tree, subtrees[i].parentPath));
|
||||
});
|
||||
return output;
|
||||
}
|
||||
|
||||
+11
-3
@@ -4,6 +4,7 @@ import AnonymizedRepositoryModel from "../core/model/anonymizedRepositories/anon
|
||||
import { RepositoryStatus } from "../core/types";
|
||||
import * as path from "path";
|
||||
import { createLogger, serializeError } from "../core/logger";
|
||||
import { recordMetric } from "./queueMetrics";
|
||||
|
||||
const logger = createLogger("queue");
|
||||
|
||||
@@ -119,12 +120,15 @@ export function startWorker() {
|
||||
concurrency: 5,
|
||||
connection,
|
||||
autorun: true,
|
||||
metrics: { maxDataPoints: 120 },
|
||||
}
|
||||
);
|
||||
cacheWorker.on("completed", async (job) => {
|
||||
recordMetric("cache", "completed", (job.finishedOn || Date.now()) - (job.processedOn || job.timestamp));
|
||||
await job.remove();
|
||||
});
|
||||
cacheWorker.on("failed", async (job) => {
|
||||
if (job) recordMetric("cache", "failed", Date.now() - (job.processedOn || job.timestamp));
|
||||
});
|
||||
const removeWorker = new Worker<RepoJobData>(
|
||||
removeQueue.name,
|
||||
path.resolve("build/queue/processes/removeRepository.js"),
|
||||
@@ -132,12 +136,15 @@ export function startWorker() {
|
||||
concurrency: 5,
|
||||
connection,
|
||||
autorun: true,
|
||||
metrics: { maxDataPoints: 120 },
|
||||
}
|
||||
);
|
||||
removeWorker.on("completed", async (job) => {
|
||||
recordMetric("remove", "completed", (job.finishedOn || Date.now()) - (job.processedOn || job.timestamp));
|
||||
await job.remove();
|
||||
});
|
||||
removeWorker.on("failed", async (job) => {
|
||||
if (job) recordMetric("remove", "failed", Date.now() - (job.processedOn || job.timestamp));
|
||||
});
|
||||
|
||||
const downloadWorker = new Worker<RepoJobData>(
|
||||
downloadQueue.name,
|
||||
@@ -146,7 +153,6 @@ export function startWorker() {
|
||||
concurrency: 3,
|
||||
connection,
|
||||
autorun: true,
|
||||
metrics: { maxDataPoints: 120 },
|
||||
}
|
||||
);
|
||||
if (!downloadWorker.isRunning()) downloadWorker.run();
|
||||
@@ -156,8 +162,10 @@ export function startWorker() {
|
||||
});
|
||||
downloadWorker.on("completed", async (job) => {
|
||||
logger.info("download completed", { repoId: job.data.repoId });
|
||||
recordMetric("download", "completed", (job.finishedOn || Date.now()) - (job.processedOn || job.timestamp));
|
||||
});
|
||||
downloadWorker.on("failed", async (job, err) => {
|
||||
if (job) recordMetric("download", "failed", Date.now() - (job.processedOn || job.timestamp));
|
||||
const repoId = job?.data?.repoId;
|
||||
logger.error("download failed", {
|
||||
...serializeError(err),
|
||||
|
||||
@@ -5,6 +5,8 @@ import { getRepository as getRepositoryImport } from "../../server/database";
|
||||
import { RepositoryStatus } from "../../core/types";
|
||||
import { RepoJobData } from "../index";
|
||||
import { createLogger, serializeError } from "../../core/logger";
|
||||
import { RateLimitDelayError, getRedisGateResetAt, setRedisGateFromWorker } from "../../core/GitHubUtils";
|
||||
import { DelayedError } from "bullmq";
|
||||
|
||||
const logger = createLogger("queue:download");
|
||||
|
||||
@@ -20,6 +22,24 @@ export default async function (job: SandboxedJob<RepoJobData, void>) {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
let statusInterval: any = null;
|
||||
await connect();
|
||||
|
||||
const gateResetAt = await getRedisGateResetAt();
|
||||
if (gateResetAt > 0) {
|
||||
const delaySec = Math.ceil((gateResetAt - Date.now()) / 1000);
|
||||
logger.info("rate limit gate active, delaying job before work", {
|
||||
repoId: job.data.repoId,
|
||||
delaySec,
|
||||
resetAt: new Date(gateResetAt).toISOString(),
|
||||
});
|
||||
const repo = await getRepository(job.data.repoId);
|
||||
await repo.updateStatus(
|
||||
RepositoryStatus.QUEUE,
|
||||
`rate_limited:${gateResetAt}`
|
||||
);
|
||||
await job.moveToDelayed(gateResetAt);
|
||||
throw new DelayedError();
|
||||
}
|
||||
|
||||
const repo = await getRepository(job.data.repoId);
|
||||
let tickPromise: Promise<void> | null = null;
|
||||
try {
|
||||
@@ -68,6 +88,30 @@ export default async function (job: SandboxedJob<RepoJobData, void>) {
|
||||
} catch (error) {
|
||||
clearInterval(statusInterval);
|
||||
if (tickPromise) await tickPromise;
|
||||
|
||||
// Rate-limited: delay the job and free the worker slot
|
||||
const isRateDelay = error instanceof RateLimitDelayError;
|
||||
const isRateError = !isRateDelay && error instanceof Error &&
|
||||
(error.message === "github_rate_limit_exceeded" || error.message.includes("rate limit"));
|
||||
if (isRateDelay || isRateError) {
|
||||
const resetAt = isRateDelay
|
||||
? (error as RateLimitDelayError).resetAt
|
||||
: Date.now() + 60_000; // fallback: retry in 1 min
|
||||
const delaySec = Math.ceil(Math.max(0, resetAt - Date.now()) / 1000);
|
||||
logger.info("rate-limited, delaying job", {
|
||||
repoId: job.data.repoId,
|
||||
delaySec,
|
||||
resetAt: new Date(resetAt).toISOString(),
|
||||
});
|
||||
await setRedisGateFromWorker(resetAt);
|
||||
await repo.updateStatus(
|
||||
RepositoryStatus.QUEUE,
|
||||
`rate_limited:${resetAt}`
|
||||
);
|
||||
await job.moveToDelayed(resetAt);
|
||||
throw new DelayedError();
|
||||
}
|
||||
|
||||
updateProgress({ status: "error" });
|
||||
if (error instanceof Error) {
|
||||
await repo.updateStatus(RepositoryStatus.ERROR, error.message);
|
||||
@@ -83,6 +127,9 @@ export default async function (job: SandboxedJob<RepoJobData, void>) {
|
||||
await tickPromise;
|
||||
} catch { /* ignored */ }
|
||||
}
|
||||
if (error instanceof DelayedError || (error instanceof Error && error.name === "DelayedError")) {
|
||||
throw error;
|
||||
}
|
||||
logger.error("finished with error", {
|
||||
...serializeError(error),
|
||||
repoId: job.data.repoId,
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
import { createClient, RedisClientType } from "redis";
|
||||
import config from "../config";
|
||||
|
||||
const KEY_PREFIX = "qmetrics";
|
||||
const TTL_SECONDS = 7 * 24 * 3600 + 3600;
|
||||
|
||||
let client: RedisClientType | null = null;
|
||||
let disabled = false;
|
||||
|
||||
function getClient(): RedisClientType | null {
|
||||
if (disabled) return null;
|
||||
if (client) return client;
|
||||
try {
|
||||
client = createClient({
|
||||
socket: {
|
||||
host: config.REDIS_HOSTNAME,
|
||||
port: config.REDIS_PORT,
|
||||
reconnectStrategy: () => false as any,
|
||||
},
|
||||
}) as RedisClientType;
|
||||
client.on("error", () => {
|
||||
disabled = true;
|
||||
client?.disconnect().catch(() => {});
|
||||
client = null;
|
||||
});
|
||||
client.connect().catch(() => {
|
||||
disabled = true;
|
||||
client = null;
|
||||
});
|
||||
return client;
|
||||
} catch {
|
||||
disabled = true;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
function minuteTs(now?: number): number {
|
||||
return Math.floor((now || Date.now()) / 60000) * 60000;
|
||||
}
|
||||
|
||||
function key(queue: string, ts: number): string {
|
||||
return `${KEY_PREFIX}:${queue}:${ts}`;
|
||||
}
|
||||
|
||||
export async function recordMetric(
|
||||
queue: string,
|
||||
type: "completed" | "failed",
|
||||
durationMs: number
|
||||
): Promise<void> {
|
||||
const c = getClient();
|
||||
if (!c || !c.isOpen) return;
|
||||
const k = key(queue, minuteTs());
|
||||
const field = type === "completed" ? "c" : "f";
|
||||
try {
|
||||
const pipe = c.multi();
|
||||
pipe.hIncrBy(k, field, 1);
|
||||
pipe.hIncrBy(k, "ms", Math.round(Math.max(0, durationMs)));
|
||||
pipe.expire(k, TTL_SECONDS);
|
||||
await pipe.exec();
|
||||
} catch {
|
||||
// non-critical, don't crash workers
|
||||
}
|
||||
}
|
||||
|
||||
export interface MetricPoint {
|
||||
ts: number;
|
||||
completed: number;
|
||||
failed: number;
|
||||
avgMs: number;
|
||||
}
|
||||
|
||||
export async function queryMetrics(
|
||||
queue: string,
|
||||
rangeMinutes: number
|
||||
): Promise<MetricPoint[]> {
|
||||
const c = getClient();
|
||||
if (!c || !c.isOpen) return [];
|
||||
|
||||
const now = minuteTs();
|
||||
const start = now - (rangeMinutes - 1) * 60000;
|
||||
|
||||
const keys: string[] = [];
|
||||
const timestamps: number[] = [];
|
||||
for (let t = start; t <= now; t += 60000) {
|
||||
keys.push(key(queue, t));
|
||||
timestamps.push(t);
|
||||
}
|
||||
|
||||
try {
|
||||
const pipe = c.multi();
|
||||
for (const k of keys) pipe.hGetAll(k);
|
||||
const results = await pipe.exec();
|
||||
|
||||
return timestamps.map((ts, i) => {
|
||||
const h = (results[i] as unknown as Record<string, string>) || {};
|
||||
const completed = parseInt(h.c || "0", 10) || 0;
|
||||
const failed = parseInt(h.f || "0", 10) || 0;
|
||||
const totalMs = parseInt(h.ms || "0", 10) || 0;
|
||||
const total = completed + failed;
|
||||
return {
|
||||
ts,
|
||||
completed,
|
||||
failed,
|
||||
avgMs: total > 0 ? Math.round(totalMs / total) : 0,
|
||||
};
|
||||
});
|
||||
} catch {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
+84
-31
@@ -7,6 +7,7 @@ import AnonymizedRepositoryModel from "../../core/model/anonymizedRepositories/a
|
||||
import ConferenceModel from "../../core/model/conference/conferences.model";
|
||||
import UserModel from "../../core/model/users/users.model";
|
||||
import { cacheQueue, downloadQueue, removeQueue } from "../../queue";
|
||||
import { queryMetrics } from "../../queue/queueMetrics";
|
||||
import User from "../../core/User";
|
||||
import { ensureAuthenticated } from "./connection";
|
||||
import { handleError, getUser, isOwnerOrAdmin, getRepo } from "./route-utils";
|
||||
@@ -131,25 +132,25 @@ function sendCsv(
|
||||
router.post("/queue/:name/:repo_id", async (req, res) => {
|
||||
const queue = pickQueue(req.params.name);
|
||||
if (!queue) return res.status(404).json({ error: "queue_not_found" });
|
||||
let job;
|
||||
try {
|
||||
job = await queue.getJob(req.params.repo_id);
|
||||
const job = await queue.getJob(req.params.repo_id);
|
||||
if (!job) {
|
||||
return res.status(404).json({ error: "job_not_found" });
|
||||
}
|
||||
|
||||
await job.retry();
|
||||
res.send("ok");
|
||||
} catch {
|
||||
try {
|
||||
if (job) {
|
||||
await job.remove();
|
||||
queue.add(job.name, job.data, job.opts);
|
||||
}
|
||||
res.send("ok");
|
||||
} catch {
|
||||
res.status(500).json({ error: "error_retrying_job" });
|
||||
const state = await job.getState();
|
||||
if (state === "active") {
|
||||
return res.status(409).json({ error: "job_is_active", message: "Cannot retry an active job — wait for it to finish or remove it first." });
|
||||
}
|
||||
try {
|
||||
await job.retry();
|
||||
} catch {
|
||||
const { name, data, opts } = job;
|
||||
await job.remove().catch(() => {});
|
||||
await queue.add(name, data, opts);
|
||||
}
|
||||
res.json({ ok: true });
|
||||
} catch (error) {
|
||||
handleError(error, res, req);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -157,12 +158,24 @@ router.delete("/queue/:name/:repo_id", async (req, res) => {
|
||||
const queue = pickQueue(req.params.name);
|
||||
if (!queue) return res.status(404).json({ error: "queue_not_found" });
|
||||
try {
|
||||
const job = await queue.getJob(req.params.repo_id);
|
||||
const jobId = req.params.repo_id;
|
||||
const job = await queue.getJob(jobId);
|
||||
if (!job) {
|
||||
return res.status(404).json({ error: "job_not_found" });
|
||||
}
|
||||
|
||||
const state = await job.getState();
|
||||
|
||||
if (state === "active") {
|
||||
// Active jobs hold a worker lock — delete it so remove() succeeds
|
||||
const client = await (queue as any).client;
|
||||
const lockKey = queue.toKey(jobId) + ":lock";
|
||||
await client.del(lockKey);
|
||||
logger.info("cleared lock for active job", { queue: queue.name, jobId });
|
||||
}
|
||||
|
||||
await job.remove();
|
||||
res.send("ok");
|
||||
res.json({ ok: true });
|
||||
} catch (error) {
|
||||
handleError(error, res, req);
|
||||
}
|
||||
@@ -243,37 +256,58 @@ router.post("/queues/pause-all", async (_req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
async function queueStats(queue: Queue) {
|
||||
const [counts, workers, paused, completedMetrics, failedMetrics] =
|
||||
async function queueStats(queueKey: string, queue: Queue) {
|
||||
const [counts, workers, paused, metrics24h] =
|
||||
await Promise.all([
|
||||
queue.getJobCounts(...QUEUE_STATES),
|
||||
queue.getWorkers().catch(() => []),
|
||||
queue.isPaused().catch(() => false),
|
||||
queue.getMetrics("completed", 0, 119).catch(() => ({ data: [], count: 0 })),
|
||||
queue.getMetrics("failed", 0, 119).catch(() => ({ data: [], count: 0 })),
|
||||
queryMetrics(queueKey, 1440),
|
||||
]);
|
||||
|
||||
const workerCount = workers.length;
|
||||
const concurrency = workerCount > 0 ? (workers as any)[0]?.opts?.concurrency ?? null : null;
|
||||
|
||||
let completed24h = 0;
|
||||
let failed24h = 0;
|
||||
for (const p of metrics24h) {
|
||||
completed24h += p.completed;
|
||||
failed24h += p.failed;
|
||||
}
|
||||
|
||||
return {
|
||||
counts,
|
||||
paused,
|
||||
workers: workerCount,
|
||||
concurrency,
|
||||
throughput: completedMetrics.data || [],
|
||||
completed24h: completedMetrics.count || 0,
|
||||
failed24h: failedMetrics.count || 0,
|
||||
completed24h,
|
||||
failed24h,
|
||||
};
|
||||
}
|
||||
|
||||
const RANGE_MINUTES: Record<string, number> = {
|
||||
"1h": 60,
|
||||
"6h": 360,
|
||||
"24h": 1440,
|
||||
"7d": 10080,
|
||||
};
|
||||
|
||||
router.get("/queues/metrics", async (req, res) => {
|
||||
const queueName = String(req.query.queue || "download");
|
||||
if (!pickQueue(queueName)) return res.status(404).json({ error: "queue_not_found" });
|
||||
const range = String(req.query.range || "1h");
|
||||
const minutes = RANGE_MINUTES[range] || 60;
|
||||
try {
|
||||
const points = await queryMetrics(queueName, minutes);
|
||||
res.json({ queue: queueName, range, points });
|
||||
} catch (error) {
|
||||
handleError(error, res, req);
|
||||
}
|
||||
});
|
||||
|
||||
router.get("/queues", async (req, res) => {
|
||||
const search = req.query.search ? String(req.query.search).toLowerCase() : "";
|
||||
const queueName = req.query.queue ? String(req.query.queue) : "";
|
||||
const stateFilter: JobType | null = req.query.state ? String(req.query.state) as JobType : null;
|
||||
const states: JobType[] = stateFilter && QUEUE_STATES.includes(stateFilter)
|
||||
? [stateFilter]
|
||||
: QUEUE_STATES;
|
||||
|
||||
const allQueues: { key: string; label: string; queue: Queue }[] = [
|
||||
{ key: "download", label: "Download", queue: downloadQueue },
|
||||
@@ -285,7 +319,7 @@ router.get("/queues", async (req, res) => {
|
||||
allQueues.map(async (q) => ({
|
||||
key: q.key,
|
||||
label: q.label,
|
||||
...(await queueStats(q.queue)),
|
||||
...(await queueStats(q.key, q.queue)),
|
||||
}))
|
||||
);
|
||||
|
||||
@@ -294,8 +328,6 @@ router.get("/queues", async (req, res) => {
|
||||
: allQueues[0];
|
||||
const targetQueue = target ? target.queue : downloadQueue;
|
||||
|
||||
const jobs = await targetQueue.getJobs(states);
|
||||
|
||||
const matches = (job: { id?: string | undefined; name?: string }) => {
|
||||
if (!search) return true;
|
||||
return (
|
||||
@@ -304,10 +336,31 @@ router.get("/queues", async (req, res) => {
|
||||
);
|
||||
};
|
||||
|
||||
// Fetch all states in parallel, tag each job with its state
|
||||
const jobsByState = await Promise.all(
|
||||
QUEUE_STATES.map(async (state) => {
|
||||
const jobs = await targetQueue.getJobs([state]);
|
||||
return jobs.map((j) => {
|
||||
const json: Record<string, unknown> = { ...j.asJSON(), _state: state };
|
||||
if (state === "delayed" && j.delay > 0) {
|
||||
json.delayUntil = j.timestamp + j.delay;
|
||||
}
|
||||
return json;
|
||||
});
|
||||
})
|
||||
);
|
||||
const allJobs = jobsByState.flat().filter(matches);
|
||||
|
||||
// Sort: active first, then waiting, delayed, failed, completed
|
||||
const stateOrder: Record<string, number> = {
|
||||
active: 0, waiting: 1, delayed: 2, failed: 3, completed: 4,
|
||||
};
|
||||
allJobs.sort((a, b) => (stateOrder[a._state as string] ?? 9) - (stateOrder[b._state as string] ?? 9));
|
||||
|
||||
res.json({
|
||||
queues: statsResults,
|
||||
selectedQueue: target?.key || "download",
|
||||
jobs: jobs.filter(matches),
|
||||
jobs: allJobs,
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ import RepositoryModel from "../../core/model/repositories/repositories.model";
|
||||
import User from "../../core/User";
|
||||
import { RepositoryStatus } from "../../core/types";
|
||||
import { IUserDocument } from "../../core/model/users/users.types";
|
||||
import { checkToken, octokit } from "../../core/GitHubUtils";
|
||||
import { checkToken, octokit, getRedisGateResetAt } from "../../core/GitHubUtils";
|
||||
import { createLogger, serializeError } from "../../core/logger";
|
||||
|
||||
const logger = createLogger("route:repo");
|
||||
@@ -294,6 +294,10 @@ router.get("/:repoId/", async (req: express.Request, res: express.Response) => {
|
||||
: fullRepo.owner.id === user.model.id
|
||||
? "owner"
|
||||
: "coauthor";
|
||||
const gateResetAt = await getRedisGateResetAt();
|
||||
if (gateResetAt > 0) {
|
||||
json.rateLimitResetAt = gateResetAt;
|
||||
}
|
||||
res.json(json);
|
||||
} catch (error) {
|
||||
handleError(error, res, req);
|
||||
|
||||
@@ -339,8 +339,8 @@ router.get(
|
||||
fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5);
|
||||
if (repo.status != "ready") {
|
||||
if (
|
||||
repo.status != RepositoryStatus.QUEUE &&
|
||||
repo.model.statusDate < fiveMinuteAgo
|
||||
// && repo.status != "preparing"
|
||||
) {
|
||||
await repo.updateStatus(RepositoryStatus.PREPARING);
|
||||
await downloadQueue.add(repo.repoId, { repoId: repo.repoId }, {
|
||||
@@ -359,6 +359,14 @@ router.get(
|
||||
}
|
||||
);
|
||||
}
|
||||
const rlMatch = (repo.model.statusMessage || "").match(/^rate_limited:(\d+)$/);
|
||||
if (rlMatch) {
|
||||
const resetAt = parseInt(rlMatch[1], 10);
|
||||
throw new AnonymousError("rate_limited", {
|
||||
httpStatus: 425,
|
||||
object: { resetAt },
|
||||
});
|
||||
}
|
||||
throw new AnonymousError("repository_not_ready", {
|
||||
httpStatus: 425,
|
||||
object: repo,
|
||||
|
||||
@@ -218,7 +218,17 @@ export function handleError(
|
||||
if (res && !res.headersSent) {
|
||||
const safeCode =
|
||||
error instanceof AnonymousError ? errorCode : "internal_error";
|
||||
res.status(status).json({ error: safeCode });
|
||||
const body: Record<string, unknown> = { error: safeCode };
|
||||
if (
|
||||
error instanceof AnonymousError &&
|
||||
safeCode === "rate_limited" &&
|
||||
error.value &&
|
||||
typeof error.value === "object" &&
|
||||
"resetAt" in (error.value as Record<string, unknown>)
|
||||
) {
|
||||
body.resetAt = (error.value as Record<string, unknown>).resetAt;
|
||||
}
|
||||
res.status(status).json(body);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user