mirror of
https://github.com/tdurieux/anonymous_github.git
synced 2026-05-15 06:30:26 +02:00
fix rate limit
This commit is contained in:
+13
-9
@@ -70,20 +70,23 @@ function setTokenGate(token: string, retryAfterSec: number) {
|
||||
tokenGates.set(key, { resetAt });
|
||||
logger.warn("rate limit gate set", {
|
||||
code: "rate_limit_gate",
|
||||
tokenKey: key,
|
||||
retryAfterSec,
|
||||
resetAt: new Date(resetAt).toISOString(),
|
||||
});
|
||||
setRedisGate(retryAfterSec).catch(() => {});
|
||||
setRedisGate(key, retryAfterSec).catch(() => {});
|
||||
}
|
||||
}
|
||||
|
||||
export class RateLimitDelayError extends Error {
|
||||
resetAt: number;
|
||||
constructor(resetAt: number) {
|
||||
tokenKey: string;
|
||||
constructor(resetAt: number, tokenKey: string) {
|
||||
const delaySec = Math.ceil((resetAt - Date.now()) / 1000);
|
||||
super(`github_rate_limit_delay:${delaySec}s`);
|
||||
this.name = "RateLimitDelayError";
|
||||
this.resetAt = resetAt;
|
||||
this.tokenKey = tokenKey;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,7 +116,7 @@ async function waitForTokenGate(token: string): Promise<void> {
|
||||
waitMs = resetAt - Date.now();
|
||||
}
|
||||
|
||||
const redisResetAt = await getRedisGateResetAt();
|
||||
const redisResetAt = await getRedisGateResetAt(key);
|
||||
if (redisResetAt > resetAt) {
|
||||
resetAt = redisResetAt;
|
||||
waitMs = resetAt - Date.now();
|
||||
@@ -166,15 +169,16 @@ function ensureRedisGateClient(): Promise<RedisClientType | null> {
|
||||
return redisGateReady;
|
||||
}
|
||||
|
||||
async function setRedisGate(retryAfterSec: number): Promise<void> {
|
||||
async function setRedisGate(tokenKey: string, retryAfterSec: number): Promise<void> {
|
||||
const c = await ensureRedisGateClient();
|
||||
if (!c || !c.isOpen) return;
|
||||
const resetAt = Date.now() + retryAfterSec * 1000;
|
||||
const ttl = Math.ceil(retryAfterSec) + 10;
|
||||
try {
|
||||
await c.set(REDIS_GATE_PREFIX + "global", String(resetAt), { EX: ttl });
|
||||
await c.set(REDIS_GATE_PREFIX + tokenKey, String(resetAt), { EX: ttl });
|
||||
logger.info("redis rate limit gate written", {
|
||||
code: "redis_gate_set",
|
||||
tokenKey,
|
||||
resetAt: new Date(resetAt).toISOString(),
|
||||
ttl,
|
||||
});
|
||||
@@ -183,17 +187,17 @@ async function setRedisGate(retryAfterSec: number): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
export async function setRedisGateFromWorker(resetAt: number): Promise<void> {
|
||||
export async function setRedisGateFromWorker(tokenKey: string, resetAt: number): Promise<void> {
|
||||
const retryAfterSec = Math.max(0, (resetAt - Date.now()) / 1000);
|
||||
if (retryAfterSec <= 0) return;
|
||||
await setRedisGate(retryAfterSec);
|
||||
await setRedisGate(tokenKey, retryAfterSec);
|
||||
}
|
||||
|
||||
export async function getRedisGateResetAt(): Promise<number> {
|
||||
export async function getRedisGateResetAt(tokenKey: string): Promise<number> {
|
||||
const c = await ensureRedisGateClient();
|
||||
if (!c || !c.isOpen) return 0;
|
||||
try {
|
||||
const val = await c.get(REDIS_GATE_PREFIX + "global");
|
||||
const val = await c.get(REDIS_GATE_PREFIX + tokenKey);
|
||||
if (!val) return 0;
|
||||
const resetAt = parseInt(val, 10);
|
||||
if (isNaN(resetAt) || resetAt <= Date.now()) return 0;
|
||||
|
||||
@@ -19,7 +19,7 @@ import {
|
||||
getRepositoryFromGitHub,
|
||||
GitHubRepository,
|
||||
} from "./source/GitHubRepository";
|
||||
import { getToken, getRedisGateResetAt } from "./GitHubUtils";
|
||||
import { getToken } from "./GitHubUtils";
|
||||
import config from "../config";
|
||||
import FileModel from "./model/files/files.model";
|
||||
import AnonymizedRepositoryModel from "./model/anonymizedRepositories/anonymizedRepositories.model";
|
||||
@@ -234,14 +234,6 @@ export default class Repository {
|
||||
httpStatus: 410,
|
||||
});
|
||||
}
|
||||
const redisGateReset = await getRedisGateResetAt();
|
||||
if (redisGateReset > 0) {
|
||||
throw new AnonymousError("rate_limited", {
|
||||
httpStatus: 425,
|
||||
object: { resetAt: redisGateReset },
|
||||
});
|
||||
}
|
||||
|
||||
const fiveMinuteAgo = new Date();
|
||||
fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5);
|
||||
|
||||
|
||||
@@ -5,7 +5,7 @@ import { getRepository as getRepositoryImport } from "../../server/database";
|
||||
import { RepositoryStatus } from "../../core/types";
|
||||
import { RepoJobData } from "../index";
|
||||
import { createLogger, serializeError } from "../../core/logger";
|
||||
import { RateLimitDelayError, getRedisGateResetAt, setRedisGateFromWorker } from "../../core/GitHubUtils";
|
||||
import { RateLimitDelayError, getRedisGateResetAt, setRedisGateFromWorker, getToken } from "../../core/GitHubUtils";
|
||||
import { DelayedError } from "bullmq";
|
||||
|
||||
const logger = createLogger("queue:download");
|
||||
@@ -23,7 +23,11 @@ export default async function (job: SandboxedJob<RepoJobData, void>) {
|
||||
let statusInterval: any = null;
|
||||
await connect();
|
||||
|
||||
const gateResetAt = await getRedisGateResetAt();
|
||||
const repo = await getRepository(job.data.repoId);
|
||||
const token = await getToken(repo);
|
||||
const tokenKey = token.slice(-8);
|
||||
|
||||
const gateResetAt = await getRedisGateResetAt(tokenKey);
|
||||
if (gateResetAt > 0) {
|
||||
const delaySec = Math.ceil((gateResetAt - Date.now()) / 1000);
|
||||
logger.info("rate limit gate active, delaying job before work", {
|
||||
@@ -31,7 +35,6 @@ export default async function (job: SandboxedJob<RepoJobData, void>) {
|
||||
delaySec,
|
||||
resetAt: new Date(gateResetAt).toISOString(),
|
||||
});
|
||||
const repo = await getRepository(job.data.repoId);
|
||||
await repo.updateStatus(
|
||||
RepositoryStatus.QUEUE,
|
||||
`rate_limited:${gateResetAt}`
|
||||
@@ -39,8 +42,6 @@ export default async function (job: SandboxedJob<RepoJobData, void>) {
|
||||
await job.moveToDelayed(gateResetAt);
|
||||
throw new DelayedError();
|
||||
}
|
||||
|
||||
const repo = await getRepository(job.data.repoId);
|
||||
let tickPromise: Promise<void> | null = null;
|
||||
try {
|
||||
let progress: { status: string } | null = null;
|
||||
@@ -103,7 +104,7 @@ export default async function (job: SandboxedJob<RepoJobData, void>) {
|
||||
delaySec,
|
||||
resetAt: new Date(resetAt).toISOString(),
|
||||
});
|
||||
await setRedisGateFromWorker(resetAt);
|
||||
await setRedisGateFromWorker(tokenKey, resetAt);
|
||||
await repo.updateStatus(
|
||||
RepositoryStatus.QUEUE,
|
||||
`rate_limited:${resetAt}`
|
||||
|
||||
@@ -21,7 +21,7 @@ import RepositoryModel from "../../core/model/repositories/repositories.model";
|
||||
import User from "../../core/User";
|
||||
import { RepositoryStatus } from "../../core/types";
|
||||
import { IUserDocument } from "../../core/model/users/users.types";
|
||||
import { checkToken, octokit, getRedisGateResetAt } from "../../core/GitHubUtils";
|
||||
import { checkToken, octokit, getRedisGateResetAt, getToken } from "../../core/GitHubUtils";
|
||||
import { createLogger, serializeError } from "../../core/logger";
|
||||
|
||||
const logger = createLogger("route:repo");
|
||||
@@ -294,7 +294,8 @@ router.get("/:repoId/", async (req: express.Request, res: express.Response) => {
|
||||
: fullRepo.owner.id === user.model.id
|
||||
? "owner"
|
||||
: "coauthor";
|
||||
const gateResetAt = await getRedisGateResetAt();
|
||||
const repoToken = await getToken(fullRepo);
|
||||
const gateResetAt = await getRedisGateResetAt(repoToken.slice(-8));
|
||||
if (gateResetAt > 0) {
|
||||
json.rateLimitResetAt = gateResetAt;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user