diff --git a/Notesnook.API/Hubs/SyncHub.cs b/Notesnook.API/Hubs/SyncHub.cs index 1ae9c29..160eadb 100644 --- a/Notesnook.API/Hubs/SyncHub.cs +++ b/Notesnook.API/Hubs/SyncHub.cs @@ -36,6 +36,11 @@ using Streetwriters.Data.Interfaces; namespace Notesnook.API.Hubs { + public struct RunningSync + { + public long LastSynced { get; set; } + public string ConnectionId { get; set; } + } public interface ISyncHubClient { Task SyncItem(SyncTransferItem transferItem); @@ -43,6 +48,31 @@ namespace Notesnook.API.Hubs Task SyncCompleted(); } + public class GlobalSync + { + public static Dictionary> RunningSyncs = new Dictionary>(); + + public static void ClearCompletedSyncs(string userId, string connectionId) + { + if (RunningSyncs.ContainsKey(userId)) + { + foreach (var sync in RunningSyncs[userId].ToArray()) + if (sync.ConnectionId == connectionId) + RunningSyncs[userId].Remove(sync); + } + } + + public static bool IsSyncRunning(string userId, string connectionId) + { + if (RunningSyncs.ContainsKey(userId)) + { + foreach (var sync in RunningSyncs[userId]) + if (sync.ConnectionId == connectionId) return true; + } + return false; + } + } + [Authorize("Sync")] public class SyncHub : Hub { @@ -71,97 +101,118 @@ namespace Notesnook.API.Hubs public override async Task OnDisconnectedAsync(Exception exception) { var id = Context.User.FindFirstValue("sub"); + GlobalSync.ClearCompletedSyncs(id, Context.ConnectionId); await Groups.RemoveFromGroupAsync(Context.ConnectionId, id); await base.OnDisconnectedAsync(exception); } public async Task SyncItem(BatchedSyncTransferItem transferItem) { - var userId = Context.User.FindFirstValue("sub"); if (string.IsNullOrEmpty(userId)) return 0; var others = Clients.OthersInGroup(userId); + if (!GlobalSync.RunningSyncs.ContainsKey(userId)) + GlobalSync.RunningSyncs[userId] = new List(); + UserSettings userSettings = await this.Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId); - long dateSynced = transferItem.LastSynced > userSettings.LastSynced ? transferItem.LastSynced : userSettings.LastSynced; + GlobalSync.RunningSyncs[userId].Add(new RunningSync { LastSynced = dateSynced, ConnectionId = Context.ConnectionId }); - for (int i = 0; i < transferItem.Items.Length; ++i) + try { - var data = transferItem.Items[i]; - var type = transferItem.Types[i]; - - // We intentionally don't await here to speed up the sync. Fire and forget - // suits here because we don't really care if the item reaches the other - // devices. - others.SyncItem( - new SyncTransferItem - { - Item = data, - ItemType = type, - LastSynced = dateSynced, - Total = transferItem.Total, - Current = transferItem.Current + i - }); - - switch (type) + var others = Clients.OthersInGroup(userId); + for (int i = 0; i < transferItem.Items.Length; ++i) { - case "content": - Repositories.Contents.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); - break; - case "attachment": - Repositories.Attachments.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); - break; - case "note": - Repositories.Notes.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); - break; - case "notebook": - Repositories.Notebooks.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); - break; - case "shortcut": - Repositories.Shortcuts.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); - break; - case "reminder": - Repositories.Reminders.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); - break; - case "relation": - Repositories.Relations.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); - break; - case "settings": - var settings = JsonSerializer.Deserialize(data); - settings.Id = MongoDB.Bson.ObjectId.Parse(userId); - settings.ItemId = userId; - Repositories.Settings.Upsert(settings, userId, dateSynced); - break; - case "vaultKey": - userSettings.VaultKey = JsonSerializer.Deserialize(data); - Repositories.UsersSettings.Upsert(userSettings, (u) => u.UserId == userId); - break; - default: - throw new HubException("Invalid item type."); + var data = transferItem.Items[i]; + var type = transferItem.Types[i]; + + // We intentionally don't await here to speed up the sync. Fire and forget + // suits here because we don't really care if the item reaches the other + // devices. + others.SyncItem( + new SyncTransferItem + { + Item = data, + ItemType = type, + LastSynced = dateSynced, + Total = transferItem.Total, + Current = transferItem.Current + i + }); + + switch (type) + { + case "content": + Repositories.Contents.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); + break; + case "attachment": + Repositories.Attachments.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); + break; + case "note": + Repositories.Notes.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); + break; + case "notebook": + Repositories.Notebooks.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); + break; + case "shortcut": + Repositories.Shortcuts.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); + break; + case "reminder": + Repositories.Reminders.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); + break; + case "relation": + Repositories.Relations.Upsert(JsonSerializer.Deserialize(data), userId, dateSynced); + break; + case "settings": + var settings = JsonSerializer.Deserialize(data); + settings.Id = MongoDB.Bson.ObjectId.Parse(userId); + settings.ItemId = userId; + Repositories.Settings.Upsert(settings, userId, dateSynced); + break; + case "vaultKey": + userSettings.VaultKey = JsonSerializer.Deserialize(data); + Repositories.UsersSettings.Upsert(userSettings, (u) => u.UserId == userId); + break; + default: + throw new HubException("Invalid item type."); + } + } + return await unit.Commit() ? 1 : 0; + } + catch (Exception ex) + { + GlobalSync.ClearCompletedSyncs(userId, Context.ConnectionId); + throw ex; } - - return await unit.Commit() ? 1 : 0; } public async Task SyncCompleted(long dateSynced) { var userId = Context.User.FindFirstValue("sub"); + try + { + UserSettings userSettings = await this.Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId); - UserSettings userSettings = await this.Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId); + long lastSynced = GlobalSync.RunningSyncs.ContainsKey(userId) && GlobalSync.RunningSyncs[userId].Count > 1 + ? GlobalSync.RunningSyncs[userId].MinBy((a) => a.LastSynced).LastSynced + : Math.Max(dateSynced, userSettings.LastSynced); - long lastSynced = dateSynced > userSettings.LastSynced ? dateSynced : userSettings.LastSynced; + userSettings.LastSynced = lastSynced; - userSettings.LastSynced = lastSynced; + await this.Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId); - await this.Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId); + await Clients.OthersInGroup(userId).RemoteSyncCompleted(lastSynced); - await Clients.OthersInGroup(userId).RemoteSyncCompleted(lastSynced); - return true; + return true; + } + finally + { + GlobalSync.ClearCompletedSyncs(userId, Context.ConnectionId); + } } public async IAsyncEnumerable FetchItems(long lastSyncedTimestamp, [EnumeratorCancellation] @@ -171,8 +222,9 @@ namespace Notesnook.API.Hubs var userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId); if (userSettings.LastSynced > 0 && lastSyncedTimestamp > userSettings.LastSynced) - throw new HubException($"Provided timestamp value is too large. Server timestamp: {userSettings.LastSynced} Sent timestamp: {lastSyncedTimestamp}"); + lastSyncedTimestamp = userSettings.LastSynced - 1; + // throw new HubException($"Provided timestamp value is too large. Server timestamp: {userSettings.LastSynced} Sent timestamp: {lastSyncedTimestamp}"); // var client = Clients.Caller; if (lastSyncedTimestamp > 0 && userSettings.LastSynced == lastSyncedTimestamp)