mirror of
https://github.com/zhom/donutbrowser.git
synced 2026-06-08 07:53:57 +02:00
chore: linting
This commit is contained in:
@@ -0,0 +1,37 @@
|
||||
import { Test, type TestingModule } from "@nestjs/testing";
|
||||
import { AppController } from "./app.controller.js";
|
||||
import { AppService } from "./app.service.js";
|
||||
import { SyncService } from "./sync/sync.service.js";
|
||||
|
||||
describe("AppController", () => {
|
||||
let appController: AppController;
|
||||
|
||||
beforeEach(async () => {
|
||||
const app: TestingModule = await Test.createTestingModule({
|
||||
controllers: [AppController],
|
||||
providers: [
|
||||
AppService,
|
||||
{
|
||||
provide: SyncService,
|
||||
useValue: {
|
||||
checkS3Connectivity: jest.fn().mockResolvedValue(true),
|
||||
},
|
||||
},
|
||||
],
|
||||
}).compile();
|
||||
|
||||
appController = app.get<AppController>(AppController);
|
||||
});
|
||||
|
||||
describe("root", () => {
|
||||
it("should return service name", () => {
|
||||
expect(appController.getHello()).toBe("Donut Sync Service");
|
||||
});
|
||||
});
|
||||
|
||||
describe("health", () => {
|
||||
it("should return ok status", () => {
|
||||
expect(appController.getHealth()).toEqual({ status: "ok" });
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,33 @@
|
||||
import { Controller, Get, HttpException, HttpStatus } from "@nestjs/common";
|
||||
import { AppService } from "./app.service.js";
|
||||
import { SyncService } from "./sync/sync.service.js";
|
||||
|
||||
@Controller()
|
||||
export class AppController {
|
||||
constructor(
|
||||
private readonly appService: AppService,
|
||||
private readonly syncService: SyncService,
|
||||
) {}
|
||||
|
||||
@Get()
|
||||
getHello(): string {
|
||||
return this.appService.getHello();
|
||||
}
|
||||
|
||||
@Get("health")
|
||||
getHealth(): { status: string } {
|
||||
return { status: "ok" };
|
||||
}
|
||||
|
||||
@Get("readyz")
|
||||
async getReadiness(): Promise<{ status: string; s3: boolean }> {
|
||||
const s3Ready = await this.syncService.checkS3Connectivity();
|
||||
if (!s3Ready) {
|
||||
throw new HttpException(
|
||||
{ status: "not ready", s3: false },
|
||||
HttpStatus.SERVICE_UNAVAILABLE,
|
||||
);
|
||||
}
|
||||
return { status: "ready", s3: true };
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { ConfigModule } from "@nestjs/config";
|
||||
import { AppController } from "./app.controller.js";
|
||||
import { AppService } from "./app.service.js";
|
||||
import { SyncModule } from "./sync/sync.module.js";
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
ConfigModule.forRoot({
|
||||
isGlobal: true,
|
||||
}),
|
||||
SyncModule,
|
||||
],
|
||||
controllers: [AppController],
|
||||
providers: [AppService],
|
||||
})
|
||||
export class AppModule {}
|
||||
@@ -0,0 +1,8 @@
|
||||
import { Injectable } from "@nestjs/common";
|
||||
|
||||
@Injectable()
|
||||
export class AppService {
|
||||
getHello(): string {
|
||||
return "Donut Sync Service";
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
import {
|
||||
type CanActivate,
|
||||
type ExecutionContext,
|
||||
Injectable,
|
||||
UnauthorizedException,
|
||||
} from "@nestjs/common";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import type { Request } from "express";
|
||||
|
||||
@Injectable()
|
||||
export class AuthGuard implements CanActivate {
|
||||
constructor(private configService: ConfigService) {}
|
||||
|
||||
canActivate(context: ExecutionContext): boolean {
|
||||
const request = context.switchToHttp().getRequest<Request>();
|
||||
const authHeader = request.headers.authorization;
|
||||
|
||||
if (!authHeader || !authHeader.startsWith("Bearer ")) {
|
||||
throw new UnauthorizedException(
|
||||
"Missing or invalid authorization header",
|
||||
);
|
||||
}
|
||||
|
||||
const token = authHeader.substring(7);
|
||||
const expectedToken = this.configService.get<string>("SYNC_TOKEN");
|
||||
|
||||
if (!expectedToken) {
|
||||
throw new UnauthorizedException("Sync token not configured on server");
|
||||
}
|
||||
|
||||
if (token !== expectedToken) {
|
||||
throw new UnauthorizedException("Invalid sync token");
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
import { NestFactory } from "@nestjs/core";
|
||||
import { AppModule } from "./app.module.js";
|
||||
|
||||
function validateEnv() {
|
||||
const required = ["SYNC_TOKEN"];
|
||||
const missing = required.filter((key) => !process.env[key]);
|
||||
if (missing.length > 0) {
|
||||
console.error(
|
||||
`Missing required environment variables: ${missing.join(", ")}`,
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
async function bootstrap() {
|
||||
validateEnv();
|
||||
|
||||
const app = await NestFactory.create(AppModule);
|
||||
|
||||
app.enableCors({
|
||||
origin: "*",
|
||||
methods: ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
|
||||
allowedHeaders: ["Content-Type", "Authorization"],
|
||||
});
|
||||
|
||||
const port = process.env.PORT ?? 3929;
|
||||
await app.listen(port);
|
||||
console.log(`Donut Sync service running on port ${port}`);
|
||||
}
|
||||
void bootstrap();
|
||||
@@ -0,0 +1,114 @@
|
||||
export class StatRequestDto {
|
||||
key: string;
|
||||
}
|
||||
|
||||
export class StatResponseDto {
|
||||
exists: boolean;
|
||||
lastModified?: string;
|
||||
size?: number;
|
||||
}
|
||||
|
||||
export class PresignUploadRequestDto {
|
||||
key: string;
|
||||
contentType?: string;
|
||||
expiresIn?: number;
|
||||
}
|
||||
|
||||
export class PresignUploadResponseDto {
|
||||
url: string;
|
||||
expiresAt: string;
|
||||
}
|
||||
|
||||
export class PresignDownloadRequestDto {
|
||||
key: string;
|
||||
expiresIn?: number;
|
||||
}
|
||||
|
||||
export class PresignDownloadResponseDto {
|
||||
url: string;
|
||||
expiresAt: string;
|
||||
}
|
||||
|
||||
export class DeleteRequestDto {
|
||||
key: string;
|
||||
tombstoneKey?: string;
|
||||
deletedAt?: string;
|
||||
}
|
||||
|
||||
export class DeleteResponseDto {
|
||||
deleted: boolean;
|
||||
tombstoneCreated: boolean;
|
||||
}
|
||||
|
||||
export class ListRequestDto {
|
||||
prefix: string;
|
||||
maxKeys?: number;
|
||||
continuationToken?: string;
|
||||
}
|
||||
|
||||
export class ListObjectDto {
|
||||
key: string;
|
||||
lastModified: string;
|
||||
size: number;
|
||||
}
|
||||
|
||||
export class ListResponseDto {
|
||||
objects: ListObjectDto[];
|
||||
isTruncated: boolean;
|
||||
nextContinuationToken?: string;
|
||||
}
|
||||
|
||||
export class SubscribeEventDto {
|
||||
type: "change" | "delete" | "ping";
|
||||
key?: string;
|
||||
lastModified?: string;
|
||||
size?: number;
|
||||
}
|
||||
|
||||
// Batch presign DTOs
|
||||
export class PresignUploadBatchItemDto {
|
||||
key: string;
|
||||
contentType?: string;
|
||||
}
|
||||
|
||||
export class PresignUploadBatchRequestDto {
|
||||
items: PresignUploadBatchItemDto[];
|
||||
expiresIn?: number;
|
||||
}
|
||||
|
||||
export class PresignUploadBatchItemResponseDto {
|
||||
key: string;
|
||||
url: string;
|
||||
expiresAt: string;
|
||||
}
|
||||
|
||||
export class PresignUploadBatchResponseDto {
|
||||
items: PresignUploadBatchItemResponseDto[];
|
||||
}
|
||||
|
||||
export class PresignDownloadBatchRequestDto {
|
||||
keys: string[];
|
||||
expiresIn?: number;
|
||||
}
|
||||
|
||||
export class PresignDownloadBatchItemResponseDto {
|
||||
key: string;
|
||||
url: string;
|
||||
expiresAt: string;
|
||||
}
|
||||
|
||||
export class PresignDownloadBatchResponseDto {
|
||||
items: PresignDownloadBatchItemResponseDto[];
|
||||
}
|
||||
|
||||
// Delete prefix DTOs
|
||||
export class DeletePrefixRequestDto {
|
||||
prefix: string;
|
||||
tombstoneKey?: string;
|
||||
deletedAt?: string;
|
||||
}
|
||||
|
||||
export class DeletePrefixResponseDto {
|
||||
deletedCount: number;
|
||||
tombstoneCreated: boolean;
|
||||
}
|
||||
@@ -0,0 +1,96 @@
|
||||
import {
|
||||
Body,
|
||||
Controller,
|
||||
Get,
|
||||
type MessageEvent,
|
||||
Post,
|
||||
Sse,
|
||||
UseGuards,
|
||||
} from "@nestjs/common";
|
||||
import { map, type Observable } from "rxjs";
|
||||
import { AuthGuard } from "../auth/auth.guard.js";
|
||||
import type {
|
||||
DeletePrefixRequestDto,
|
||||
DeletePrefixResponseDto,
|
||||
DeleteRequestDto,
|
||||
DeleteResponseDto,
|
||||
ListRequestDto,
|
||||
ListResponseDto,
|
||||
PresignDownloadBatchRequestDto,
|
||||
PresignDownloadBatchResponseDto,
|
||||
PresignDownloadRequestDto,
|
||||
PresignDownloadResponseDto,
|
||||
PresignUploadBatchRequestDto,
|
||||
PresignUploadBatchResponseDto,
|
||||
PresignUploadRequestDto,
|
||||
PresignUploadResponseDto,
|
||||
StatRequestDto,
|
||||
StatResponseDto,
|
||||
} from "./dto/sync.dto.js";
|
||||
import { SyncService } from "./sync.service.js";
|
||||
|
||||
@Controller("v1/objects")
|
||||
@UseGuards(AuthGuard)
|
||||
export class SyncController {
|
||||
constructor(private readonly syncService: SyncService) {}
|
||||
|
||||
@Post("stat")
|
||||
async stat(@Body() dto: StatRequestDto): Promise<StatResponseDto> {
|
||||
return this.syncService.stat(dto);
|
||||
}
|
||||
|
||||
@Post("presign-upload")
|
||||
async presignUpload(
|
||||
@Body() dto: PresignUploadRequestDto,
|
||||
): Promise<PresignUploadResponseDto> {
|
||||
return this.syncService.presignUpload(dto);
|
||||
}
|
||||
|
||||
@Post("presign-download")
|
||||
async presignDownload(
|
||||
@Body() dto: PresignDownloadRequestDto,
|
||||
): Promise<PresignDownloadResponseDto> {
|
||||
return this.syncService.presignDownload(dto);
|
||||
}
|
||||
|
||||
@Post("delete")
|
||||
async delete(@Body() dto: DeleteRequestDto): Promise<DeleteResponseDto> {
|
||||
return this.syncService.delete(dto);
|
||||
}
|
||||
|
||||
@Post("list")
|
||||
async list(@Body() dto: ListRequestDto): Promise<ListResponseDto> {
|
||||
return this.syncService.list(dto);
|
||||
}
|
||||
|
||||
@Post("presign-upload-batch")
|
||||
async presignUploadBatch(
|
||||
@Body() dto: PresignUploadBatchRequestDto,
|
||||
): Promise<PresignUploadBatchResponseDto> {
|
||||
return this.syncService.presignUploadBatch(dto);
|
||||
}
|
||||
|
||||
@Post("presign-download-batch")
|
||||
async presignDownloadBatch(
|
||||
@Body() dto: PresignDownloadBatchRequestDto,
|
||||
): Promise<PresignDownloadBatchResponseDto> {
|
||||
return this.syncService.presignDownloadBatch(dto);
|
||||
}
|
||||
|
||||
@Post("delete-prefix")
|
||||
async deletePrefix(
|
||||
@Body() dto: DeletePrefixRequestDto,
|
||||
): Promise<DeletePrefixResponseDto> {
|
||||
return this.syncService.deletePrefix(dto);
|
||||
}
|
||||
|
||||
@Get("subscribe")
|
||||
@Sse()
|
||||
subscribe(): Observable<MessageEvent> {
|
||||
return this.syncService.subscribe(2000).pipe(
|
||||
map((event) => ({
|
||||
data: event,
|
||||
})),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,11 @@
|
||||
import { Module } from "@nestjs/common";
|
||||
import { AuthGuard } from "../auth/auth.guard.js";
|
||||
import { SyncController } from "./sync.controller.js";
|
||||
import { SyncService } from "./sync.service.js";
|
||||
|
||||
@Module({
|
||||
controllers: [SyncController],
|
||||
providers: [SyncService, AuthGuard],
|
||||
exports: [SyncService],
|
||||
})
|
||||
export class SyncModule {}
|
||||
@@ -0,0 +1,418 @@
|
||||
import {
|
||||
CreateBucketCommand,
|
||||
DeleteObjectCommand,
|
||||
DeleteObjectsCommand,
|
||||
GetObjectCommand,
|
||||
HeadBucketCommand,
|
||||
HeadObjectCommand,
|
||||
ListObjectsV2Command,
|
||||
PutObjectCommand as PutCmd,
|
||||
PutObjectCommand,
|
||||
S3Client,
|
||||
} from "@aws-sdk/client-s3";
|
||||
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
|
||||
import { Injectable, type OnModuleInit } from "@nestjs/common";
|
||||
import { ConfigService } from "@nestjs/config";
|
||||
import { interval, merge, type Observable, of, Subject } from "rxjs";
|
||||
import { catchError, filter, map, startWith, switchMap } from "rxjs/operators";
|
||||
import type {
|
||||
DeletePrefixRequestDto,
|
||||
DeletePrefixResponseDto,
|
||||
DeleteRequestDto,
|
||||
DeleteResponseDto,
|
||||
ListRequestDto,
|
||||
ListResponseDto,
|
||||
PresignDownloadBatchRequestDto,
|
||||
PresignDownloadBatchResponseDto,
|
||||
PresignDownloadRequestDto,
|
||||
PresignDownloadResponseDto,
|
||||
PresignUploadBatchRequestDto,
|
||||
PresignUploadBatchResponseDto,
|
||||
PresignUploadRequestDto,
|
||||
PresignUploadResponseDto,
|
||||
StatRequestDto,
|
||||
StatResponseDto,
|
||||
SubscribeEventDto,
|
||||
} from "./dto/sync.dto.js";
|
||||
|
||||
@Injectable()
|
||||
export class SyncService implements OnModuleInit {
|
||||
private s3Client: S3Client;
|
||||
private bucket: string;
|
||||
private lastKnownState: Map<string, string> = new Map();
|
||||
private changeSubject = new Subject<SubscribeEventDto>();
|
||||
private s3Ready = false;
|
||||
|
||||
constructor(private configService: ConfigService) {
|
||||
const endpoint =
|
||||
this.configService.get<string>("S3_ENDPOINT") || "http://localhost:8987";
|
||||
const region = this.configService.get<string>("S3_REGION") || "us-east-1";
|
||||
const accessKeyId =
|
||||
this.configService.get<string>("S3_ACCESS_KEY_ID") || "minioadmin";
|
||||
const secretAccessKey =
|
||||
this.configService.get<string>("S3_SECRET_ACCESS_KEY") || "minioadmin";
|
||||
const forcePathStyle =
|
||||
this.configService.get<string>("S3_FORCE_PATH_STYLE") !== "false";
|
||||
|
||||
this.bucket = this.configService.get<string>("S3_BUCKET") || "donut-sync";
|
||||
|
||||
this.s3Client = new S3Client({
|
||||
endpoint,
|
||||
region,
|
||||
credentials: {
|
||||
accessKeyId,
|
||||
secretAccessKey,
|
||||
},
|
||||
forcePathStyle,
|
||||
});
|
||||
}
|
||||
|
||||
async onModuleInit() {
|
||||
await this.ensureBucketExists();
|
||||
}
|
||||
|
||||
private async ensureBucketExists(): Promise<void> {
|
||||
try {
|
||||
await this.s3Client.send(new HeadBucketCommand({ Bucket: this.bucket }));
|
||||
this.s3Ready = true;
|
||||
} catch (error: unknown) {
|
||||
const isNotFound =
|
||||
error &&
|
||||
typeof error === "object" &&
|
||||
"name" in error &&
|
||||
(error.name === "NotFound" ||
|
||||
error.name === "NoSuchBucket" ||
|
||||
error.name === "404");
|
||||
|
||||
if (isNotFound) {
|
||||
try {
|
||||
await this.s3Client.send(
|
||||
new CreateBucketCommand({ Bucket: this.bucket }),
|
||||
);
|
||||
this.s3Ready = true;
|
||||
} catch (createError) {
|
||||
console.error("Failed to create S3 bucket:", createError);
|
||||
throw createError;
|
||||
}
|
||||
} else {
|
||||
console.error("S3 connection failed:", error);
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
isReady(): boolean {
|
||||
return this.s3Ready;
|
||||
}
|
||||
|
||||
async checkS3Connectivity(): Promise<boolean> {
|
||||
try {
|
||||
await this.s3Client.send(new HeadBucketCommand({ Bucket: this.bucket }));
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async stat(dto: StatRequestDto): Promise<StatResponseDto> {
|
||||
try {
|
||||
const response = await this.s3Client.send(
|
||||
new HeadObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: dto.key,
|
||||
}),
|
||||
);
|
||||
|
||||
return {
|
||||
exists: true,
|
||||
lastModified: response.LastModified?.toISOString(),
|
||||
size: response.ContentLength,
|
||||
};
|
||||
} catch (error: unknown) {
|
||||
if (
|
||||
error &&
|
||||
typeof error === "object" &&
|
||||
"name" in error &&
|
||||
error.name === "NotFound"
|
||||
) {
|
||||
return { exists: false };
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async presignUpload(
|
||||
dto: PresignUploadRequestDto,
|
||||
): Promise<PresignUploadResponseDto> {
|
||||
const expiresIn = dto.expiresIn || 3600;
|
||||
const expiresAt = new Date(Date.now() + expiresIn * 1000);
|
||||
|
||||
const command = new PutCmd({
|
||||
Bucket: this.bucket,
|
||||
Key: dto.key,
|
||||
ContentType: dto.contentType || "application/octet-stream",
|
||||
});
|
||||
|
||||
const url = await getSignedUrl(this.s3Client, command, { expiresIn });
|
||||
|
||||
return {
|
||||
url,
|
||||
expiresAt: expiresAt.toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
async presignDownload(
|
||||
dto: PresignDownloadRequestDto,
|
||||
): Promise<PresignDownloadResponseDto> {
|
||||
const expiresIn = dto.expiresIn || 3600;
|
||||
const expiresAt = new Date(Date.now() + expiresIn * 1000);
|
||||
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: dto.key,
|
||||
});
|
||||
|
||||
const url = await getSignedUrl(this.s3Client, command, { expiresIn });
|
||||
|
||||
return {
|
||||
url,
|
||||
expiresAt: expiresAt.toISOString(),
|
||||
};
|
||||
}
|
||||
|
||||
async delete(dto: DeleteRequestDto): Promise<DeleteResponseDto> {
|
||||
let deleted = false;
|
||||
let tombstoneCreated = false;
|
||||
|
||||
try {
|
||||
await this.s3Client.send(
|
||||
new DeleteObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: dto.key,
|
||||
}),
|
||||
);
|
||||
deleted = true;
|
||||
} catch {
|
||||
deleted = false;
|
||||
}
|
||||
|
||||
if (dto.tombstoneKey) {
|
||||
const tombstoneData = JSON.stringify({
|
||||
id: dto.key,
|
||||
deleted_at: dto.deletedAt || new Date().toISOString(),
|
||||
});
|
||||
|
||||
await this.s3Client.send(
|
||||
new PutObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: dto.tombstoneKey,
|
||||
Body: tombstoneData,
|
||||
ContentType: "application/json",
|
||||
}),
|
||||
);
|
||||
tombstoneCreated = true;
|
||||
}
|
||||
|
||||
return { deleted, tombstoneCreated };
|
||||
}
|
||||
|
||||
async list(dto: ListRequestDto): Promise<ListResponseDto> {
|
||||
const response = await this.s3Client.send(
|
||||
new ListObjectsV2Command({
|
||||
Bucket: this.bucket,
|
||||
Prefix: dto.prefix,
|
||||
MaxKeys: dto.maxKeys || 1000,
|
||||
ContinuationToken: dto.continuationToken,
|
||||
}),
|
||||
);
|
||||
|
||||
const objects = (response.Contents || []).map((obj) => ({
|
||||
key: obj.Key || "",
|
||||
lastModified: obj.LastModified?.toISOString() || "",
|
||||
size: obj.Size || 0,
|
||||
}));
|
||||
|
||||
return {
|
||||
objects,
|
||||
isTruncated: response.IsTruncated || false,
|
||||
nextContinuationToken: response.NextContinuationToken,
|
||||
};
|
||||
}
|
||||
|
||||
async presignUploadBatch(
|
||||
dto: PresignUploadBatchRequestDto,
|
||||
): Promise<PresignUploadBatchResponseDto> {
|
||||
const expiresIn = dto.expiresIn || 3600;
|
||||
const expiresAt = new Date(Date.now() + expiresIn * 1000);
|
||||
|
||||
const items = await Promise.all(
|
||||
dto.items.map(async (item) => {
|
||||
const command = new PutCmd({
|
||||
Bucket: this.bucket,
|
||||
Key: item.key,
|
||||
ContentType: item.contentType || "application/octet-stream",
|
||||
});
|
||||
|
||||
const url = await getSignedUrl(this.s3Client, command, { expiresIn });
|
||||
|
||||
return {
|
||||
key: item.key,
|
||||
url,
|
||||
expiresAt: expiresAt.toISOString(),
|
||||
};
|
||||
}),
|
||||
);
|
||||
|
||||
return { items };
|
||||
}
|
||||
|
||||
async presignDownloadBatch(
|
||||
dto: PresignDownloadBatchRequestDto,
|
||||
): Promise<PresignDownloadBatchResponseDto> {
|
||||
const expiresIn = dto.expiresIn || 3600;
|
||||
const expiresAt = new Date(Date.now() + expiresIn * 1000);
|
||||
|
||||
const items = await Promise.all(
|
||||
dto.keys.map(async (key) => {
|
||||
const command = new GetObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: key,
|
||||
});
|
||||
|
||||
const url = await getSignedUrl(this.s3Client, command, { expiresIn });
|
||||
|
||||
return {
|
||||
key,
|
||||
url,
|
||||
expiresAt: expiresAt.toISOString(),
|
||||
};
|
||||
}),
|
||||
);
|
||||
|
||||
return { items };
|
||||
}
|
||||
|
||||
async deletePrefix(
|
||||
dto: DeletePrefixRequestDto,
|
||||
): Promise<DeletePrefixResponseDto> {
|
||||
let deletedCount = 0;
|
||||
let tombstoneCreated = false;
|
||||
let continuationToken: string | undefined;
|
||||
|
||||
// Paginate through all objects with the prefix
|
||||
do {
|
||||
const listResponse = await this.s3Client.send(
|
||||
new ListObjectsV2Command({
|
||||
Bucket: this.bucket,
|
||||
Prefix: dto.prefix,
|
||||
MaxKeys: 1000,
|
||||
ContinuationToken: continuationToken,
|
||||
}),
|
||||
);
|
||||
|
||||
const objects = listResponse.Contents || [];
|
||||
if (objects.length > 0) {
|
||||
// Delete objects in batches of 1000 (S3 limit)
|
||||
const deleteObjects = objects
|
||||
.filter((obj): obj is typeof obj & { Key: string } => !!obj.Key)
|
||||
.map((obj) => ({ Key: obj.Key }));
|
||||
|
||||
if (deleteObjects.length > 0) {
|
||||
await this.s3Client.send(
|
||||
new DeleteObjectsCommand({
|
||||
Bucket: this.bucket,
|
||||
Delete: {
|
||||
Objects: deleteObjects,
|
||||
Quiet: true,
|
||||
},
|
||||
}),
|
||||
);
|
||||
deletedCount += deleteObjects.length;
|
||||
}
|
||||
}
|
||||
|
||||
continuationToken = listResponse.NextContinuationToken;
|
||||
} while (continuationToken);
|
||||
|
||||
// Create tombstone if requested
|
||||
if (dto.tombstoneKey && deletedCount > 0) {
|
||||
const tombstoneData = JSON.stringify({
|
||||
prefix: dto.prefix,
|
||||
deleted_at: dto.deletedAt || new Date().toISOString(),
|
||||
deleted_count: deletedCount,
|
||||
});
|
||||
|
||||
await this.s3Client.send(
|
||||
new PutObjectCommand({
|
||||
Bucket: this.bucket,
|
||||
Key: dto.tombstoneKey,
|
||||
Body: tombstoneData,
|
||||
ContentType: "application/json",
|
||||
}),
|
||||
);
|
||||
tombstoneCreated = true;
|
||||
}
|
||||
|
||||
return { deletedCount, tombstoneCreated };
|
||||
}
|
||||
|
||||
subscribe(pollIntervalMs = 2000): Observable<SubscribeEventDto> {
|
||||
const prefixes = ["profiles/", "proxies/", "groups/", "tombstones/"];
|
||||
|
||||
const pollChanges$ = interval(pollIntervalMs).pipe(
|
||||
startWith(0),
|
||||
switchMap(async () => {
|
||||
const events: SubscribeEventDto[] = [];
|
||||
const currentState = new Map<string, string>();
|
||||
|
||||
for (const prefix of prefixes) {
|
||||
try {
|
||||
const result = await this.list({ prefix, maxKeys: 1000 });
|
||||
for (const obj of result.objects) {
|
||||
const stateKey = `${obj.key}:${obj.lastModified}`;
|
||||
currentState.set(obj.key, stateKey);
|
||||
|
||||
const previousStateKey = this.lastKnownState.get(obj.key);
|
||||
if (previousStateKey !== stateKey) {
|
||||
events.push({
|
||||
type: "change",
|
||||
key: obj.key,
|
||||
lastModified: obj.lastModified,
|
||||
size: obj.size,
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Failed to list prefix ${prefix}:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
for (const [key] of this.lastKnownState) {
|
||||
if (!currentState.has(key)) {
|
||||
events.push({
|
||||
type: "delete",
|
||||
key,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
this.lastKnownState = currentState;
|
||||
return events;
|
||||
}),
|
||||
switchMap((events) => of(...events)),
|
||||
filter((event): event is SubscribeEventDto => event !== null),
|
||||
catchError((error) => {
|
||||
console.error("Error in subscribe poll:", error);
|
||||
return of({ type: "ping" as const });
|
||||
}),
|
||||
);
|
||||
|
||||
const ping$ = interval(30000).pipe(map(() => ({ type: "ping" as const })));
|
||||
|
||||
return merge(pollChanges$, ping$, this.changeSubject.asObservable());
|
||||
}
|
||||
|
||||
emitChange(event: SubscribeEventDto) {
|
||||
this.changeSubject.next(event);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user