mirror of
https://github.com/zhom/donutbrowser.git
synced 2026-05-12 04:41:32 +02:00
refactor: reduce the number of s3 calls
This commit is contained in:
@@ -117,7 +117,7 @@ export class SyncController {
|
||||
@Get("subscribe")
|
||||
@Sse()
|
||||
subscribe(@Req() req: Request): Observable<MessageEvent> {
|
||||
return this.syncService.subscribe(this.getUserContext(req), 2000).pipe(
|
||||
return this.syncService.subscribe(this.getUserContext(req), 5000).pipe(
|
||||
map((event) => ({
|
||||
data: event,
|
||||
})),
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { randomUUID } from "node:crypto";
|
||||
import {
|
||||
CreateBucketCommand,
|
||||
DeleteObjectCommand,
|
||||
@@ -41,6 +42,18 @@ import type {
|
||||
SubscribeEventDto,
|
||||
} from "./dto/sync.dto.js";
|
||||
|
||||
/**
|
||||
* Marker object written under each scope (user / team / self-hosted root).
|
||||
* Subscribers HEAD this object on each poll and only LIST when its ETag has
|
||||
* changed, which keeps the steady-state polling cost down to one Class-B
|
||||
* HeadObject per scope per poll instead of N Class-A ListObjectsV2 calls.
|
||||
*
|
||||
* Filename starts with a dot so it sorts first and is unmistakably internal
|
||||
* to donut-sync; client `list()` calls strip it from results so it never
|
||||
* leaks into application data.
|
||||
*/
|
||||
const MANIFEST_KEY = ".donut-sync-manifest";
|
||||
|
||||
@Injectable()
|
||||
export class SyncService implements OnModuleInit {
|
||||
private readonly logger = new Logger(SyncService.name);
|
||||
@@ -149,6 +162,71 @@ export class SyncService implements OnModuleInit {
|
||||
return `${ctx.prefix}${key}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return every scope prefix the given user can write to. For self-hosted
|
||||
* that's the bucket root (`""`); for cloud that's the user prefix plus an
|
||||
* optional team prefix.
|
||||
*/
|
||||
private scopesFor(ctx: UserContext): string[] {
|
||||
if (ctx.mode === "self-hosted") return [""];
|
||||
const out = [ctx.prefix];
|
||||
if (ctx.teamPrefix) out.push(ctx.teamPrefix);
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Bump the manifest object for the scope that owns `scopedKey`. Writers call
|
||||
* this fire-and-forget after any successful mutation so subscribers'
|
||||
* cheap HEAD polls observe an ETag change and pull a fresh listing.
|
||||
*
|
||||
* Slightly over-eager by design: we bump on presign-issue (rather than on
|
||||
* the actual S3 PUT), so a never-completed upload causes one wasted refresh
|
||||
* on other devices. That's strictly cheaper than verifying every upload.
|
||||
*/
|
||||
private async bumpManifest(
|
||||
ctx: UserContext,
|
||||
scopedKey: string,
|
||||
): Promise<void> {
|
||||
const scope = this.scopeForKey(ctx, scopedKey);
|
||||
if (scope === null) return;
|
||||
const key = `${scope}${MANIFEST_KEY}`;
|
||||
// Body just needs to be unique so the ETag changes; clients never read it.
|
||||
const body = JSON.stringify({
|
||||
updatedAt: new Date().toISOString(),
|
||||
nonce: randomUUID(),
|
||||
});
|
||||
try {
|
||||
await this.s3Client.send(
|
||||
new PutObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: key,
|
||||
Body: body,
|
||||
ContentType: "application/json",
|
||||
}),
|
||||
);
|
||||
} catch (err) {
|
||||
// Manifest bump failures must NEVER fail the user's request.
|
||||
// Subscribers fall back to detecting changes on their next listing.
|
||||
this.logger.warn(
|
||||
`Manifest bump failed for ${key}: ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve which scope owns a fully-scoped key. Returns null if the key
|
||||
* doesn't belong to a known scope (which shouldn't happen in practice
|
||||
* because validateKeyAccess gates the write paths).
|
||||
*/
|
||||
private scopeForKey(ctx: UserContext, scopedKey: string): string | null {
|
||||
if (ctx.mode === "self-hosted") return "";
|
||||
if (ctx.teamPrefix && scopedKey.startsWith(ctx.teamPrefix)) {
|
||||
return ctx.teamPrefix;
|
||||
}
|
||||
if (scopedKey.startsWith(ctx.prefix)) return ctx.prefix;
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validate that a key is accessible by the user.
|
||||
* For cloud mode, key must start with user's prefix or team prefix.
|
||||
@@ -220,6 +298,11 @@ export class SyncService implements OnModuleInit {
|
||||
this.reportProfileUsageAsync(ctx);
|
||||
}
|
||||
|
||||
// Notify subscribers via the per-scope manifest. Fire-and-forget; a
|
||||
// failure here just means other devices pick up the change on their
|
||||
// next full listing instead of immediately.
|
||||
void this.bumpManifest(ctx, key);
|
||||
|
||||
return {
|
||||
url,
|
||||
expiresAt: expiresAt.toISOString(),
|
||||
@@ -294,6 +377,10 @@ export class SyncService implements OnModuleInit {
|
||||
this.reportProfileUsageAsync(ctx);
|
||||
}
|
||||
|
||||
if (deleted || tombstoneCreated) {
|
||||
void this.bumpManifest(ctx, key);
|
||||
}
|
||||
|
||||
return { deleted, tombstoneCreated };
|
||||
}
|
||||
|
||||
@@ -311,19 +398,22 @@ export class SyncService implements OnModuleInit {
|
||||
|
||||
const userPrefix = ctx?.prefix || "";
|
||||
const teamPrefix = ctx?.teamPrefix || "";
|
||||
const objects = (response.Contents || []).map((obj) => {
|
||||
let key = obj.Key || "";
|
||||
if (teamPrefix && key.startsWith(teamPrefix)) {
|
||||
key = key.substring(teamPrefix.length);
|
||||
} else if (userPrefix && key.startsWith(userPrefix)) {
|
||||
key = key.substring(userPrefix.length);
|
||||
}
|
||||
return {
|
||||
key,
|
||||
lastModified: obj.LastModified?.toISOString() || "",
|
||||
size: obj.Size || 0,
|
||||
};
|
||||
});
|
||||
const objects = (response.Contents || [])
|
||||
// Don't leak donut-sync's internal manifest object to clients.
|
||||
.filter((obj) => !(obj.Key || "").endsWith(MANIFEST_KEY))
|
||||
.map((obj) => {
|
||||
let key = obj.Key || "";
|
||||
if (teamPrefix && key.startsWith(teamPrefix)) {
|
||||
key = key.substring(teamPrefix.length);
|
||||
} else if (userPrefix && key.startsWith(userPrefix)) {
|
||||
key = key.substring(userPrefix.length);
|
||||
}
|
||||
return {
|
||||
key,
|
||||
lastModified: obj.LastModified?.toISOString() || "",
|
||||
size: obj.Size || 0,
|
||||
};
|
||||
});
|
||||
|
||||
return {
|
||||
objects,
|
||||
@@ -373,6 +463,20 @@ export class SyncService implements OnModuleInit {
|
||||
this.reportProfileUsageAsync(ctx);
|
||||
}
|
||||
|
||||
// One bump per scope touched by this batch (usually one).
|
||||
if (items.length > 0) {
|
||||
const scopesSeen = new Set<string>();
|
||||
for (const item of dto.items) {
|
||||
const key = this.scopeKey(ctx, item.key);
|
||||
const scope = this.scopeForKey(ctx, key);
|
||||
if (scope !== null && !scopesSeen.has(scope)) {
|
||||
scopesSeen.add(scope);
|
||||
// Use any key from the scope; bumpManifest only inspects scope.
|
||||
void this.bumpManifest(ctx, key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return { items };
|
||||
}
|
||||
|
||||
@@ -475,66 +579,154 @@ export class SyncService implements OnModuleInit {
|
||||
this.reportProfileUsageAsync(ctx);
|
||||
}
|
||||
|
||||
if (deletedCount > 0 || tombstoneCreated) {
|
||||
void this.bumpManifest(ctx, prefix);
|
||||
}
|
||||
|
||||
return { deletedCount, tombstoneCreated };
|
||||
}
|
||||
|
||||
/**
|
||||
* Long-lived per-client poll loop.
|
||||
*
|
||||
* Steady-state cost is one HEAD per scope per poll (Class B on R2). A LIST
|
||||
* (Class A) is only issued when:
|
||||
* 1. it's the client's first poll (need to seed the state map), or
|
||||
* 2. a write touched the scope and bumped its manifest ETag.
|
||||
*
|
||||
* This is *eventual* cross-device sync, gated by the poll interval.
|
||||
* Real-time push is intentionally not provided here — that lives in the
|
||||
* paid backend.
|
||||
*/
|
||||
subscribe(
|
||||
ctx: UserContext,
|
||||
pollIntervalMs = 2000,
|
||||
pollIntervalMs = 5000,
|
||||
): Observable<SubscribeEventDto> {
|
||||
const basePrefixes = ["profiles/", "proxies/", "groups/", "tombstones/"];
|
||||
const scopes = this.scopesFor(ctx);
|
||||
|
||||
let prefixes: string[];
|
||||
if (ctx.mode === "self-hosted") {
|
||||
prefixes = basePrefixes;
|
||||
} else {
|
||||
prefixes = basePrefixes.map((p) => `${ctx.prefix}${p}`);
|
||||
if (ctx.teamPrefix) {
|
||||
prefixes.push(...basePrefixes.map((p) => `${ctx.teamPrefix}${p}`));
|
||||
}
|
||||
}
|
||||
|
||||
// Per-connection state (not shared across subscribers)
|
||||
// Per-connection state (not shared across subscribers).
|
||||
const lastManifestEtag = new Map<string, string | undefined>();
|
||||
let lastKnownState = new Map<string, string>();
|
||||
let initialized = false;
|
||||
|
||||
const pollChanges$ = interval(pollIntervalMs).pipe(
|
||||
startWith(0),
|
||||
switchMap(async () => {
|
||||
const events: SubscribeEventDto[] = [];
|
||||
const currentState = new Map<string, string>();
|
||||
|
||||
for (const prefix of prefixes) {
|
||||
// Phase 1 — cheap HEAD on each scope's manifest. This is the
|
||||
// steady-state cost (Class B). If no manifest changed since the
|
||||
// last poll, we don't touch S3 again this tick.
|
||||
let anyScopeChanged = false;
|
||||
for (const scope of scopes) {
|
||||
const manifestKey = `${scope}${MANIFEST_KEY}`;
|
||||
let currentEtag: string | undefined;
|
||||
try {
|
||||
const result = await this.list({ prefix, maxKeys: 1000 });
|
||||
for (const obj of result.objects) {
|
||||
const stateKey = `${obj.key}:${obj.lastModified}`;
|
||||
currentState.set(obj.key, stateKey);
|
||||
|
||||
const previousStateKey = lastKnownState.get(obj.key);
|
||||
if (previousStateKey !== stateKey) {
|
||||
events.push({
|
||||
type: "change",
|
||||
key: obj.key,
|
||||
lastModified: obj.lastModified,
|
||||
size: obj.size,
|
||||
});
|
||||
}
|
||||
const head = await this.s3Client.send(
|
||||
new HeadObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: manifestKey,
|
||||
}),
|
||||
);
|
||||
currentEtag = head.ETag;
|
||||
} catch (err: unknown) {
|
||||
const status =
|
||||
err && typeof err === "object" && "$metadata" in err
|
||||
? (err as { $metadata?: { httpStatusCode?: number } }).$metadata
|
||||
?.httpStatusCode
|
||||
: undefined;
|
||||
const name =
|
||||
err && typeof err === "object" && "name" in err
|
||||
? (err as { name?: string }).name
|
||||
: undefined;
|
||||
if (name === "NotFound" || name === "NoSuchKey" || status === 404) {
|
||||
// No manifest yet — treat as "no changes" (undefined ETag).
|
||||
currentEtag = undefined;
|
||||
} else {
|
||||
this.logger.error(
|
||||
`Manifest HEAD failed for ${manifestKey}: ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Failed to list prefix ${prefix}:`, error);
|
||||
}
|
||||
|
||||
const previousEtag = lastManifestEtag.get(scope);
|
||||
if (previousEtag !== currentEtag) {
|
||||
anyScopeChanged = true;
|
||||
}
|
||||
lastManifestEtag.set(scope, currentEtag);
|
||||
}
|
||||
|
||||
// After the first poll, only run the LIST when something actually
|
||||
// changed in at least one scope.
|
||||
if (initialized && !anyScopeChanged) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Phase 2 — one LIST per scope (not per base prefix). Filter to the
|
||||
// four base prefixes client-side. This is the cost we pay only when
|
||||
// a manifest told us there's something new to look at.
|
||||
const currentState = new Map<string, string>();
|
||||
for (const scope of scopes) {
|
||||
let continuationToken: string | undefined;
|
||||
do {
|
||||
try {
|
||||
const result = await this.s3Client.send(
|
||||
new ListObjectsV2Command({
|
||||
Bucket: this.bucket,
|
||||
Prefix: scope,
|
||||
MaxKeys: 1000,
|
||||
ContinuationToken: continuationToken,
|
||||
}),
|
||||
);
|
||||
|
||||
for (const obj of result.Contents || []) {
|
||||
const fullKey = obj.Key;
|
||||
if (!fullKey) continue;
|
||||
const relativeKey = fullKey.startsWith(scope)
|
||||
? fullKey.substring(scope.length)
|
||||
: fullKey;
|
||||
// Skip the manifest object itself + anything outside the
|
||||
// four data prefixes.
|
||||
if (relativeKey === MANIFEST_KEY) continue;
|
||||
if (!basePrefixes.some((bp) => relativeKey.startsWith(bp))) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const lastModified = obj.LastModified?.toISOString() || "";
|
||||
const stateKey = `${relativeKey}:${lastModified}`;
|
||||
currentState.set(relativeKey, stateKey);
|
||||
|
||||
const previousStateKey = lastKnownState.get(relativeKey);
|
||||
if (previousStateKey !== stateKey) {
|
||||
events.push({
|
||||
type: "change",
|
||||
key: relativeKey,
|
||||
lastModified,
|
||||
size: obj.Size || 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
continuationToken = result.NextContinuationToken;
|
||||
} catch (err) {
|
||||
this.logger.error(
|
||||
`List failed for scope '${scope}': ${err instanceof Error ? err.message : String(err)}`,
|
||||
);
|
||||
continuationToken = undefined;
|
||||
}
|
||||
} while (continuationToken);
|
||||
}
|
||||
|
||||
// Detect deletes by comparing key sets.
|
||||
for (const [key] of lastKnownState) {
|
||||
if (!currentState.has(key)) {
|
||||
events.push({
|
||||
type: "delete",
|
||||
key,
|
||||
});
|
||||
events.push({ type: "delete", key });
|
||||
}
|
||||
}
|
||||
|
||||
lastKnownState = currentState;
|
||||
initialized = true;
|
||||
return events;
|
||||
}),
|
||||
switchMap((events) => of(...events)),
|
||||
|
||||
Reference in New Issue
Block a user