sync: detect multiple conflicting syncs

When 2 or more syncs conflict, it is necessary to adjust last synced
date to avoid data from entering a Sync Blindspot.
This commit is contained in:
Abdullah Atta
2023-05-22 18:22:16 +05:00
parent 99a7ffa6ae
commit 19056a9302
+114 -62
View File
@@ -36,6 +36,11 @@ using Streetwriters.Data.Interfaces;
namespace Notesnook.API.Hubs
{
public struct RunningSync
{
public long LastSynced { get; set; }
public string ConnectionId { get; set; }
}
public interface ISyncHubClient
{
Task SyncItem(SyncTransferItem transferItem);
@@ -43,6 +48,31 @@ namespace Notesnook.API.Hubs
Task SyncCompleted();
}
public class GlobalSync
{
public static Dictionary<string, List<RunningSync>> RunningSyncs = new Dictionary<string, List<RunningSync>>();
public static void ClearCompletedSyncs(string userId, string connectionId)
{
if (RunningSyncs.ContainsKey(userId))
{
foreach (var sync in RunningSyncs[userId].ToArray())
if (sync.ConnectionId == connectionId)
RunningSyncs[userId].Remove(sync);
}
}
public static bool IsSyncRunning(string userId, string connectionId)
{
if (RunningSyncs.ContainsKey(userId))
{
foreach (var sync in RunningSyncs[userId])
if (sync.ConnectionId == connectionId) return true;
}
return false;
}
}
[Authorize("Sync")]
public class SyncHub : Hub<ISyncHubClient>
{
@@ -71,97 +101,118 @@ namespace Notesnook.API.Hubs
public override async Task OnDisconnectedAsync(Exception exception)
{
var id = Context.User.FindFirstValue("sub");
GlobalSync.ClearCompletedSyncs(id, Context.ConnectionId);
await Groups.RemoveFromGroupAsync(Context.ConnectionId, id);
await base.OnDisconnectedAsync(exception);
}
public async Task<int> SyncItem(BatchedSyncTransferItem transferItem)
{
var userId = Context.User.FindFirstValue("sub");
if (string.IsNullOrEmpty(userId)) return 0;
var others = Clients.OthersInGroup(userId);
if (!GlobalSync.RunningSyncs.ContainsKey(userId))
GlobalSync.RunningSyncs[userId] = new List<RunningSync>();
UserSettings userSettings = await this.Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
long dateSynced = transferItem.LastSynced > userSettings.LastSynced ? transferItem.LastSynced : userSettings.LastSynced;
GlobalSync.RunningSyncs[userId].Add(new RunningSync { LastSynced = dateSynced, ConnectionId = Context.ConnectionId });
for (int i = 0; i < transferItem.Items.Length; ++i)
try
{
var data = transferItem.Items[i];
var type = transferItem.Types[i];
// We intentionally don't await here to speed up the sync. Fire and forget
// suits here because we don't really care if the item reaches the other
// devices.
others.SyncItem(
new SyncTransferItem
{
Item = data,
ItemType = type,
LastSynced = dateSynced,
Total = transferItem.Total,
Current = transferItem.Current + i
});
switch (type)
var others = Clients.OthersInGroup(userId);
for (int i = 0; i < transferItem.Items.Length; ++i)
{
case "content":
Repositories.Contents.Upsert(JsonSerializer.Deserialize<Content>(data), userId, dateSynced);
break;
case "attachment":
Repositories.Attachments.Upsert(JsonSerializer.Deserialize<Attachment>(data), userId, dateSynced);
break;
case "note":
Repositories.Notes.Upsert(JsonSerializer.Deserialize<Note>(data), userId, dateSynced);
break;
case "notebook":
Repositories.Notebooks.Upsert(JsonSerializer.Deserialize<Notebook>(data), userId, dateSynced);
break;
case "shortcut":
Repositories.Shortcuts.Upsert(JsonSerializer.Deserialize<Shortcut>(data), userId, dateSynced);
break;
case "reminder":
Repositories.Reminders.Upsert(JsonSerializer.Deserialize<Reminder>(data), userId, dateSynced);
break;
case "relation":
Repositories.Relations.Upsert(JsonSerializer.Deserialize<Relation>(data), userId, dateSynced);
break;
case "settings":
var settings = JsonSerializer.Deserialize<Setting>(data);
settings.Id = MongoDB.Bson.ObjectId.Parse(userId);
settings.ItemId = userId;
Repositories.Settings.Upsert(settings, userId, dateSynced);
break;
case "vaultKey":
userSettings.VaultKey = JsonSerializer.Deserialize<EncryptedData>(data);
Repositories.UsersSettings.Upsert(userSettings, (u) => u.UserId == userId);
break;
default:
throw new HubException("Invalid item type.");
var data = transferItem.Items[i];
var type = transferItem.Types[i];
// We intentionally don't await here to speed up the sync. Fire and forget
// suits here because we don't really care if the item reaches the other
// devices.
others.SyncItem(
new SyncTransferItem
{
Item = data,
ItemType = type,
LastSynced = dateSynced,
Total = transferItem.Total,
Current = transferItem.Current + i
});
switch (type)
{
case "content":
Repositories.Contents.Upsert(JsonSerializer.Deserialize<Content>(data), userId, dateSynced);
break;
case "attachment":
Repositories.Attachments.Upsert(JsonSerializer.Deserialize<Attachment>(data), userId, dateSynced);
break;
case "note":
Repositories.Notes.Upsert(JsonSerializer.Deserialize<Note>(data), userId, dateSynced);
break;
case "notebook":
Repositories.Notebooks.Upsert(JsonSerializer.Deserialize<Notebook>(data), userId, dateSynced);
break;
case "shortcut":
Repositories.Shortcuts.Upsert(JsonSerializer.Deserialize<Shortcut>(data), userId, dateSynced);
break;
case "reminder":
Repositories.Reminders.Upsert(JsonSerializer.Deserialize<Reminder>(data), userId, dateSynced);
break;
case "relation":
Repositories.Relations.Upsert(JsonSerializer.Deserialize<Relation>(data), userId, dateSynced);
break;
case "settings":
var settings = JsonSerializer.Deserialize<Setting>(data);
settings.Id = MongoDB.Bson.ObjectId.Parse(userId);
settings.ItemId = userId;
Repositories.Settings.Upsert(settings, userId, dateSynced);
break;
case "vaultKey":
userSettings.VaultKey = JsonSerializer.Deserialize<EncryptedData>(data);
Repositories.UsersSettings.Upsert(userSettings, (u) => u.UserId == userId);
break;
default:
throw new HubException("Invalid item type.");
}
}
return await unit.Commit() ? 1 : 0;
}
catch (Exception ex)
{
GlobalSync.ClearCompletedSyncs(userId, Context.ConnectionId);
throw ex;
}
return await unit.Commit() ? 1 : 0;
}
public async Task<bool> SyncCompleted(long dateSynced)
{
var userId = Context.User.FindFirstValue("sub");
try
{
UserSettings userSettings = await this.Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
UserSettings userSettings = await this.Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
long lastSynced = GlobalSync.RunningSyncs.ContainsKey(userId) && GlobalSync.RunningSyncs[userId].Count > 1
? GlobalSync.RunningSyncs[userId].MinBy((a) => a.LastSynced).LastSynced
: Math.Max(dateSynced, userSettings.LastSynced);
long lastSynced = dateSynced > userSettings.LastSynced ? dateSynced : userSettings.LastSynced;
userSettings.LastSynced = lastSynced;
userSettings.LastSynced = lastSynced;
await this.Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId);
await this.Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId);
await Clients.OthersInGroup(userId).RemoteSyncCompleted(lastSynced);
await Clients.OthersInGroup(userId).RemoteSyncCompleted(lastSynced);
return true;
return true;
}
finally
{
GlobalSync.ClearCompletedSyncs(userId, Context.ConnectionId);
}
}
public async IAsyncEnumerable<SyncTransferItem> FetchItems(long lastSyncedTimestamp, [EnumeratorCancellation]
@@ -171,8 +222,9 @@ namespace Notesnook.API.Hubs
var userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
if (userSettings.LastSynced > 0 && lastSyncedTimestamp > userSettings.LastSynced)
throw new HubException($"Provided timestamp value is too large. Server timestamp: {userSettings.LastSynced} Sent timestamp: {lastSyncedTimestamp}");
lastSyncedTimestamp = userSettings.LastSynced - 1;
// throw new HubException($"Provided timestamp value is too large. Server timestamp: {userSettings.LastSynced} Sent timestamp: {lastSyncedTimestamp}");
// var client = Clients.Caller;
if (lastSyncedTimestamp > 0 && userSettings.LastSynced == lastSyncedTimestamp)