diff --git a/Notesnook.API/Hubs/SyncHub.cs b/Notesnook.API/Hubs/SyncHub.cs index 9241932..55a6ff2 100644 --- a/Notesnook.API/Hubs/SyncHub.cs +++ b/Notesnook.API/Hubs/SyncHub.cs @@ -23,14 +23,17 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Security.Claims; using System.Text.Json; +using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.SignalR; +using MongoDB.Driver; using Notesnook.API.Authorization; using Notesnook.API.Interfaces; using Notesnook.API.Models; +using Notesnook.API.Repositories; using Streetwriters.Common.Models; using Streetwriters.Data.Interfaces; @@ -39,20 +42,25 @@ namespace Notesnook.API.Hubs public struct RunningPush { public long Timestamp { get; set; } - public int Validity { get; set; } + public long Validity { get; set; } public string ConnectionId { get; set; } } public interface ISyncHubClient { Task SyncItem(SyncTransferItem transferItem); + Task PushItems(SyncTransferItemV2 transferItem); + Task SendItems(SyncTransferItemV2 transferItem); Task RemoteSyncCompleted(long lastSynced); + Task PushCompleted(long lastSynced); Task SyncCompleted(); } public class GlobalSync { + private const long PUSH_VALIDITY_EXTENSION_PERIOD = 16 * 1000; // 16 second private const int PUSH_VALIDITY_PERIOD_PER_ITEM = 5 * 100; // 0.5 second - private const int BASE_PUSH_VALIDITY_PERIOD = 5 * 1000; // 5 seconds + private const long BASE_PUSH_VALIDITY_PERIOD = 5 * 1000; // 5 seconds + private const long BASE_PUSH_VALIDITY_PERIOD_NEW = 16 * 1000; // 16 seconds private readonly static Dictionary> PushOperations = new(); public static void ClearPushOperations(string userId, string connectionId) @@ -90,7 +98,7 @@ namespace Notesnook.API.Hubs return count > 0; } - public static void StartPush(string userId, string connectionId, int totalItems) + public static void StartPush(string userId, string connectionId, long? totalItems = null) { if (IsPushing(userId, connectionId)) return; @@ -101,10 +109,27 @@ namespace Notesnook.API.Hubs { ConnectionId = connectionId, Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), - Validity = BASE_PUSH_VALIDITY_PERIOD + totalItems * PUSH_VALIDITY_PERIOD_PER_ITEM + Validity = totalItems.HasValue ? BASE_PUSH_VALIDITY_PERIOD + (totalItems.Value * PUSH_VALIDITY_PERIOD_PER_ITEM) : BASE_PUSH_VALIDITY_PERIOD_NEW }); } + public static void ExtendPush(string userId, string connectionId) + { + if (!IsPushing(userId, connectionId) || !PushOperations.ContainsKey(userId)) + { + StartPush(userId, connectionId); + return; + } + var index = PushOperations[userId].FindIndex((push) => push.ConnectionId == connectionId); + if (index < 0) + { + StartPush(userId, connectionId); + return; + } + + var pushOperation = PushOperations[userId][index]; + pushOperation.Validity += PUSH_VALIDITY_EXTENSION_PERIOD; + } private static bool IsPushValid(RunningPush push, long now) { return now < push.Timestamp + push.Validity; @@ -116,6 +141,18 @@ namespace Notesnook.API.Hubs { private ISyncItemsRepositoryAccessor Repositories { get; } private readonly IUnitOfWork unit; + private readonly string[] CollectionKeys = new[] { + "settings", + "attachment", + "note", + "notebook", + "content", + "shortcut", + "reminder", + "color", + "tag", + "relation", // relations must sync at the end to prevent invalid state + }; public SyncHub(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor, IUnitOfWork unitOfWork) { @@ -203,6 +240,16 @@ namespace Notesnook.API.Hubs case "relation": Repositories.Relations.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); break; + case "color": + var color = JsonSerializer.Deserialize(data); + if (color.Version < 5.9) continue; + Repositories.Colors.Upsert(color, userId, dateSynced); + break; + case "tag": + var tag = JsonSerializer.Deserialize(data); + if (tag.Version < 5.9) continue; + Repositories.Tags.Upsert(tag, userId, dateSynced); + break; case "settings": var settings = JsonSerializer.Deserialize(data); settings.Id = MongoDB.Bson.ObjectId.Parse(userId); @@ -228,6 +275,83 @@ namespace Notesnook.API.Hubs } } + private Action MapTypeToUpsertAction(string type) + { + return type switch + { + "attachment" => Repositories.Attachments.Upsert, + "note" => Repositories.Notes.Upsert, + "notebook" => Repositories.Notebooks.Upsert, + "content" => Repositories.Contents.Upsert, + "shortcut" => Repositories.Shortcuts.Upsert, + "reminder" => Repositories.Reminders.Upsert, + "relation" => Repositories.Relations.Upsert, + "color" => Repositories.Colors.Upsert, + "tag" => Repositories.Tags.Upsert, + _ => null, + }; + } + + public async Task InitializePush(SyncMetadata syncMetadata) + { + if (syncMetadata.LastSynced <= 0) throw new HubException("Last synced time cannot be zero or less than zero."); + + var userId = Context.User.FindFirstValue("sub"); + if (string.IsNullOrEmpty(userId)) return 0; + + UserSettings userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId); + long dateSynced = Math.Max(syncMetadata.LastSynced, userSettings.LastSynced); + + GlobalSync.StartPush(userId, Context.ConnectionId); + + + if (userSettings.VaultKey != null && syncMetadata.VaultKey != null && !userSettings.VaultKey.Equals(syncMetadata.VaultKey) || (userSettings.VaultKey == null && syncMetadata.VaultKey != null)) + { + userSettings.VaultKey = syncMetadata.VaultKey; + await Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId); + } + + return dateSynced; + } + + public async Task PushItems(SyncTransferItemV2 pushItem, long dateSynced) + { + var userId = Context.User.FindFirstValue("sub"); + if (string.IsNullOrEmpty(userId)) return 0; + + try + { + var others = Clients.OthersInGroup(userId); + others.PushItems(pushItem); + + GlobalSync.ExtendPush(userId, Context.ConnectionId); + + if (pushItem.Type == "settings") + { + var settings = pushItem.Items.First(); + if (settings == null) return 0; + settings.Id = MongoDB.Bson.ObjectId.Parse(userId); + settings.ItemId = userId; + Repositories.Settings.Upsert(settings, userId, dateSynced); + } + else + { + var UpsertItem = MapTypeToUpsertAction(pushItem.Type) ?? throw new Exception("Invalid item type."); + foreach (var item in pushItem.Items) + { + UpsertItem(item, userId, dateSynced); + } + } + + return await unit.Commit() ? 1 : 0; + } + catch (Exception ex) + { + GlobalSync.ClearPushOperations(userId, Context.ConnectionId); + throw ex; + } + } + public async Task SyncCompleted(long dateSynced) { var userId = Context.User.FindFirstValue("sub"); @@ -242,6 +366,7 @@ namespace Notesnook.API.Hubs await this.Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId); await Clients.OthersInGroup(userId).RemoteSyncCompleted(lastSynced); + await Clients.OthersInGroup(userId).PushCompleted(lastSynced); return true; } @@ -251,8 +376,59 @@ namespace Notesnook.API.Hubs } } - public async IAsyncEnumerable FetchItems(long lastSyncedTimestamp, [EnumeratorCancellation] - CancellationToken cancellationToken) + private static async IAsyncEnumerable PrepareChunks(Func>>[] collections, string[] types, string userId, long lastSyncedTimestamp, int size, long maxBytes, int skipChunks) + { + var chunksProcessed = 0; + for (int i = 0; i < collections.Length; i++) + { + var type = types[i]; + + using var cursor = await collections[i](userId, lastSyncedTimestamp, size); + + var chunk = new List(); + long totalBytes = 0; + long METADATA_BYTES = 5 * 1024; + + while (await cursor.MoveNextAsync()) + { + if (chunksProcessed++ < skipChunks) continue; + foreach (var item in cursor.Current) + { + chunk.Add(item); + totalBytes += item.Length + METADATA_BYTES; + if (totalBytes >= maxBytes) + { + yield return new SyncTransferItemV2 + { + Items = chunk, + Type = type, + Count = chunksProcessed + }; + + totalBytes = 0; + chunk.Clear(); + } + } + } + if (chunk.Count > 0) + { + if (chunksProcessed++ < skipChunks) continue; + yield return new SyncTransferItemV2 + { + Items = chunk, + Type = type, + Count = chunksProcessed + }; + } + } + } + + public Task RequestFetch(long lastSyncedTimestamp) + { + return RequestResumableFetch(lastSyncedTimestamp); + } + + public async Task RequestResumableFetch(long lastSyncedTimestamp, int cursor = 0) { var userId = Context.User.FindFirstValue("sub"); @@ -270,49 +446,87 @@ namespace Notesnook.API.Hubs if (lastSyncedTimestamp > 0 && userSettings.LastSynced == lastSyncedTimestamp) { - yield return new SyncTransferItem + return new SyncMetadata { LastSynced = userSettings.LastSynced, - Synced = true + }; + } + + var isResumable = lastSyncedTimestamp == 0; + if (!isResumable) cursor = 0; + + var chunks = PrepareChunks( + collections: new[] { + Repositories.Settings.FindItemsSyncedAfter, + Repositories.Attachments.FindItemsSyncedAfter, + Repositories.Notes.FindItemsSyncedAfter, + Repositories.Notebooks.FindItemsSyncedAfter, + Repositories.Contents.FindItemsSyncedAfter, + Repositories.Shortcuts.FindItemsSyncedAfter, + Repositories.Reminders.FindItemsSyncedAfter, + Repositories.Colors.FindItemsSyncedAfter, + Repositories.Tags.FindItemsSyncedAfter, + Repositories.Relations.FindItemsSyncedAfter, + }, + types: CollectionKeys, + userId, + lastSyncedTimestamp, + size: 1000, + maxBytes: 7 * 1024 * 1024, + skipChunks: cursor + ); + + await foreach (var chunk in chunks) + { + _ = await Clients.Caller.SendItems(chunk).WaitAsync(TimeSpan.FromMinutes(10)); + } + + return new SyncMetadata + { + VaultKey = userSettings.VaultKey, + LastSynced = userSettings.LastSynced, + }; + } + + public async IAsyncEnumerable FetchItems(long lastSyncedTimestamp, [EnumeratorCancellation] + CancellationToken cancellationToken) + { + var userId = Context.User.FindFirstValue("sub"); + + if (GlobalSync.IsUserPushing(userId)) + { + throw new HubException("Cannot fetch data while another sync is in progress. Please try again later."); + } + + var userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId); + if (userSettings.LastSynced > 0 && lastSyncedTimestamp > userSettings.LastSynced) + { + throw new HubException($"Provided timestamp value is too large. Server timestamp: {userSettings.LastSynced} Sent timestamp: {lastSyncedTimestamp}. Please run a Force Sync to fix this issue."); + } + + if (lastSyncedTimestamp > 0 && userSettings.LastSynced == lastSyncedTimestamp) + { + yield return new SyncTransferItem + { + Synced = true, + LastSynced = userSettings.LastSynced }; yield break; } + var total = (await Task.WhenAll( + Repositories.Attachments.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp), + Repositories.Notes.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp), + Repositories.Notebooks.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp), + Repositories.Contents.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp), + Repositories.Settings.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp), + Repositories.Shortcuts.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp), + Repositories.Reminders.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp), + Repositories.Relations.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp), + Repositories.Colors.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp), + Repositories.Tags.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp) + )).Sum((a) => a); - var attachments = await Repositories.Attachments.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp); - - var notes = await Repositories.Notes.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp); - - var notebooks = await Repositories.Notebooks.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp); - - var contents = await Repositories.Contents.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp); - - var settings = await Repositories.Settings.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp); - - var shortcuts = await Repositories.Shortcuts.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp); - - var reminders = await Repositories.Reminders.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp); - - var relations = await Repositories.Relations.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp); - - var collections = new Dictionary> - { - ["attachment"] = attachments, - ["note"] = notes, - ["notebook"] = notebooks, - ["content"] = contents, - ["shortcut"] = shortcuts, - ["reminder"] = reminders, - ["relation"] = relations, - ["settings"] = settings, - }; - - if (userSettings.VaultKey != null) - { - collections.Add("vaultKey", new object[] { userSettings.VaultKey }); - } - - var total = collections.Values.Sum((a) => a.Count()); if (total == 0) { yield return new SyncTransferItem @@ -323,23 +537,51 @@ namespace Notesnook.API.Hubs yield break; } - foreach (var collection in collections) + var collections = new[] { + Repositories.Settings.FindItemsSyncedAfter, + Repositories.Attachments.FindItemsSyncedAfter, + Repositories.Notes.FindItemsSyncedAfter, + Repositories.Notebooks.FindItemsSyncedAfter, + Repositories.Contents.FindItemsSyncedAfter, + Repositories.Shortcuts.FindItemsSyncedAfter, + Repositories.Reminders.FindItemsSyncedAfter, + Repositories.Colors.FindItemsSyncedAfter, + Repositories.Tags.FindItemsSyncedAfter, + Repositories.Relations.FindItemsSyncedAfter, + }; + + for (var i = 0; i < collections.Length; ++i) { - foreach (var item in collection.Value) + var key = CollectionKeys[i]; + using var cursor = await collections[i](userId, lastSyncedTimestamp, 1000); + + while (await cursor.MoveNextAsync(cancellationToken)) { - if (item == null) continue; - // Check the cancellation token regularly so that the server will stop producing items if the client disconnects. - cancellationToken.ThrowIfCancellationRequested(); - yield return new SyncTransferItem + foreach (var item in cursor.Current) { - LastSynced = userSettings.LastSynced, - Synced = false, - Item = JsonSerializer.Serialize(item), - ItemType = collection.Key, - Total = total, - }; + yield return new SyncTransferItem + { + LastSynced = userSettings.LastSynced, + Synced = false, + Item = JsonSerializer.Serialize(item), + ItemType = key, + Total = (int)total, + }; + } } } + + if (userSettings.VaultKey != null) + { + yield return new SyncTransferItem + { + LastSynced = userSettings.LastSynced, + Synced = false, + Item = JsonSerializer.Serialize(userSettings.VaultKey), + ItemType = "vaultKey", + Total = (int)total, + }; + } } } @@ -384,4 +626,33 @@ namespace Notesnook.API.Hubs [MessagePack.Key("current")] public int Current { get; set; } } + + [MessagePack.MessagePackObject] + public struct SyncTransferItemV2 + { + [MessagePack.Key("items")] + [JsonPropertyName("items")] + public IEnumerable Items { get; set; } + + [MessagePack.Key("type")] + [JsonPropertyName("type")] + public string Type { get; set; } + [MessagePack.Key("count")] + [JsonPropertyName("count")] + public int Count { get; set; } + } + + [MessagePack.MessagePackObject] + public struct SyncMetadata + { + [MessagePack.Key("vaultKey")] + [JsonPropertyName("vaultKey")] + public EncryptedData VaultKey { get; set; } + + [MessagePack.Key("lastSynced")] + [JsonPropertyName("lastSynced")] + public long LastSynced { get; set; } + // [MessagePack.Key("total")] + // public long TotalItems { get; set; } + } } \ No newline at end of file diff --git a/Notesnook.API/Models/EncryptedData.cs b/Notesnook.API/Models/EncryptedData.cs index 444c65c..1b1e646 100644 --- a/Notesnook.API/Models/EncryptedData.cs +++ b/Notesnook.API/Models/EncryptedData.cs @@ -52,5 +52,14 @@ namespace Notesnook.API.Models [BsonElement("salt")] [DataMember(Name = "salt")] public string Salt { get; set; } + + public override bool Equals(object obj) + { + if (obj is EncryptedData encryptedData) + { + return IV == encryptedData.IV && Salt == encryptedData.Salt && Cipher == encryptedData.Cipher && Length == encryptedData.Length; + } + return base.Equals(obj); + } } } diff --git a/Notesnook.API/Repositories/SyncItemsRepository.cs b/Notesnook.API/Repositories/SyncItemsRepository.cs index 90faa70..88830ff 100644 --- a/Notesnook.API/Repositories/SyncItemsRepository.cs +++ b/Notesnook.API/Repositories/SyncItemsRepository.cs @@ -50,12 +50,21 @@ namespace Notesnook.API.Repositories return ALGORITHMS.Contains(algorithm); } - public async Task> GetItemsSyncedAfterAsync(string userId, long timestamp) + public Task CountItemsSyncedAfterAsync(string userId, long timestamp) { - var cursor = await Collection.FindAsync(n => (n.DateSynced > timestamp) && n.UserId.Equals(userId)); - return cursor.ToList(); + return Collection.CountDocumentsAsync(n => (n.DateSynced > timestamp) && n.UserId.Equals(userId)); + } + public Task> FindItemsSyncedAfter(string userId, long timestamp, int batchSize) + { + return Collection.FindAsync(n => (n.DateSynced > timestamp) && n.UserId.Equals(userId), new FindOptions + { + BatchSize = batchSize, + AllowDiskUse = true, + AllowPartialResults = false, + NoCursorTimeout = true, + Sort = new SortDefinitionBuilder().Ascending((a) => a.Id) + }); } - // public async Task DeleteIdsAsync(string[] ids, string userId, CancellationToken token = default(CancellationToken)) // { // await Collection.DeleteManyAsync((i) => ids.Contains(i.Id) && i.UserId == userId, token); @@ -63,10 +72,10 @@ namespace Notesnook.API.Repositories public void DeleteByUserId(string userId) { - dbContext.AddCommand((handle, ct) => Collection.DeleteManyAsync(handle, (i) => i.UserId == userId, cancellationToken: ct)); + dbContext.AddCommand((handle, ct) => Collection.DeleteManyAsync(handle, (i) => i.UserId == userId, cancellationToken: ct)); } - public async Task UpsertAsync(T item, string userId, long dateSynced) + public async Task UpsertAsync(SyncItem item, string userId, long dateSynced) { if (item.Length > 15 * 1024 * 1024)