s3: add support for failover

This commit is contained in:
Abdullah Atta
2025-12-17 09:06:26 +05:00
parent 347507f00a
commit 265b456c46
5 changed files with 494 additions and 99 deletions
+74 -88
View File
@@ -27,6 +27,7 @@ using Amazon;
using Amazon.Runtime;
using Amazon.S3;
using Amazon.S3.Model;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using Notesnook.API.Helpers;
@@ -47,9 +48,9 @@ namespace Notesnook.API.Services
public class S3Service : IS3Service
{
private readonly string BUCKET_NAME = Constants.S3_BUCKET_NAME ?? "";
private readonly string INTERNAL_BUCKET_NAME = Constants.S3_INTERNAL_BUCKET_NAME ?? "";
private AmazonS3Client S3Client { get; }
private readonly string BUCKET_NAME = Constants.S3_BUCKET_NAME;
private readonly string INTERNAL_BUCKET_NAME = Constants.S3_INTERNAL_BUCKET_NAME ?? Constants.S3_BUCKET_NAME;
private readonly S3FailoverHelper S3Client;
private ISyncItemsRepositoryAccessor Repositories { get; }
// When running in a dockerized environment the sync server doesn't have access
@@ -60,57 +61,48 @@ namespace Notesnook.API.Services
// URLs generated by S3 are host specific. Changing their hostname on the fly causes
// SignatureDoesNotMatch error.
// That is why we create 2 separate S3 clients. One for internal traffic and one for external.
private AmazonS3Client? S3InternalClient { get; }
private readonly S3FailoverHelper S3InternalClient;
private readonly HttpClient httpClient = new();
public S3Service(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor)
public S3Service(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor, ILogger<S3Service> logger)
{
Repositories = syncItemsRepositoryAccessor;
var config = CreateConfig();
#if (DEBUG || STAGING)
S3Client = new AmazonS3Client("S3RVER", "S3RVER", config);
#else
S3Client = new AmazonS3Client(Constants.S3_ACCESS_KEY_ID, Constants.S3_ACCESS_KEY, config);
#endif
S3Client = new S3FailoverHelper(
S3ClientFactory.CreateS3Clients(
Constants.S3_SERVICE_URL,
Constants.S3_REGION,
Constants.S3_ACCESS_KEY_ID,
Constants.S3_ACCESS_KEY,
forcePathStyle: true
),
logger: logger
);
if (!string.IsNullOrEmpty(Constants.S3_INTERNAL_SERVICE_URL))
if (!string.IsNullOrEmpty(Constants.S3_INTERNAL_SERVICE_URL) && !string.IsNullOrEmpty(Constants.S3_INTERNAL_BUCKET_NAME))
{
S3InternalClient = new AmazonS3Client(Constants.S3_ACCESS_KEY_ID, Constants.S3_ACCESS_KEY, new AmazonS3Config
{
ServiceURL = Constants.S3_INTERNAL_SERVICE_URL,
AuthenticationRegion = Constants.S3_REGION,
ForcePathStyle = true,
SignatureMethod = SigningAlgorithm.HmacSHA256,
SignatureVersion = "4"
});
S3InternalClient = new S3FailoverHelper(
S3ClientFactory.CreateS3Clients(
Constants.S3_INTERNAL_SERVICE_URL,
Constants.S3_REGION,
Constants.S3_ACCESS_KEY_ID,
Constants.S3_ACCESS_KEY,
forcePathStyle: true
),
logger: logger
);
}
else S3InternalClient = S3Client;
AWSConfigsS3.UseSignatureVersion4 = true;
}
public static AmazonS3Config CreateConfig()
{
return new AmazonS3Config
{
#if (DEBUG || STAGING)
ServiceURL = Servers.S3Server.ToString(),
#else
ServiceURL = Constants.S3_SERVICE_URL,
AuthenticationRegion = Constants.S3_REGION,
#endif
ForcePathStyle = true,
SignatureMethod = SigningAlgorithm.HmacSHA256,
SignatureVersion = "4"
};
}
public async Task DeleteObjectAsync(string userId, string name)
{
var objectName = GetFullObjectName(userId, name) ?? throw new Exception("Invalid object name.");
var response = await GetS3Client(S3ClientMode.INTERNAL).DeleteObjectAsync(GetBucketName(S3ClientMode.INTERNAL), objectName);
var response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.DeleteObjectAsync(INTERNAL_BUCKET_NAME, objectName), operationName: "DeleteObject", isWriteOperation: true);
if (!IsSuccessStatusCode(((int)response.HttpStatusCode)))
if (!IsSuccessStatusCode((int)response.HttpStatusCode))
throw new Exception("Could not delete object.");
}
@@ -118,7 +110,7 @@ namespace Notesnook.API.Services
{
var request = new ListObjectsV2Request
{
BucketName = GetBucketName(S3ClientMode.INTERNAL),
BucketName = INTERNAL_BUCKET_NAME,
Prefix = userId,
};
@@ -126,7 +118,7 @@ namespace Notesnook.API.Services
var keys = new List<KeyVersion>();
do
{
response = await GetS3Client(S3ClientMode.INTERNAL).ListObjectsV2Async(request);
response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.ListObjectsV2Async(request), operationName: "ListObjectsV2");
response.S3Objects.ForEach(obj => keys.Add(new KeyVersion
{
Key = obj.Key,
@@ -138,12 +130,11 @@ namespace Notesnook.API.Services
if (keys.Count <= 0) return;
var deleteObjectsResponse = await GetS3Client(S3ClientMode.INTERNAL)
.DeleteObjectsAsync(new DeleteObjectsRequest
var deleteObjectsResponse = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.DeleteObjectsAsync(new DeleteObjectsRequest
{
BucketName = GetBucketName(S3ClientMode.INTERNAL),
BucketName = INTERNAL_BUCKET_NAME,
Objects = keys,
});
}), operationName: "DeleteObjects", isWriteOperation: true);
if (!IsSuccessStatusCode((int)deleteObjectsResponse.HttpStatusCode))
throw new Exception("Could not delete directory.");
@@ -151,7 +142,7 @@ namespace Notesnook.API.Services
public async Task<long> GetObjectSizeAsync(string userId, string name)
{
var url = this.GetPresignedURL(userId, name, HttpVerb.HEAD, S3ClientMode.INTERNAL);
var url = await this.GetPresignedURLAsync(userId, name, HttpVerb.HEAD, S3ClientMode.INTERNAL);
if (url == null) return 0;
var request = new HttpRequestMessage(HttpMethod.Head, url);
@@ -160,17 +151,17 @@ namespace Notesnook.API.Services
}
public string? GetUploadObjectUrl(string userId, string name)
public async Task<string?> GetUploadObjectUrlAsync(string userId, string name)
{
return this.GetPresignedURL(userId, name, HttpVerb.PUT);
return await this.GetPresignedURLAsync(userId, name, HttpVerb.PUT);
}
public string? GetInternalUploadObjectUrl(string userId, string name)
public async Task<string?> GetInternalUploadObjectUrlAsync(string userId, string name)
{
return this.GetPresignedURL(userId, name, HttpVerb.PUT, S3ClientMode.INTERNAL);
return await this.GetPresignedURLAsync(userId, name, HttpVerb.PUT, S3ClientMode.INTERNAL);
}
public async Task<string?> GetDownloadObjectUrl(string userId, string name)
public async Task<string?> GetDownloadObjectUrlAsync(string userId, string name)
{
// var subscriptionService = await WampServers.SubscriptionServer.GetServiceAsync<IUserSubscriptionService>(SubscriptionServerTopics.UserSubscriptionServiceTopic);
// var subscription = await subscriptionService.GetUserSubscriptionAsync(Clients.Notesnook.Id, userId);
@@ -182,7 +173,7 @@ namespace Notesnook.API.Services
// throw new Exception($"You cannot download files larger than {StorageHelper.FormatBytes(fileSizeLimit)} on this plan.");
// }
var url = this.GetPresignedURL(userId, name, HttpVerb.GET);
var url = await this.GetPresignedURLAsync(userId, name, HttpVerb.GET);
if (url == null) return null;
return url;
}
@@ -194,8 +185,8 @@ namespace Notesnook.API.Services
if (string.IsNullOrEmpty(uploadId))
{
var response = await GetS3Client(S3ClientMode.INTERNAL).InitiateMultipartUploadAsync(GetBucketName(S3ClientMode.INTERNAL), objectName);
if (!IsSuccessStatusCode(((int)response.HttpStatusCode))) throw new Exception("Failed to initiate multipart upload.");
var response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.InitiateMultipartUploadAsync(INTERNAL_BUCKET_NAME, objectName), operationName: "InitiateMultipartUpload", isWriteOperation: true);
if (!IsSuccessStatusCode((int)response.HttpStatusCode)) throw new Exception("Failed to initiate multipart upload.");
uploadId = response.UploadId;
}
@@ -203,7 +194,7 @@ namespace Notesnook.API.Services
var signedUrls = new string[parts];
for (var i = 0; i < parts; ++i)
{
signedUrls[i] = GetPresignedURLForUploadPart(objectName, uploadId, i + 1);
signedUrls[i] = await GetPresignedURLForUploadPartAsync(objectName, uploadId, i + 1);
}
return new MultipartUploadMeta
@@ -218,14 +209,14 @@ namespace Notesnook.API.Services
var objectName = GetFullObjectName(userId, name);
if (userId == null || objectName == null) throw new Exception("Could not abort multipart upload.");
var response = await GetS3Client(S3ClientMode.INTERNAL).AbortMultipartUploadAsync(GetBucketName(S3ClientMode.INTERNAL), objectName, uploadId);
var response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.AbortMultipartUploadAsync(INTERNAL_BUCKET_NAME, objectName, uploadId), operationName: "AbortMultipartUpload", isWriteOperation: true);
if (!IsSuccessStatusCode(((int)response.HttpStatusCode))) throw new Exception("Failed to abort multipart upload.");
}
private async Task<long> GetMultipartUploadSizeAsync(string userId, string key, string uploadId)
{
var objectName = GetFullObjectName(userId, key);
var parts = await GetS3Client(S3ClientMode.INTERNAL).ListPartsAsync(GetBucketName(S3ClientMode.INTERNAL), objectName, uploadId);
var parts = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.ListPartsAsync(INTERNAL_BUCKET_NAME, objectName, uploadId), operationName: "ListParts");
long totalSize = 0;
foreach (var part in parts.Parts)
{
@@ -268,9 +259,9 @@ namespace Notesnook.API.Services
}
uploadRequest.Key = objectName;
uploadRequest.BucketName = GetBucketName(S3ClientMode.INTERNAL);
var response = await GetS3Client(S3ClientMode.INTERNAL).CompleteMultipartUploadAsync(uploadRequest);
if (!IsSuccessStatusCode(((int)response.HttpStatusCode))) throw new Exception("Failed to complete multipart upload.");
uploadRequest.BucketName = INTERNAL_BUCKET_NAME;
var response = await S3InternalClient.ExecuteWithFailoverAsync((client) => client.CompleteMultipartUploadAsync(uploadRequest), operationName: "CompleteMultipartUpload", isWriteOperation: true);
if (!IsSuccessStatusCode((int)response.HttpStatusCode)) throw new Exception("Failed to complete multipart upload.");
if (!Constants.IS_SELF_HOSTED)
{
@@ -282,34 +273,41 @@ namespace Notesnook.API.Services
}
}
private string? GetPresignedURL(string userId, string name, HttpVerb httpVerb, S3ClientMode mode = S3ClientMode.EXTERNAL)
private async Task<string?> GetPresignedURLAsync(string userId, string name, HttpVerb httpVerb, S3ClientMode mode = S3ClientMode.EXTERNAL)
{
var objectName = GetFullObjectName(userId, name);
if (userId == null || objectName == null) return null;
var client = GetS3Client(mode);
var request = new GetPreSignedUrlRequest
var client = mode == S3ClientMode.INTERNAL ? S3InternalClient : S3Client;
var bucketName = mode == S3ClientMode.INTERNAL ? INTERNAL_BUCKET_NAME : BUCKET_NAME;
return await client.ExecuteWithFailoverAsync(client =>
{
BucketName = GetBucketName(mode),
Expires = System.DateTime.Now.AddHours(1),
Verb = httpVerb,
Key = objectName,
var request = new GetPreSignedUrlRequest
{
BucketName = bucketName,
Expires = System.DateTime.Now.AddHours(1),
Verb = httpVerb,
Key = objectName,
#if (DEBUG || STAGING)
Protocol = Protocol.HTTP,
Protocol = Protocol.HTTP,
#else
Protocol = client.Config.ServiceURL.StartsWith("http://") ? Protocol.HTTP : Protocol.HTTPS,
Protocol = client.Config.ServiceURL.StartsWith("http://") ? Protocol.HTTP : Protocol.HTTPS,
#endif
};
return client.GetPreSignedURL(request);
};
return client.GetPreSignedURLAsync(request);
}, operationName: "GetPreSignedURL");
}
private string GetPresignedURLForUploadPart(string objectName, string uploadId, int partNumber, S3ClientMode mode = S3ClientMode.EXTERNAL)
private Task<string> GetPresignedURLForUploadPartAsync(string objectName, string uploadId, int partNumber, S3ClientMode mode = S3ClientMode.EXTERNAL)
{
var client = GetS3Client(mode);
return client.GetPreSignedURL(new GetPreSignedUrlRequest
var client = mode == S3ClientMode.INTERNAL ? S3InternalClient : S3Client;
var bucketName = mode == S3ClientMode.INTERNAL ? INTERNAL_BUCKET_NAME : BUCKET_NAME;
return client.ExecuteWithFailoverAsync(c => c.GetPreSignedURLAsync(new GetPreSignedUrlRequest
{
BucketName = GetBucketName(mode),
BucketName = bucketName,
Expires = System.DateTime.Now.AddHours(1),
Verb = HttpVerb.PUT,
Key = objectName,
@@ -318,9 +316,9 @@ namespace Notesnook.API.Services
#if (DEBUG || STAGING)
Protocol = Protocol.HTTP,
#else
Protocol = client.Config.ServiceURL.StartsWith("http://") ? Protocol.HTTP : Protocol.HTTPS,
Protocol = c.Config.ServiceURL.StartsWith("http://") ? Protocol.HTTP : Protocol.HTTPS,
#endif
});
}), operationName: "GetPreSignedURL");
}
private static string? GetFullObjectName(string userId, string name)
@@ -333,17 +331,5 @@ namespace Notesnook.API.Services
{
return ((int)statusCode >= 200) && ((int)statusCode <= 299);
}
AmazonS3Client GetS3Client(S3ClientMode mode = S3ClientMode.EXTERNAL)
{
if (mode == S3ClientMode.INTERNAL && S3InternalClient != null) return S3InternalClient;
return S3Client;
}
string GetBucketName(S3ClientMode mode = S3ClientMode.EXTERNAL)
{
if (mode == S3ClientMode.INTERNAL && S3InternalClient != null) return INTERNAL_BUCKET_NAME;
return BUCKET_NAME;
}
}
}