/*
This file is part of the Notesnook Sync Server project (https://notesnook.com/)

Copyright (C) 2023 Streetwriters (Private) Limited

This program is free software: you can redistribute it and/or modify
it under the terms of the Affero GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Affero GNU General Public License for more details.

You should have received a copy of the Affero GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Amazon;
using Amazon.S3;
using Amazon.S3.Model;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using Notesnook.API.Helpers;
using Notesnook.API.Interfaces;
using Notesnook.API.Models;
using Streetwriters.Common;
using Streetwriters.Common.Accessors;

namespace Notesnook.API.Services
{
    enum S3ClientMode
    {
        INTERNAL = 0,
        EXTERNAL = 1
    }

    public class S3Service : IS3Service
    {
        private readonly string BUCKET_NAME = Constants.S3_BUCKET_NAME;
        private readonly string INTERNAL_BUCKET_NAME = Constants.S3_INTERNAL_BUCKET_NAME ?? Constants.S3_BUCKET_NAME;
        private readonly S3FailoverHelper S3Client;
        private ISyncItemsRepositoryAccessor Repositories { get; }
        private WampServiceAccessor ServiceAccessor { get; }

        // When running in a dockerized environment, the sync server doesn't have access
        // to the host's S3 service URL; it can only talk to the S3 server via its own
        // internal network. This creates an issue where the client needs host-level
        // access while the sync server needs only internal access.
        // This wouldn't be a big problem (just map one to the other, right?) but the
        // signed URLs generated by S3 are host specific: changing their hostname on the
        // fly causes a SignatureDoesNotMatch error.
        // That is why we create 2 separate S3 clients, one for internal traffic and one
        // for external.
        private readonly S3FailoverHelper S3InternalClient;

        private readonly HttpClient httpClient = new();

        public S3Service(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor, WampServiceAccessor wampServiceAccessor, ILogger<S3Service> logger)
        {
            Repositories = syncItemsRepositoryAccessor;
            ServiceAccessor = wampServiceAccessor;
            S3Client = new S3FailoverHelper(
                S3ClientFactory.CreateS3Clients(
                    Constants.S3_SERVICE_URL,
                    Constants.S3_REGION,
                    Constants.S3_ACCESS_KEY_ID,
                    Constants.S3_ACCESS_KEY,
                    forcePathStyle: true
                ),
                logger: logger
            );

            if (!string.IsNullOrEmpty(Constants.S3_INTERNAL_SERVICE_URL) && !string.IsNullOrEmpty(Constants.S3_INTERNAL_BUCKET_NAME))
            {
                S3InternalClient = new S3FailoverHelper(
                    S3ClientFactory.CreateS3Clients(
                        Constants.S3_INTERNAL_SERVICE_URL,
                        Constants.S3_REGION,
                        Constants.S3_ACCESS_KEY_ID,
                        Constants.S3_ACCESS_KEY,
                        forcePathStyle: true
                    ),
                    logger: logger
                );
            }
            else S3InternalClient = S3Client;

            AWSConfigsS3.UseSignatureVersion4 = true;
        }

        public async Task DeleteObjectAsync(string userId, string name)
        {
            var objectName = GetFullObjectName(userId, name) ?? throw new Exception("Invalid object name.");
            var response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.DeleteObjectAsync(INTERNAL_BUCKET_NAME, objectName), operationName: "DeleteObject", isWriteOperation: true);
            if (!IsSuccessStatusCode((int)response.HttpStatusCode))
                throw new Exception("Could not delete object.");
        }
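
        // Deletes multiple objects belonging to a user. Invalid object names are
        // skipped rather than failing the whole request; batch failures and per-key
        // delete errors are collected and reported as a single exception once every
        // batch has been attempted.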
        public async Task DeleteObjectsAsync(string userId, string[] names)
        {
            var objectsToDelete = new List<KeyVersion>();
            foreach (var name in names)
            {
                var objectName = GetFullObjectName(userId, name);
                if (objectName == null) continue;
                objectsToDelete.Add(new KeyVersion { Key = objectName });
            }

            if (objectsToDelete.Count == 0)
            {
                return;
            }

            // S3 DeleteObjectsRequest supports max 1000 keys per request
            var batchSize = 1000;
            var deleteErrors = new List<DeleteError>();
            var failedBatches = 0;
            for (int i = 0; i < objectsToDelete.Count; i += batchSize)
            {
                var batch = objectsToDelete.Skip(i).Take(batchSize).ToList();
                var deleteObjectsResponse = await S3InternalClient.ExecuteWithFailoverAsync(
                    (client) => client.DeleteObjectsAsync(new DeleteObjectsRequest
                    {
                        BucketName = INTERNAL_BUCKET_NAME,
                        Objects = batch,
                    }),
                    operationName: "DeleteObjects",
                    isWriteOperation: true
                );
                if (!IsSuccessStatusCode((int)deleteObjectsResponse.HttpStatusCode))
                {
                    failedBatches++;
                }
                if (deleteObjectsResponse.DeleteErrors.Count > 0)
                {
                    deleteErrors.AddRange(deleteObjectsResponse.DeleteErrors);
                }
            }

            if (failedBatches > 0 || deleteErrors.Count > 0)
            {
                var errorParts = new List<string>();
                if (failedBatches > 0)
                {
                    errorParts.Add($"{failedBatches} batch(es) failed with unsuccessful status code");
                }
                if (deleteErrors.Count > 0)
                {
                    errorParts.Add(string.Join(", ", deleteErrors.Select(e => $"{e.Key}: {e.Message}")));
                }
                throw new Exception(string.Join("; ", errorParts));
            }
        }

        // Deletes every object under the user's prefix, paginating through
        // ListObjectsV2 to collect all keys first.
        public async Task DeleteDirectoryAsync(string userId)
        {
            var request = new ListObjectsV2Request
            {
                BucketName = INTERNAL_BUCKET_NAME,
                Prefix = userId,
            };

            var keys = new List<KeyVersion>();
            ListObjectsV2Response response;
            do
            {
                response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.ListObjectsV2Async(request), operationName: "ListObjectsV2");
                response.S3Objects.ForEach(obj => keys.Add(new KeyVersion { Key = obj.Key }));
                request.ContinuationToken = response.NextContinuationToken;
            } while (response.IsTruncated);

            if (keys.Count <= 0) return;

            // DeleteObjects accepts at most 1000 keys per request, so delete in
            // batches like DeleteObjectsAsync does.
            for (int i = 0; i < keys.Count; i += 1000)
            {
                var batch = keys.Skip(i).Take(1000).ToList();
                var deleteObjectsResponse = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.DeleteObjectsAsync(new DeleteObjectsRequest
                {
                    BucketName = INTERNAL_BUCKET_NAME,
                    Objects = batch,
                }), operationName: "DeleteObjects", isWriteOperation: true);
                if (!IsSuccessStatusCode((int)deleteObjectsResponse.HttpStatusCode))
                    throw new Exception("Could not delete directory.");
            }
        }

        // Reads the object's size via a HEAD request against an internally presigned
        // URL so the object's bytes are never downloaded.
        public async Task<long> GetObjectSizeAsync(string userId, string name)
        {
            var url = await this.GetPresignedURLAsync(userId, name, HttpVerb.HEAD, S3ClientMode.INTERNAL);
            if (url == null) return 0;

            var request = new HttpRequestMessage(HttpMethod.Head, url);
            var response = await httpClient.SendAsync(request);
            return response.Content.Headers.ContentLength ?? 0;
        }
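
        // The URL getters below hand out time-limited presigned URLs instead of
        // proxying file bytes through the sync server. EXTERNAL mode signs against
        // S3_SERVICE_URL (the endpoint clients can reach); INTERNAL mode signs against
        // S3_INTERNAL_SERVICE_URL (reachable only from inside the server's network).
        // For illustration only (hypothetical values), a dockerized deployment might
        // configure:
        //   S3_SERVICE_URL=https://files.example.com   <- host-level endpoint for clients
        //   S3_INTERNAL_SERVICE_URL=http://minio:9000  <- docker-internal endpoint
        // A URL signed for one endpoint is not valid for the other, since the hostname
        // is part of the signature (see the note on SignatureDoesNotMatch above).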
        public async Task<string?> GetUploadObjectUrlAsync(string userId, string name)
        {
            return await this.GetPresignedURLAsync(userId, name, HttpVerb.PUT);
        }

        public async Task<string?> GetInternalUploadObjectUrlAsync(string userId, string name)
        {
            return await this.GetPresignedURLAsync(userId, name, HttpVerb.PUT, S3ClientMode.INTERNAL);
        }

        public async Task<string?> GetDownloadObjectUrlAsync(string userId, string name)
        {
            // var subscriptionService = await WampServers.SubscriptionServer.GetServiceAsync(SubscriptionServerTopics.UserSubscriptionServiceTopic);
            // var subscription = await subscriptionService.GetUserSubscriptionAsync(Clients.Notesnook.Id, userId);
            // var size = await GetObjectSizeAsync(userId, name);
            // if (StorageHelper.IsFileSizeExceeded(subscription, size))
            // {
            //     var fileSizeLimit = StorageHelper.GetFileSizeLimitForPlan(subscription);
            //     throw new Exception($"You cannot download files larger than {StorageHelper.FormatBytes(fileSizeLimit)} on this plan.");
            // }
            return await this.GetPresignedURLAsync(userId, name, HttpVerb.GET);
        }

        // Initiates a multipart upload (or resumes one when uploadId is given) and
        // returns one presigned URL per part for the client to PUT its data to.
        public async Task<MultipartUploadMeta> StartMultipartUploadAsync(string userId, string name, int parts, string? uploadId = null)
        {
            var objectName = GetFullObjectName(userId, name);
            if (userId == null || objectName == null) throw new Exception("Could not initiate multipart upload.");

            if (string.IsNullOrEmpty(uploadId))
            {
                var response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.InitiateMultipartUploadAsync(INTERNAL_BUCKET_NAME, objectName), operationName: "InitiateMultipartUpload", isWriteOperation: true);
                if (!IsSuccessStatusCode((int)response.HttpStatusCode)) throw new Exception("Failed to initiate multipart upload.");
                uploadId = response.UploadId;
            }

            var signedUrls = new string[parts];
            for (var i = 0; i < parts; ++i)
            {
                // S3 part numbers are 1-based.
                signedUrls[i] = await GetPresignedURLForUploadPartAsync(objectName, uploadId, i + 1);
            }

            return new MultipartUploadMeta
            {
                UploadId = uploadId,
                Parts = signedUrls
            };
        }

        public async Task AbortMultipartUploadAsync(string userId, string name, string uploadId)
        {
            var objectName = GetFullObjectName(userId, name);
            if (userId == null || objectName == null) throw new Exception("Could not abort multipart upload.");

            var response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.AbortMultipartUploadAsync(INTERNAL_BUCKET_NAME, objectName, uploadId), operationName: "AbortMultipartUpload", isWriteOperation: true);
            if (!IsSuccessStatusCode((int)response.HttpStatusCode)) throw new Exception("Failed to abort multipart upload.");
        }

        // Sums the sizes of all parts uploaded so far to get the file's total size.
        private async Task<long> GetMultipartUploadSizeAsync(string userId, string key, string uploadId)
        {
            var objectName = GetFullObjectName(userId, key);
            var parts = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.ListPartsAsync(INTERNAL_BUCKET_NAME, objectName, uploadId), operationName: "ListParts");
            long totalSize = 0;
            foreach (var part in parts.Parts)
            {
                totalSize += part.Size;
            }
            return totalSize;
        }
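
        // Completes a multipart upload. On hosted (non-self-hosted) deployments this
        // is also where storage quotas are enforced: the upload is aborted and an
        // exception thrown if the user's settings are missing, if the file exceeds
        // the plan's maximum file size, or if it would push the user past their
        // storage limit.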
        public async Task CompleteMultipartUploadAsync(string userId, CompleteMultipartUploadRequest uploadRequest)
        {
            var objectName = GetFullObjectName(userId, uploadRequest.Key);
            if (userId == null || objectName == null) throw new Exception("Could not complete multipart upload.");

            var userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
            if (userSettings == null)
            {
                await this.AbortMultipartUploadAsync(userId, uploadRequest.Key, uploadRequest.UploadId);
                throw new Exception("User settings not found.");
            }

            // Initialize the user's storage-limit record if they don't have one yet.
            userSettings.StorageLimit ??= StorageHelper.RolloverStorageLimit(userSettings.StorageLimit);

            if (!Constants.IS_SELF_HOSTED)
            {
                var subscription = await ServiceAccessor.UserSubscriptionService.GetUserSubscriptionAsync(Clients.Notesnook.Id, userId) ?? throw new Exception("User subscription not found.");

                long fileSize = await GetMultipartUploadSizeAsync(userId, uploadRequest.Key, uploadRequest.UploadId);
                if (StorageHelper.IsFileSizeExceeded(subscription, fileSize))
                {
                    await this.AbortMultipartUploadAsync(userId, uploadRequest.Key, uploadRequest.UploadId);
                    throw new Exception("Max file size exceeded.");
                }

                userSettings.StorageLimit.Value += fileSize;
                if (StorageHelper.IsStorageLimitReached(subscription, userSettings.StorageLimit.Value))
                {
                    await this.AbortMultipartUploadAsync(userId, uploadRequest.Key, uploadRequest.UploadId);
                    throw new Exception("Storage limit reached.");
                }
            }

            uploadRequest.Key = objectName;
            uploadRequest.BucketName = INTERNAL_BUCKET_NAME;
            var response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.CompleteMultipartUploadAsync(uploadRequest), operationName: "CompleteMultipartUpload", isWriteOperation: true);
            if (!IsSuccessStatusCode((int)response.HttpStatusCode)) throw new Exception("Failed to complete multipart upload.");

            if (!Constants.IS_SELF_HOSTED)
            {
                await Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId);
                await Repositories.UsersSettings.Collection.UpdateOneAsync(
                    Builders<UserSetting>.Filter.Eq(u => u.UserId, userId),
                    Builders<UserSetting>.Update.Set(u => u.StorageLimit, userSettings.StorageLimit)
                );
            }
        }

        private async Task<string?> GetPresignedURLAsync(string userId, string name, HttpVerb httpVerb, S3ClientMode mode = S3ClientMode.EXTERNAL)
        {
            var objectName = GetFullObjectName(userId, name);
            if (userId == null || objectName == null) return null;

            var client = mode == S3ClientMode.INTERNAL ? S3InternalClient : S3Client;
            var bucketName = mode == S3ClientMode.INTERNAL ? INTERNAL_BUCKET_NAME : BUCKET_NAME;
            return await client.ExecuteWithFailoverAsync(c =>
            {
                var request = new GetPreSignedUrlRequest
                {
                    BucketName = bucketName,
                    Expires = DateTime.Now.AddHours(1),
                    Verb = httpVerb,
                    Key = objectName,
#if (DEBUG || STAGING)
                    // Debug/staging builds always sign plain-HTTP URLs.
                    Protocol = Protocol.HTTP,
#else
                    // In production the scheme follows the configured service URL.
                    Protocol = c.Config.ServiceURL.StartsWith("http://") ? Protocol.HTTP : Protocol.HTTPS,
#endif
                };
                return c.GetPreSignedURLAsync(request);
            }, operationName: "GetPreSignedURL");
        }

        private Task<string> GetPresignedURLForUploadPartAsync(string objectName, string uploadId, int partNumber, S3ClientMode mode = S3ClientMode.EXTERNAL)
        {
            var client = mode == S3ClientMode.INTERNAL ? S3InternalClient : S3Client;
            var bucketName = mode == S3ClientMode.INTERNAL ? INTERNAL_BUCKET_NAME : BUCKET_NAME;
            return client.ExecuteWithFailoverAsync(c => c.GetPreSignedURLAsync(new GetPreSignedUrlRequest
            {
                BucketName = bucketName,
                Expires = DateTime.Now.AddHours(1),
                Verb = HttpVerb.PUT,
                Key = objectName,
                PartNumber = partNumber,
                UploadId = uploadId,
#if (DEBUG || STAGING)
                Protocol = Protocol.HTTP,
#else
                Protocol = c.Config.ServiceURL.StartsWith("http://") ? Protocol.HTTP : Protocol.HTTPS,
#endif
            }), operationName: "GetPreSignedURL");
        }

        private static string? GetFullObjectName(string userId, string name)
        {
            // The pattern is anchored so the whole name must consist of S3-safe
            // characters; a single safe character somewhere in the name is not enough.
            if (userId == null || !Regex.IsMatch(name, "^[0-9a-zA-Z!" + Regex.Escape("-") + "_.*'()]+$"))
                return null;
            return $"{userId}/{name}";
        }

        static bool IsSuccessStatusCode(int statusCode)
        {
            return statusCode >= 200 && statusCode <= 299;
        }
    }
}
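
// Illustrative client-side flow for the multipart API above (a sketch, not part of
// this service): the client calls StartMultipartUploadAsync(userId, name, parts)
// and receives a MultipartUploadMeta containing the UploadId and one presigned URL
// per part. It then PUTs each part's bytes to Parts[i], recording the ETag response
// header for every part. Finally it sends a CompleteMultipartUploadRequest carrying
// the Key, UploadId, and collected part ETags to CompleteMultipartUploadAsync, or
// calls AbortMultipartUploadAsync to cancel and free the partially uploaded parts.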