api: remove legacy sync hub

This commit is contained in:
Abdullah Atta
2025-10-14 21:16:59 +05:00
parent 50f159a37b
commit 8db33889b6
9 changed files with 40 additions and 613 deletions

View File

@@ -7,19 +7,17 @@ public sealed class SyncEventCounterSource : EventSource
{
public static readonly SyncEventCounterSource Log = new();
private Meter meter = new("Notesnook.API.Metrics.Sync", "1.0.0");
private Counter<int> fetchCounter;
private Counter<int> pushCounter;
private Counter<int> legacyFetchCounter;
private Counter<int> pushV2Counter;
private Counter<int> fetchV2Counter;
private Histogram<long> fetchV2Duration;
private Histogram<long> pushV2Duration;
private readonly Meter meter = new("Notesnook.API.Metrics.Sync", "1.0.0");
private readonly Counter<int> fetchCounter;
private readonly Counter<int> pushCounter;
private readonly Counter<int> pushV2Counter;
private readonly Counter<int> fetchV2Counter;
private readonly Histogram<long> fetchV2Duration;
private readonly Histogram<long> pushV2Duration;
private SyncEventCounterSource()
{
fetchCounter = meter.CreateCounter<int>("sync.fetches", "fetches", "Total fetches");
pushCounter = meter.CreateCounter<int>("sync.pushes", "pushes", "Total pushes");
legacyFetchCounter = meter.CreateCounter<int>("sync.legacy-fetches", "fetches", "Total legacy fetches");
fetchV2Counter = meter.CreateCounter<int>("sync.v2.fetches", "fetches", "Total v2 fetches");
pushV2Counter = meter.CreateCounter<int>("sync.v2.pushes", "pushes", "Total v2 pushes");
fetchV2Duration = meter.CreateHistogram<long>("sync.v2.fetch_duration");
@@ -27,7 +25,6 @@ public sealed class SyncEventCounterSource : EventSource
}
public void Fetch() => fetchCounter.Add(1);
public void LegacyFetch() => legacyFetchCounter.Add(1);
public void FetchV2() => fetchV2Counter.Add(1);
public void PushV2() => pushV2Counter.Add(1);
public void Push() => pushCounter.Add(1);
@@ -36,14 +33,7 @@ public sealed class SyncEventCounterSource : EventSource
protected override void Dispose(bool disposing)
{
legacyFetchCounter = null;
fetchV2Counter = null;
pushV2Counter = null;
pushCounter = null;
fetchCounter = null;
meter.Dispose();
meter = null;
base.Dispose(disposing);
}
}

View File

