sync: pause all fetches if another device is pushing

This commit is contained in:
Abdullah Atta
2023-06-28 17:12:02 +05:00
parent 0ad00c9747
commit 6e7a85763c

View File

@@ -36,9 +36,10 @@ using Streetwriters.Data.Interfaces;
namespace Notesnook.API.Hubs
{
public struct RunningSync
public struct RunningPush
{
public long LastSynced { get; set; }
public long Timestamp { get; set; }
public int Validity { get; set; }
public string ConnectionId { get; set; }
}
public interface ISyncHubClient
@@ -50,27 +51,64 @@ namespace Notesnook.API.Hubs
public class GlobalSync
{
public static Dictionary<string, List<RunningSync>> RunningSyncs = new Dictionary<string, List<RunningSync>>();
private const int PUSH_VALIDITY_PERIOD_PER_ITEM = 5 * 100; // 0.5 second
private const int BASE_PUSH_VALIDITY_PERIOD = 5 * 1000; // 5 seconds
private readonly static Dictionary<string, List<RunningPush>> PushOperations = new();
public static void ClearCompletedSyncs(string userId, string connectionId)
public static void ClearPushOperations(string userId, string connectionId)
{
if (RunningSyncs.ContainsKey(userId))
if (PushOperations.TryGetValue(userId, out List<RunningPush> operations))
{
foreach (var sync in RunningSyncs[userId].ToArray())
if (sync.ConnectionId == connectionId)
RunningSyncs[userId].Remove(sync);
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
foreach (var push in operations.ToArray())
if (push.ConnectionId == connectionId || !IsPushValid(push, now))
operations.Remove(push);
}
}
public static bool IsSyncRunning(string userId, string connectionId)
public static bool IsPushing(string userId, string connectionId)
{
if (RunningSyncs.ContainsKey(userId))
if (PushOperations.TryGetValue(userId, out List<RunningPush> operations))
{
foreach (var sync in RunningSyncs[userId])
if (sync.ConnectionId == connectionId) return true;
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
foreach (var push in operations)
if (push.ConnectionId == connectionId && IsPushValid(push, now)) return true;
}
return false;
}
public static bool IsUserPushing(string userId)
{
var count = 0;
if (PushOperations.TryGetValue(userId, out List<RunningPush> operations))
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
foreach (var push in operations)
if (IsPushValid(push, now)) ++count;
}
return count > 0;
}
public static void StartPush(string userId, string connectionId, int totalItems)
{
if (IsPushing(userId, connectionId)) return;
if (!PushOperations.ContainsKey(userId))
PushOperations[userId] = new List<RunningPush>();
PushOperations[userId].Add(new RunningPush
{
ConnectionId = connectionId,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
Validity = BASE_PUSH_VALIDITY_PERIOD + totalItems * PUSH_VALIDITY_PERIOD_PER_ITEM
});
}
private static bool IsPushValid(RunningPush push, long now)
{
return now < push.Timestamp + push.Validity;
}
}
[Authorize("Sync")]
@@ -100,10 +138,15 @@ namespace Notesnook.API.Hubs
public override async Task OnDisconnectedAsync(Exception exception)
{
var id = Context.User.FindFirstValue("sub");
GlobalSync.ClearCompletedSyncs(id, Context.ConnectionId);
await Groups.RemoveFromGroupAsync(Context.ConnectionId, id);
await base.OnDisconnectedAsync(exception);
try
{
await base.OnDisconnectedAsync(exception);
}
finally
{
var id = Context.User.FindFirstValue("sub");
GlobalSync.ClearPushOperations(id, Context.ConnectionId);
}
}
public async Task<int> SyncItem(BatchedSyncTransferItem transferItem)
@@ -111,16 +154,10 @@ namespace Notesnook.API.Hubs
var userId = Context.User.FindFirstValue("sub");
if (string.IsNullOrEmpty(userId)) return 0;
// Only allow a single sync to run per connection. This prevents a bunch of issues like sync loops
// and wasted bandwidth
if (GlobalSync.IsSyncRunning(userId, Context.ConnectionId)) return 0;
if (!GlobalSync.RunningSyncs.ContainsKey(userId))
GlobalSync.RunningSyncs[userId] = new List<RunningSync>();
UserSettings userSettings = await this.Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
long dateSynced = transferItem.LastSynced > userSettings.LastSynced ? transferItem.LastSynced : userSettings.LastSynced;
GlobalSync.RunningSyncs[userId].Add(new RunningSync { LastSynced = dateSynced, ConnectionId = Context.ConnectionId });
long dateSynced = Math.Max(transferItem.LastSynced, userSettings.LastSynced);
GlobalSync.StartPush(userId, Context.ConnectionId, transferItem.Total);
try
{
@@ -186,10 +223,9 @@ namespace Notesnook.API.Hubs
}
catch (Exception ex)
{
GlobalSync.ClearCompletedSyncs(userId, Context.ConnectionId);
GlobalSync.ClearPushOperations(userId, Context.ConnectionId);
throw ex;
}
}
public async Task<bool> SyncCompleted(long dateSynced)
@@ -199,9 +235,7 @@ namespace Notesnook.API.Hubs
{
UserSettings userSettings = await this.Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
long lastSynced = GlobalSync.RunningSyncs.ContainsKey(userId) && GlobalSync.RunningSyncs[userId].Count > 1
? GlobalSync.RunningSyncs[userId].MinBy((a) => a.LastSynced).LastSynced
: Math.Max(dateSynced, userSettings.LastSynced);
long lastSynced = Math.Max(dateSynced, userSettings.LastSynced);
userSettings.LastSynced = lastSynced;
@@ -213,7 +247,7 @@ namespace Notesnook.API.Hubs
}
finally
{
GlobalSync.ClearCompletedSyncs(userId, Context.ConnectionId);
GlobalSync.ClearPushOperations(userId, Context.ConnectionId);
}
}
@@ -222,11 +256,16 @@ namespace Notesnook.API.Hubs
{
var userId = Context.User.FindFirstValue("sub");
if (GlobalSync.IsUserPushing(userId))
{
throw new HubException("Cannot fetch data while another sync is in progress. Please try again later.");
}
var userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
if (userSettings.LastSynced > 0 && lastSyncedTimestamp > userSettings.LastSynced)
lastSyncedTimestamp = userSettings.LastSynced - 1;
// throw new HubException($"Provided timestamp value is too large. Server timestamp: {userSettings.LastSynced} Sent timestamp: {lastSyncedTimestamp}");
{
throw new HubException($"Provided timestamp value is too large. Server timestamp: {userSettings.LastSynced} Sent timestamp: {lastSyncedTimestamp}. Please run a Force Sync to fix this issue.");
}
// var client = Clients.Caller;
if (lastSyncedTimestamp > 0 && userSettings.LastSynced == lastSyncedTimestamp)