diff --git a/donut-sync/src/sync/sync.controller.ts b/donut-sync/src/sync/sync.controller.ts index 1be8ad6..b48f404 100644 --- a/donut-sync/src/sync/sync.controller.ts +++ b/donut-sync/src/sync/sync.controller.ts @@ -117,7 +117,7 @@ export class SyncController { @Get("subscribe") @Sse() subscribe(@Req() req: Request): Observable { - return this.syncService.subscribe(this.getUserContext(req), 2000).pipe( + return this.syncService.subscribe(this.getUserContext(req), 5000).pipe( map((event) => ({ data: event, })), diff --git a/donut-sync/src/sync/sync.service.ts b/donut-sync/src/sync/sync.service.ts index 4d218be..7fd8292 100644 --- a/donut-sync/src/sync/sync.service.ts +++ b/donut-sync/src/sync/sync.service.ts @@ -1,3 +1,4 @@ +import { randomUUID } from "node:crypto"; import { CreateBucketCommand, DeleteObjectCommand, @@ -41,6 +42,18 @@ import type { SubscribeEventDto, } from "./dto/sync.dto.js"; +/** + * Marker object written under each scope (user / team / self-hosted root). + * Subscribers HEAD this object on each poll and only LIST when its ETag has + * changed, which keeps the steady-state polling cost down to one Class-B + * HeadObject per scope per poll instead of N Class-A ListObjectsV2 calls. + * + * Filename starts with a dot so it sorts first and is unmistakably internal + * to donut-sync; client `list()` calls strip it from results so it never + * leaks into application data. + */ +const MANIFEST_KEY = ".donut-sync-manifest"; + @Injectable() export class SyncService implements OnModuleInit { private readonly logger = new Logger(SyncService.name); @@ -149,6 +162,71 @@ export class SyncService implements OnModuleInit { return `${ctx.prefix}${key}`; } + /** + * Return every scope prefix the given user can write to. For self-hosted + * that's the bucket root (`""`); for cloud that's the user prefix plus an + * optional team prefix. + */ + private scopesFor(ctx: UserContext): string[] { + if (ctx.mode === "self-hosted") return [""]; + const out = [ctx.prefix]; + if (ctx.teamPrefix) out.push(ctx.teamPrefix); + return out; + } + + /** + * Bump the manifest object for the scope that owns `scopedKey`. Writers call + * this fire-and-forget after any successful mutation so subscribers' + * cheap HEAD polls observe an ETag change and pull a fresh listing. + * + * Slightly over-eager by design: we bump on presign-issue (rather than on + * the actual S3 PUT), so a never-completed upload causes one wasted refresh + * on other devices. That's strictly cheaper than verifying every upload. + */ + private async bumpManifest( + ctx: UserContext, + scopedKey: string, + ): Promise { + const scope = this.scopeForKey(ctx, scopedKey); + if (scope === null) return; + const key = `${scope}${MANIFEST_KEY}`; + // Body just needs to be unique so the ETag changes; clients never read it. + const body = JSON.stringify({ + updatedAt: new Date().toISOString(), + nonce: randomUUID(), + }); + try { + await this.s3Client.send( + new PutObjectCommand({ + Bucket: this.bucket, + Key: key, + Body: body, + ContentType: "application/json", + }), + ); + } catch (err) { + // Manifest bump failures must NEVER fail the user's request. + // Subscribers fall back to detecting changes on their next listing. + this.logger.warn( + `Manifest bump failed for ${key}: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + + /** + * Resolve which scope owns a fully-scoped key. Returns null if the key + * doesn't belong to a known scope (which shouldn't happen in practice + * because validateKeyAccess gates the write paths). + */ + private scopeForKey(ctx: UserContext, scopedKey: string): string | null { + if (ctx.mode === "self-hosted") return ""; + if (ctx.teamPrefix && scopedKey.startsWith(ctx.teamPrefix)) { + return ctx.teamPrefix; + } + if (scopedKey.startsWith(ctx.prefix)) return ctx.prefix; + return null; + } + /** * Validate that a key is accessible by the user. * For cloud mode, key must start with user's prefix or team prefix. @@ -220,6 +298,11 @@ export class SyncService implements OnModuleInit { this.reportProfileUsageAsync(ctx); } + // Notify subscribers via the per-scope manifest. Fire-and-forget; a + // failure here just means other devices pick up the change on their + // next full listing instead of immediately. + void this.bumpManifest(ctx, key); + return { url, expiresAt: expiresAt.toISOString(), @@ -294,6 +377,10 @@ export class SyncService implements OnModuleInit { this.reportProfileUsageAsync(ctx); } + if (deleted || tombstoneCreated) { + void this.bumpManifest(ctx, key); + } + return { deleted, tombstoneCreated }; } @@ -311,19 +398,22 @@ export class SyncService implements OnModuleInit { const userPrefix = ctx?.prefix || ""; const teamPrefix = ctx?.teamPrefix || ""; - const objects = (response.Contents || []).map((obj) => { - let key = obj.Key || ""; - if (teamPrefix && key.startsWith(teamPrefix)) { - key = key.substring(teamPrefix.length); - } else if (userPrefix && key.startsWith(userPrefix)) { - key = key.substring(userPrefix.length); - } - return { - key, - lastModified: obj.LastModified?.toISOString() || "", - size: obj.Size || 0, - }; - }); + const objects = (response.Contents || []) + // Don't leak donut-sync's internal manifest object to clients. + .filter((obj) => !(obj.Key || "").endsWith(MANIFEST_KEY)) + .map((obj) => { + let key = obj.Key || ""; + if (teamPrefix && key.startsWith(teamPrefix)) { + key = key.substring(teamPrefix.length); + } else if (userPrefix && key.startsWith(userPrefix)) { + key = key.substring(userPrefix.length); + } + return { + key, + lastModified: obj.LastModified?.toISOString() || "", + size: obj.Size || 0, + }; + }); return { objects, @@ -373,6 +463,20 @@ export class SyncService implements OnModuleInit { this.reportProfileUsageAsync(ctx); } + // One bump per scope touched by this batch (usually one). + if (items.length > 0) { + const scopesSeen = new Set(); + for (const item of dto.items) { + const key = this.scopeKey(ctx, item.key); + const scope = this.scopeForKey(ctx, key); + if (scope !== null && !scopesSeen.has(scope)) { + scopesSeen.add(scope); + // Use any key from the scope; bumpManifest only inspects scope. + void this.bumpManifest(ctx, key); + } + } + } + return { items }; } @@ -475,66 +579,154 @@ export class SyncService implements OnModuleInit { this.reportProfileUsageAsync(ctx); } + if (deletedCount > 0 || tombstoneCreated) { + void this.bumpManifest(ctx, prefix); + } + return { deletedCount, tombstoneCreated }; } + /** + * Long-lived per-client poll loop. + * + * Steady-state cost is one HEAD per scope per poll (Class B on R2). A LIST + * (Class A) is only issued when: + * 1. it's the client's first poll (need to seed the state map), or + * 2. a write touched the scope and bumped its manifest ETag. + * + * This is *eventual* cross-device sync, gated by the poll interval. + * Real-time push is intentionally not provided here — that lives in the + * paid backend. + */ subscribe( ctx: UserContext, - pollIntervalMs = 2000, + pollIntervalMs = 5000, ): Observable { const basePrefixes = ["profiles/", "proxies/", "groups/", "tombstones/"]; + const scopes = this.scopesFor(ctx); - let prefixes: string[]; - if (ctx.mode === "self-hosted") { - prefixes = basePrefixes; - } else { - prefixes = basePrefixes.map((p) => `${ctx.prefix}${p}`); - if (ctx.teamPrefix) { - prefixes.push(...basePrefixes.map((p) => `${ctx.teamPrefix}${p}`)); - } - } - - // Per-connection state (not shared across subscribers) + // Per-connection state (not shared across subscribers). + const lastManifestEtag = new Map(); let lastKnownState = new Map(); + let initialized = false; const pollChanges$ = interval(pollIntervalMs).pipe( startWith(0), switchMap(async () => { const events: SubscribeEventDto[] = []; - const currentState = new Map(); - for (const prefix of prefixes) { + // Phase 1 — cheap HEAD on each scope's manifest. This is the + // steady-state cost (Class B). If no manifest changed since the + // last poll, we don't touch S3 again this tick. + let anyScopeChanged = false; + for (const scope of scopes) { + const manifestKey = `${scope}${MANIFEST_KEY}`; + let currentEtag: string | undefined; try { - const result = await this.list({ prefix, maxKeys: 1000 }); - for (const obj of result.objects) { - const stateKey = `${obj.key}:${obj.lastModified}`; - currentState.set(obj.key, stateKey); - - const previousStateKey = lastKnownState.get(obj.key); - if (previousStateKey !== stateKey) { - events.push({ - type: "change", - key: obj.key, - lastModified: obj.lastModified, - size: obj.size, - }); - } + const head = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucket, + Key: manifestKey, + }), + ); + currentEtag = head.ETag; + } catch (err: unknown) { + const status = + err && typeof err === "object" && "$metadata" in err + ? (err as { $metadata?: { httpStatusCode?: number } }).$metadata + ?.httpStatusCode + : undefined; + const name = + err && typeof err === "object" && "name" in err + ? (err as { name?: string }).name + : undefined; + if (name === "NotFound" || name === "NoSuchKey" || status === 404) { + // No manifest yet — treat as "no changes" (undefined ETag). + currentEtag = undefined; + } else { + this.logger.error( + `Manifest HEAD failed for ${manifestKey}: ${err instanceof Error ? err.message : String(err)}`, + ); + continue; } - } catch (error) { - console.error(`Failed to list prefix ${prefix}:`, error); } + + const previousEtag = lastManifestEtag.get(scope); + if (previousEtag !== currentEtag) { + anyScopeChanged = true; + } + lastManifestEtag.set(scope, currentEtag); } + // After the first poll, only run the LIST when something actually + // changed in at least one scope. + if (initialized && !anyScopeChanged) { + return []; + } + + // Phase 2 — one LIST per scope (not per base prefix). Filter to the + // four base prefixes client-side. This is the cost we pay only when + // a manifest told us there's something new to look at. + const currentState = new Map(); + for (const scope of scopes) { + let continuationToken: string | undefined; + do { + try { + const result = await this.s3Client.send( + new ListObjectsV2Command({ + Bucket: this.bucket, + Prefix: scope, + MaxKeys: 1000, + ContinuationToken: continuationToken, + }), + ); + + for (const obj of result.Contents || []) { + const fullKey = obj.Key; + if (!fullKey) continue; + const relativeKey = fullKey.startsWith(scope) + ? fullKey.substring(scope.length) + : fullKey; + // Skip the manifest object itself + anything outside the + // four data prefixes. + if (relativeKey === MANIFEST_KEY) continue; + if (!basePrefixes.some((bp) => relativeKey.startsWith(bp))) { + continue; + } + + const lastModified = obj.LastModified?.toISOString() || ""; + const stateKey = `${relativeKey}:${lastModified}`; + currentState.set(relativeKey, stateKey); + + const previousStateKey = lastKnownState.get(relativeKey); + if (previousStateKey !== stateKey) { + events.push({ + type: "change", + key: relativeKey, + lastModified, + size: obj.Size || 0, + }); + } + } + continuationToken = result.NextContinuationToken; + } catch (err) { + this.logger.error( + `List failed for scope '${scope}': ${err instanceof Error ? err.message : String(err)}`, + ); + continuationToken = undefined; + } + } while (continuationToken); + } + + // Detect deletes by comparing key sets. for (const [key] of lastKnownState) { if (!currentState.has(key)) { - events.push({ - type: "delete", - key, - }); + events.push({ type: "delete", key }); } } lastKnownState = currentState; + initialized = true; return events; }), switchMap((events) => of(...events)),