@@ -1,473 +0,0 @@
/*
This file is part of the Notesnook Sync Server project (https://notesnook.com/)
Copyright (C) 2023 Streetwriters (Private) Limited
This program is free software: you can redistribute it and/or modify
it under the terms of the Affero GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Affero GNU General Public License for more details.
You should have received a copy of the Affero GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Security.Claims;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.SignalR;
using MongoDB.Driver;
using Notesnook.API.Authorization;
using Notesnook.API.Interfaces;
using Notesnook.API.Models;
using Notesnook.API.Repositories;
using Streetwriters.Common.Models;
using Streetwriters.Data.Interfaces;
namespace Notesnook.API.Hubs
{
public struct RunningPush
{
public long Timestamp { get; set; }
public long Validity { get; set; }
public string ConnectionId { get; set; }
}
public interface ISyncHubClient
{
Task PushItems(SyncTransferItemV2 transferItem);
Task<bool> SendItems(SyncTransferItemV2 transferItem);
Task PushCompleted(long lastSynced);
}
public class GlobalSync
{
private const long PUSH_VALIDITY_EXTENSION_PERIOD = 16 * 1000; // 16 second
private const int PUSH_VALIDITY_PERIOD_PER_ITEM = 5 * 100; // 0.5 second
private const long BASE_PUSH_VALIDITY_PERIOD = 5 * 1000; // 5 seconds
private const long BASE_PUSH_VALIDITY_PERIOD_NEW = 16 * 1000; // 16 seconds
private readonly static Dictionary<string, List<RunningPush>> PushOperations = new();
public static void ClearPushOperations(string userId, string connectionId)
{
if (PushOperations.TryGetValue(userId, out List<RunningPush> operations))
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
foreach (var push in operations.ToArray())
if (push.ConnectionId == connectionId || !IsPushValid(push, now))
operations.Remove(push);
}
}
public static bool IsPushing(string userId, string connectionId)
{
if (PushOperations.TryGetValue(userId, out List<RunningPush> operations))
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
foreach (var push in operations)
if (push.ConnectionId == connectionId && IsPushValid(push, now)) return true;
}
return false;
}
public static bool IsUserPushing(string userId)
{
var count = 0;
if (PushOperations.TryGetValue(userId, out List<RunningPush> operations))
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
foreach (var push in operations)
if (IsPushValid(push, now)) ++count;
}
return count > 0;
}
public static void StartPush(string userId, string connectionId, long? totalItems = null)
{
if (IsPushing(userId, connectionId)) return;
if (!PushOperations.ContainsKey(userId))
PushOperations[userId] = new List<RunningPush>();
PushOperations[userId].Add(new RunningPush
{
ConnectionId = connectionId,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
Validity = totalItems.HasValue ? BASE_PUSH_VALIDITY_PERIOD + (totalItems.Value * PUSH_VALIDITY_PERIOD_PER_ITEM) : BASE_PUSH_VALIDITY_PERIOD_NEW
});
}
public static void ExtendPush(string userId, string connectionId)
{
if (!IsPushing(userId, connectionId) || !PushOperations.ContainsKey(userId))
{
StartPush(userId, connectionId);
return;
}
var index = PushOperations[userId].FindIndex((push) => push.ConnectionId == connectionId);
if (index < 0)
{
StartPush(userId, connectionId);
return;
}
var pushOperation = PushOperations[userId][index];
pushOperation.Validity += PUSH_VALIDITY_EXTENSION_PERIOD;
}
private static bool IsPushValid(RunningPush push, long now)
{
return now < push.Timestamp + push.Validity;
}
}
[Authorize("Sync")]
public class SyncHub : Hub<ISyncHubClient>
{
private ISyncItemsRepositoryAccessor Repositories { get; }
private readonly IUnitOfWork unit;
private readonly string[] CollectionKeys = new[] {
"settings",
"attachment",
"note",
"notebook",
"content",
"shortcut",
"reminder",
"relation", // relations must sync at the end to prevent invalid state
};
public SyncHub(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor, IUnitOfWork unitOfWork)
{
Repositories = syncItemsRepositoryAccessor;
unit = unitOfWork;
}
public override async Task OnConnectedAsync()
{
var result = new SyncRequirement().IsAuthorized(Context.User, new PathString("/hubs/sync"));
if (!result.Succeeded)
{
var reason = result.AuthorizationFailure.FailureReasons.FirstOrDefault();
throw new HubException(reason?.Message ?? "Unauthorized");
}
var id = Context.User.FindFirstValue("sub");
await Groups.AddToGroupAsync(Context.ConnectionId, id);
await base.OnConnectedAsync();
}
public override async Task OnDisconnectedAsync(Exception exception)
{
try
{
await base.OnDisconnectedAsync(exception);
}
finally
{
var id = Context.User.FindFirstValue("sub");
GlobalSync.ClearPushOperations(id, Context.ConnectionId);
}
}
private Action<SyncItem, string, long> MapTypeToUpsertAction(string type)
{
return type switch
{
"attachment" => Repositories.Attachments.Upsert,
"note" => Repositories.Notes.Upsert,
"notebook" => Repositories.Notebooks.Upsert,
"content" => Repositories.Contents.Upsert,
"shortcut" => Repositories.Shortcuts.Upsert,
"reminder" => Repositories.Reminders.Upsert,
"relation" => Repositories.Relations.Upsert,
_ => null,
};
}
public async Task<long> InitializePush(SyncMetadata syncMetadata)
{
if (syncMetadata.LastSynced <= 0) throw new HubException("Last synced time cannot be zero or less than zero.");
var userId = Context.User.FindFirstValue("sub");
if (string.IsNullOrEmpty(userId)) return 0;
UserSettings userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
long dateSynced = Math.Max(syncMetadata.LastSynced, userSettings.LastSynced);
GlobalSync.StartPush(userId, Context.ConnectionId);
if (
(userSettings.VaultKey != null &&
syncMetadata.VaultKey != null &&
!userSettings.VaultKey.Equals(syncMetadata.VaultKey) &&
!syncMetadata.VaultKey.IsEmpty()) ||
(userSettings.VaultKey == null &&
syncMetadata.VaultKey != null &&
!syncMetadata.VaultKey.IsEmpty()))
{
userSettings.VaultKey = syncMetadata.VaultKey;
await Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId);
}
return dateSynced;
}
public async Task<int> PushItems(SyncTransferItemV2 pushItem, long dateSynced)
{
var userId = Context.User.FindFirstValue("sub");
if (string.IsNullOrEmpty(userId)) return 0;
SyncEventCounterSource.Log.Push();
try
{
var others = Clients.OthersInGroup(userId);
others.PushItems(pushItem);
GlobalSync.ExtendPush(userId, Context.ConnectionId);
if (pushItem.Type == "settings")
{
var settings = pushItem.Items.First();
if (settings == null) return 0;
settings.Id = MongoDB.Bson.ObjectId.Parse(userId);
settings.ItemId = userId;
Repositories.LegacySettings.Upsert(settings, userId, dateSynced);
}
else
{
var UpsertItem = MapTypeToUpsertAction(pushItem.Type) ?? throw new Exception("Invalid item type.");
foreach (var item in pushItem.Items)
{
UpsertItem(item, userId, dateSynced);
}
}
return await unit.Commit() ? 1 : 0;
}
catch (Exception ex)
{
GlobalSync.ClearPushOperations(userId, Context.ConnectionId);
throw ex;
}
}
public async Task<bool> SyncCompleted(long dateSynced)
{
var userId = Context.User.FindFirstValue("sub");
try
{
UserSettings userSettings = await this.Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
long lastSynced = Math.Max(dateSynced, userSettings.LastSynced);
userSettings.LastSynced = lastSynced;
await this.Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId);
await Clients.OthersInGroup(userId).PushCompleted(lastSynced);
return true;
}
finally
{
GlobalSync.ClearPushOperations(userId, Context.ConnectionId);
}
}
private static async IAsyncEnumerable<SyncTransferItemV2> PrepareChunks(Func<string, long, int, Task<IAsyncCursor<SyncItem>>>[] collections, string[] types, string userId, long lastSyncedTimestamp, int size, long maxBytes, int skipChunks)
{
var chunksProcessed = 0;
for (int i = 0; i < collections.Length; i++)
{
var type = types[i];
using var cursor = await collections[i](userId, lastSyncedTimestamp, size);
var chunk = new List<SyncItem>();
long totalBytes = 0;
long METADATA_BYTES = 5 * 1024;
while (await cursor.MoveNextAsync())
{
if (chunksProcessed++ < skipChunks) continue;
foreach (var item in cursor.Current)
{
chunk.Add(item);
totalBytes += item.Length + METADATA_BYTES;
if (totalBytes >= maxBytes)
{
yield return new SyncTransferItemV2
{
Items = chunk,
Type = type,
Count = chunksProcessed
};
totalBytes = 0;
chunk.Clear();
}
}
}
if (chunk.Count > 0)
{
if (chunksProcessed++ < skipChunks) continue;
yield return new SyncTransferItemV2
{
Items = chunk,
Type = type,
Count = chunksProcessed
};
}
}
}
public Task<SyncMetadata> RequestFetch(long lastSyncedTimestamp)
{
return RequestResumableFetch(lastSyncedTimestamp);
}
public async Task<SyncMetadata> RequestResumableFetch(long lastSyncedTimestamp, int cursor = 0)
{
var userId = Context.User.FindFirstValue("sub");
if (GlobalSync.IsUserPushing(userId))
{
throw new HubException("Cannot fetch data while another sync is in progress. Please try again later.");
}
SyncEventCounterSource.Log.Fetch();
var userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
if (userSettings.LastSynced > 0 && lastSyncedTimestamp > userSettings.LastSynced)
{
throw new HubException($"Provided timestamp value is too large. Server timestamp: {userSettings.LastSynced} Sent timestamp: {lastSyncedTimestamp}. Please run a Force Sync to fix this issue.");
}
// var client = Clients.Caller;
if (lastSyncedTimestamp > 0 && userSettings.LastSynced == lastSyncedTimestamp)
{
return new SyncMetadata
{
LastSynced = userSettings.LastSynced,
};
}
var isResumable = lastSyncedTimestamp == 0;
if (!isResumable) cursor = 0;
var chunks = PrepareChunks(
collections: new[] {
Repositories.LegacySettings.FindItemsSyncedAfter,
Repositories.Attachments.FindItemsSyncedAfter,
Repositories.Notes.FindItemsSyncedAfter,
Repositories.Notebooks.FindItemsSyncedAfter,
Repositories.Contents.FindItemsSyncedAfter,
Repositories.Shortcuts.FindItemsSyncedAfter,
Repositories.Reminders.FindItemsSyncedAfter,
Repositories.Relations.FindItemsSyncedAfter,
},
types: CollectionKeys,
userId,
lastSyncedTimestamp,
size: 1000,
maxBytes: 7 * 1024 * 1024,
skipChunks: cursor
);
await foreach (var chunk in chunks)
{
_ = await Clients.Caller.SendItems(chunk).WaitAsync(TimeSpan.FromMinutes(10));
}
return new SyncMetadata
{
VaultKey = userSettings.VaultKey,
LastSynced = userSettings.LastSynced,
};
}
}
[MessagePack.MessagePackObject]
public struct BatchedSyncTransferItem
{
[MessagePack.Key("lastSynced")]
public long LastSynced { get; set; }
[MessagePack.Key("items")]
public string[] Items { get; set; }
[MessagePack.Key("types")]
public string[] Types { get; set; }
[MessagePack.Key("total")]
public int Total { get; set; }
[MessagePack.Key("current")]
public int Current { get; set; }
}
[MessagePack.MessagePackObject]
public struct SyncTransferItem
{
[MessagePack.Key("synced")]
public bool Synced { get; set; }
[MessagePack.Key("lastSynced")]
public long LastSynced { get; set; }
[MessagePack.Key("item")]
public string Item { get; set; }
[MessagePack.Key("itemType")]
public string ItemType { get; set; }
[MessagePack.Key("total")]
public int Total { get; set; }
[MessagePack.Key("current")]
public int Current { get; set; }
}
[MessagePack.MessagePackObject]
public struct SyncTransferItemV2
{
[MessagePack.Key("items")]
[JsonPropertyName("items")]
public IEnumerable<SyncItem> Items { get; set; }
[MessagePack.Key("type")]
[JsonPropertyName("type")]
public string Type { get; set; }
[MessagePack.Key("count")]
[JsonPropertyName("count")]
public int Count { get; set; }
}
[MessagePack.MessagePackObject]
public struct SyncMetadata
{
[MessagePack.Key("vaultKey")]
[JsonPropertyName("vaultKey")]
public EncryptedData VaultKey { get; set; }
[MessagePack.Key("lastSynced")]
[JsonPropertyName("lastSynced")]
public long LastSynced { get; set; }
// [MessagePack.Key("total")]
// public long TotalItems { get; set; }
}
}

View File

@@ -325,4 +325,19 @@ namespace Notesnook.API.Hubs
[JsonPropertyName("synced")]
public bool Synced { get; set; }
}
[MessagePack.MessagePackObject]
public struct SyncTransferItemV2
{
[MessagePack.Key("items")]
[JsonPropertyName("items")]
public IEnumerable<SyncItem> Items { get; set; }
[MessagePack.Key("type")]
[JsonPropertyName("type")]
public string Type { get; set; }
[MessagePack.Key("count")]
[JsonPropertyName("count")]
public int Count { get; set; }
}
}

View File

@@ -1,29 +0,0 @@
/*
This file is part of the Notesnook Sync Server project (https://notesnook.com/)
Copyright (C) 2023 Streetwriters (Private) Limited
This program is free software: you can redistribute it and/or modify
it under the terms of the Affero GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Affero GNU General Public License for more details.
You should have received a copy of the Affero GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
namespace Notesnook.API.Interfaces
{
public interface IEncrypted
{
string Cipher { get; set; }
string IV { get; set; }
long Length { get; set; }
string Salt { get; set; }
}
}

View File

@@ -1,33 +0,0 @@
/*
This file is part of the Notesnook Sync Server project (https://notesnook.com/)
Copyright (C) 2023 Streetwriters (Private) Limited
This program is free software: you can redistribute it and/or modify
it under the terms of the Affero GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Affero GNU General Public License for more details.
You should have received a copy of the Affero GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
using Notesnook.API.Models;
using Streetwriters.Common.Interfaces;
namespace Notesnook.API.Interfaces
{
public interface IMonograph : IDocument
{
string Title { get; set; }
string UserId { get; set; }
byte[] CompressedContent { get; set; }
EncryptedData EncryptedContent { get; set; }
long DatePublished { get; set; }
}
}

View File

@@ -27,7 +27,7 @@ namespace Notesnook.API.Interfaces
{
Task CreateUserAsync();
Task DeleteUserAsync(string userId);
Task DeleteUserAsync(string userId, string jti, string password);
Task DeleteUserAsync(string userId, string? jti, string password);
Task<bool> ResetUserAsync(string userId, bool removeAttachments);
Task<UserResponse> GetUserAsync(string userId);
Task SetUserKeysAsync(string userId, UserKeys keys);

View File

@@ -1,42 +0,0 @@
/*
This file is part of the Notesnook Sync Server project (https://notesnook.com/)
Copyright (C) 2023 Streetwriters (Private) Limited
This program is free software: you can redistribute it and/or modify
it under the terms of the Affero GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Affero GNU General Public License for more details.
You should have received a copy of the Affero GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
using Notesnook.API.Models;
using Streetwriters.Common.Interfaces;
using Streetwriters.Common.Models;
namespace Notesnook.API.Interfaces
{
public interface IUserSettings : IDocument
{
string UserId { get; set; }
long LastSynced
{
get; set;
}
EncryptedData VaultKey
{
get; set;
}
string Salt { get; set; }
}
}

View File

@@ -59,8 +59,8 @@ namespace Notesnook.API.Services
// URLs generated by S3 are host specific. Changing their hostname on the fly causes
// SignatureDoesNotMatch error.
// That is why we create 2 separate S3 clients. One for internal traffic and one for external.
private AmazonS3Client S3InternalClient { get; }
private HttpClient httpClient = new HttpClient();
private AmazonS3Client? S3InternalClient { get; }
private readonly HttpClient httpClient = new();
public S3Service(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor)
{
@@ -100,8 +100,7 @@ namespace Notesnook.API.Services
public async Task DeleteObjectAsync(string userId, string name)
{
var objectName = GetFullObjectName(userId, name);
if (objectName == null) throw new Exception("Invalid object name."); ;
var objectName = GetFullObjectName(userId, name) ?? throw new Exception("Invalid object name.");
var response = await GetS3Client(S3ClientMode.INTERNAL).DeleteObjectAsync(GetBucketName(S3ClientMode.INTERNAL), objectName);
@@ -240,6 +239,7 @@ namespace Notesnook.API.Services
await this.AbortMultipartUploadAsync(userId, uploadRequest.Key, uploadRequest.UploadId);
throw new Exception("User settings not found.");
}
userSettings.StorageLimit ??= new Limit { Value = 0, UpdatedAt = 0 };
if (!Constants.IS_SELF_HOSTED)
{
@@ -253,9 +253,8 @@ namespace Notesnook.API.Services
throw new Exception("Max file size exceeded.");
}
userSettings.StorageLimit ??= new Limit { Value = 0, UpdatedAt = 0 };
userSettings.StorageLimit.Value += fileSize;
if (StorageHelper.IsStorageLimitReached(subscription, userSettings.StorageLimit))
if (StorageHelper.IsStorageLimitReached(subscription, userSettings.StorageLimit.Value))
{
await this.AbortMultipartUploadAsync(userId, uploadRequest.Key, uploadRequest.UploadId);
throw new Exception("Storage limit reached.");
@@ -315,13 +314,13 @@ namespace Notesnook.API.Services
});
}
private string GetFullObjectName(string userId, string name)
private static string? GetFullObjectName(string userId, string name)
{
if (userId == null || !Regex.IsMatch(name, "[0-9a-zA-Z!" + Regex.Escape("-") + "_.*'()]")) return null;
return $"{userId}/{name}";
}
bool IsSuccessStatusCode(int statusCode)
static bool IsSuccessStatusCode(int statusCode)
{
return ((int)statusCode >= 200) && ((int)statusCode <= 299);
}