sync: introduce sync v2

This commit is contained in:
Abdullah Atta
2023-09-09 20:29:05 +05:00
parent 55a7e9fd1c
commit ab7ea72fd4
3 changed files with 349 additions and 60 deletions

View File

@@ -23,14 +23,17 @@ using System.Linq;
using System.Runtime.CompilerServices;
using System.Security.Claims;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.SignalR;
using MongoDB.Driver;
using Notesnook.API.Authorization;
using Notesnook.API.Interfaces;
using Notesnook.API.Models;
using Notesnook.API.Repositories;
using Streetwriters.Common.Models;
using Streetwriters.Data.Interfaces;
@@ -39,20 +42,25 @@ namespace Notesnook.API.Hubs
public struct RunningPush
{
public long Timestamp { get; set; }
public int Validity { get; set; }
public long Validity { get; set; }
public string ConnectionId { get; set; }
}
public interface ISyncHubClient
{
Task SyncItem(SyncTransferItem transferItem);
Task PushItems(SyncTransferItemV2 transferItem);
Task<bool> SendItems(SyncTransferItemV2 transferItem);
Task RemoteSyncCompleted(long lastSynced);
Task PushCompleted(long lastSynced);
Task SyncCompleted();
}
public class GlobalSync
{
private const long PUSH_VALIDITY_EXTENSION_PERIOD = 16 * 1000; // 16 second
private const int PUSH_VALIDITY_PERIOD_PER_ITEM = 5 * 100; // 0.5 second
private const int BASE_PUSH_VALIDITY_PERIOD = 5 * 1000; // 5 seconds
private const long BASE_PUSH_VALIDITY_PERIOD = 5 * 1000; // 5 seconds
private const long BASE_PUSH_VALIDITY_PERIOD_NEW = 16 * 1000; // 16 seconds
private readonly static Dictionary<string, List<RunningPush>> PushOperations = new();
public static void ClearPushOperations(string userId, string connectionId)
@@ -90,7 +98,7 @@ namespace Notesnook.API.Hubs
return count > 0;
}
public static void StartPush(string userId, string connectionId, int totalItems)
public static void StartPush(string userId, string connectionId, long? totalItems = null)
{
if (IsPushing(userId, connectionId)) return;
@@ -101,10 +109,27 @@ namespace Notesnook.API.Hubs
{
ConnectionId = connectionId,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
Validity = BASE_PUSH_VALIDITY_PERIOD + totalItems * PUSH_VALIDITY_PERIOD_PER_ITEM
Validity = totalItems.HasValue ? BASE_PUSH_VALIDITY_PERIOD + (totalItems.Value * PUSH_VALIDITY_PERIOD_PER_ITEM) : BASE_PUSH_VALIDITY_PERIOD_NEW
});
}
public static void ExtendPush(string userId, string connectionId)
{
if (!IsPushing(userId, connectionId) || !PushOperations.ContainsKey(userId))
{
StartPush(userId, connectionId);
return;
}
var index = PushOperations[userId].FindIndex((push) => push.ConnectionId == connectionId);
if (index < 0)
{
StartPush(userId, connectionId);
return;
}
var pushOperation = PushOperations[userId][index];
pushOperation.Validity += PUSH_VALIDITY_EXTENSION_PERIOD;
}
private static bool IsPushValid(RunningPush push, long now)
{
return now < push.Timestamp + push.Validity;
@@ -116,6 +141,18 @@ namespace Notesnook.API.Hubs
{
private ISyncItemsRepositoryAccessor Repositories { get; }
private readonly IUnitOfWork unit;
private readonly string[] CollectionKeys = new[] {
"settings",
"attachment",
"note",
"notebook",
"content",
"shortcut",
"reminder",
"color",
"tag",
"relation", // relations must sync at the end to prevent invalid state
};
public SyncHub(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor, IUnitOfWork unitOfWork)
{
@@ -203,6 +240,16 @@ namespace Notesnook.API.Hubs
case "relation":
Repositories.Relations.Upsert(JsonSerializer.Deserialize<Relation>(data), userId, dateSynced);
break;
case "color":
var color = JsonSerializer.Deserialize<Color>(data);
if (color.Version < 5.9) continue;
Repositories.Colors.Upsert(color, userId, dateSynced);
break;
case "tag":
var tag = JsonSerializer.Deserialize<Models.Tag>(data);
if (tag.Version < 5.9) continue;
Repositories.Tags.Upsert(tag, userId, dateSynced);
break;
case "settings":
var settings = JsonSerializer.Deserialize<Setting>(data);
settings.Id = MongoDB.Bson.ObjectId.Parse(userId);
@@ -228,6 +275,83 @@ namespace Notesnook.API.Hubs
}
}
private Action<SyncItem, string, long> MapTypeToUpsertAction(string type)
{
return type switch
{
"attachment" => Repositories.Attachments.Upsert,
"note" => Repositories.Notes.Upsert,
"notebook" => Repositories.Notebooks.Upsert,
"content" => Repositories.Contents.Upsert,
"shortcut" => Repositories.Shortcuts.Upsert,
"reminder" => Repositories.Reminders.Upsert,
"relation" => Repositories.Relations.Upsert,
"color" => Repositories.Colors.Upsert,
"tag" => Repositories.Tags.Upsert,
_ => null,
};
}
public async Task<long> InitializePush(SyncMetadata syncMetadata)
{
if (syncMetadata.LastSynced <= 0) throw new HubException("Last synced time cannot be zero or less than zero.");
var userId = Context.User.FindFirstValue("sub");
if (string.IsNullOrEmpty(userId)) return 0;
UserSettings userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
long dateSynced = Math.Max(syncMetadata.LastSynced, userSettings.LastSynced);
GlobalSync.StartPush(userId, Context.ConnectionId);
if (userSettings.VaultKey != null && syncMetadata.VaultKey != null && !userSettings.VaultKey.Equals(syncMetadata.VaultKey) || (userSettings.VaultKey == null && syncMetadata.VaultKey != null))
{
userSettings.VaultKey = syncMetadata.VaultKey;
await Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId);
}
return dateSynced;
}
public async Task<int> PushItems(SyncTransferItemV2 pushItem, long dateSynced)
{
var userId = Context.User.FindFirstValue("sub");
if (string.IsNullOrEmpty(userId)) return 0;
try
{
var others = Clients.OthersInGroup(userId);
others.PushItems(pushItem);
GlobalSync.ExtendPush(userId, Context.ConnectionId);
if (pushItem.Type == "settings")
{
var settings = pushItem.Items.First();
if (settings == null) return 0;
settings.Id = MongoDB.Bson.ObjectId.Parse(userId);
settings.ItemId = userId;
Repositories.Settings.Upsert(settings, userId, dateSynced);
}
else
{
var UpsertItem = MapTypeToUpsertAction(pushItem.Type) ?? throw new Exception("Invalid item type.");
foreach (var item in pushItem.Items)
{
UpsertItem(item, userId, dateSynced);
}
}
return await unit.Commit() ? 1 : 0;
}
catch (Exception ex)
{
GlobalSync.ClearPushOperations(userId, Context.ConnectionId);
throw ex;
}
}
public async Task<bool> SyncCompleted(long dateSynced)
{
var userId = Context.User.FindFirstValue("sub");
@@ -242,6 +366,7 @@ namespace Notesnook.API.Hubs
await this.Repositories.UsersSettings.UpsertAsync(userSettings, (u) => u.UserId == userId);
await Clients.OthersInGroup(userId).RemoteSyncCompleted(lastSynced);
await Clients.OthersInGroup(userId).PushCompleted(lastSynced);
return true;
}
@@ -251,8 +376,59 @@ namespace Notesnook.API.Hubs
}
}
public async IAsyncEnumerable<SyncTransferItem> FetchItems(long lastSyncedTimestamp, [EnumeratorCancellation]
CancellationToken cancellationToken)
private static async IAsyncEnumerable<SyncTransferItemV2> PrepareChunks(Func<string, long, int, Task<IAsyncCursor<SyncItem>>>[] collections, string[] types, string userId, long lastSyncedTimestamp, int size, long maxBytes, int skipChunks)
{
var chunksProcessed = 0;
for (int i = 0; i < collections.Length; i++)
{
var type = types[i];
using var cursor = await collections[i](userId, lastSyncedTimestamp, size);
var chunk = new List<SyncItem>();
long totalBytes = 0;
long METADATA_BYTES = 5 * 1024;
while (await cursor.MoveNextAsync())
{
if (chunksProcessed++ < skipChunks) continue;
foreach (var item in cursor.Current)
{
chunk.Add(item);
totalBytes += item.Length + METADATA_BYTES;
if (totalBytes >= maxBytes)
{
yield return new SyncTransferItemV2
{
Items = chunk,
Type = type,
Count = chunksProcessed
};
totalBytes = 0;
chunk.Clear();
}
}
}
if (chunk.Count > 0)
{
if (chunksProcessed++ < skipChunks) continue;
yield return new SyncTransferItemV2
{
Items = chunk,
Type = type,
Count = chunksProcessed
};
}
}
}
public Task<SyncMetadata> RequestFetch(long lastSyncedTimestamp)
{
return RequestResumableFetch(lastSyncedTimestamp);
}
public async Task<SyncMetadata> RequestResumableFetch(long lastSyncedTimestamp, int cursor = 0)
{
var userId = Context.User.FindFirstValue("sub");
@@ -270,49 +446,87 @@ namespace Notesnook.API.Hubs
if (lastSyncedTimestamp > 0 && userSettings.LastSynced == lastSyncedTimestamp)
{
yield return new SyncTransferItem
return new SyncMetadata
{
LastSynced = userSettings.LastSynced,
Synced = true
};
}
var isResumable = lastSyncedTimestamp == 0;
if (!isResumable) cursor = 0;
var chunks = PrepareChunks(
collections: new[] {
Repositories.Settings.FindItemsSyncedAfter,
Repositories.Attachments.FindItemsSyncedAfter,
Repositories.Notes.FindItemsSyncedAfter,
Repositories.Notebooks.FindItemsSyncedAfter,
Repositories.Contents.FindItemsSyncedAfter,
Repositories.Shortcuts.FindItemsSyncedAfter,
Repositories.Reminders.FindItemsSyncedAfter,
Repositories.Colors.FindItemsSyncedAfter,
Repositories.Tags.FindItemsSyncedAfter,
Repositories.Relations.FindItemsSyncedAfter,
},
types: CollectionKeys,
userId,
lastSyncedTimestamp,
size: 1000,
maxBytes: 7 * 1024 * 1024,
skipChunks: cursor
);
await foreach (var chunk in chunks)
{
_ = await Clients.Caller.SendItems(chunk).WaitAsync(TimeSpan.FromMinutes(10));
}
return new SyncMetadata
{
VaultKey = userSettings.VaultKey,
LastSynced = userSettings.LastSynced,
};
}
public async IAsyncEnumerable<SyncTransferItem> FetchItems(long lastSyncedTimestamp, [EnumeratorCancellation]
CancellationToken cancellationToken)
{
var userId = Context.User.FindFirstValue("sub");
if (GlobalSync.IsUserPushing(userId))
{
throw new HubException("Cannot fetch data while another sync is in progress. Please try again later.");
}
var userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId == userId);
if (userSettings.LastSynced > 0 && lastSyncedTimestamp > userSettings.LastSynced)
{
throw new HubException($"Provided timestamp value is too large. Server timestamp: {userSettings.LastSynced} Sent timestamp: {lastSyncedTimestamp}. Please run a Force Sync to fix this issue.");
}
if (lastSyncedTimestamp > 0 && userSettings.LastSynced == lastSyncedTimestamp)
{
yield return new SyncTransferItem
{
Synced = true,
LastSynced = userSettings.LastSynced
};
yield break;
}
var total = (await Task.WhenAll(
Repositories.Attachments.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp),
Repositories.Notes.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp),
Repositories.Notebooks.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp),
Repositories.Contents.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp),
Repositories.Settings.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp),
Repositories.Shortcuts.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp),
Repositories.Reminders.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp),
Repositories.Relations.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp),
Repositories.Colors.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp),
Repositories.Tags.CountItemsSyncedAfterAsync(userId, lastSyncedTimestamp)
)).Sum((a) => a);
var attachments = await Repositories.Attachments.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp);
var notes = await Repositories.Notes.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp);
var notebooks = await Repositories.Notebooks.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp);
var contents = await Repositories.Contents.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp);
var settings = await Repositories.Settings.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp);
var shortcuts = await Repositories.Shortcuts.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp);
var reminders = await Repositories.Reminders.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp);
var relations = await Repositories.Relations.GetItemsSyncedAfterAsync(userId, lastSyncedTimestamp);
var collections = new Dictionary<string, IEnumerable<object>>
{
["attachment"] = attachments,
["note"] = notes,
["notebook"] = notebooks,
["content"] = contents,
["shortcut"] = shortcuts,
["reminder"] = reminders,
["relation"] = relations,
["settings"] = settings,
};
if (userSettings.VaultKey != null)
{
collections.Add("vaultKey", new object[] { userSettings.VaultKey });
}
var total = collections.Values.Sum((a) => a.Count());
if (total == 0)
{
yield return new SyncTransferItem
@@ -323,23 +537,51 @@ namespace Notesnook.API.Hubs
yield break;
}
foreach (var collection in collections)
var collections = new[] {
Repositories.Settings.FindItemsSyncedAfter,
Repositories.Attachments.FindItemsSyncedAfter,
Repositories.Notes.FindItemsSyncedAfter,
Repositories.Notebooks.FindItemsSyncedAfter,
Repositories.Contents.FindItemsSyncedAfter,
Repositories.Shortcuts.FindItemsSyncedAfter,
Repositories.Reminders.FindItemsSyncedAfter,
Repositories.Colors.FindItemsSyncedAfter,
Repositories.Tags.FindItemsSyncedAfter,
Repositories.Relations.FindItemsSyncedAfter,
};
for (var i = 0; i < collections.Length; ++i)
{
foreach (var item in collection.Value)
var key = CollectionKeys[i];
using var cursor = await collections[i](userId, lastSyncedTimestamp, 1000);
while (await cursor.MoveNextAsync(cancellationToken))
{
if (item == null) continue;
// Check the cancellation token regularly so that the server will stop producing items if the client disconnects.
cancellationToken.ThrowIfCancellationRequested();
yield return new SyncTransferItem
foreach (var item in cursor.Current)
{
LastSynced = userSettings.LastSynced,
Synced = false,
Item = JsonSerializer.Serialize(item),
ItemType = collection.Key,
Total = total,
};
yield return new SyncTransferItem
{
LastSynced = userSettings.LastSynced,
Synced = false,
Item = JsonSerializer.Serialize(item),
ItemType = key,
Total = (int)total,
};
}
}
}
if (userSettings.VaultKey != null)
{
yield return new SyncTransferItem
{
LastSynced = userSettings.LastSynced,
Synced = false,
Item = JsonSerializer.Serialize(userSettings.VaultKey),
ItemType = "vaultKey",
Total = (int)total,
};
}
}
}
@@ -384,4 +626,33 @@ namespace Notesnook.API.Hubs
[MessagePack.Key("current")]
public int Current { get; set; }
}
[MessagePack.MessagePackObject]
public struct SyncTransferItemV2
{
[MessagePack.Key("items")]
[JsonPropertyName("items")]
public IEnumerable<SyncItem> Items { get; set; }
[MessagePack.Key("type")]
[JsonPropertyName("type")]
public string Type { get; set; }
[MessagePack.Key("count")]
[JsonPropertyName("count")]
public int Count { get; set; }
}
[MessagePack.MessagePackObject]
public struct SyncMetadata
{
[MessagePack.Key("vaultKey")]
[JsonPropertyName("vaultKey")]
public EncryptedData VaultKey { get; set; }
[MessagePack.Key("lastSynced")]
[JsonPropertyName("lastSynced")]
public long LastSynced { get; set; }
// [MessagePack.Key("total")]
// public long TotalItems { get; set; }
}
}

View File

@@ -52,5 +52,14 @@ namespace Notesnook.API.Models
[BsonElement("salt")]
[DataMember(Name = "salt")]
public string Salt { get; set; }
public override bool Equals(object obj)
{
if (obj is EncryptedData encryptedData)
{
return IV == encryptedData.IV && Salt == encryptedData.Salt && Cipher == encryptedData.Cipher && Length == encryptedData.Length;
}
return base.Equals(obj);
}
}
}

View File

@@ -50,12 +50,21 @@ namespace Notesnook.API.Repositories
return ALGORITHMS.Contains(algorithm);
}
public async Task<IEnumerable<T>> GetItemsSyncedAfterAsync(string userId, long timestamp)
public Task<long> CountItemsSyncedAfterAsync(string userId, long timestamp)
{
var cursor = await Collection.FindAsync(n => (n.DateSynced > timestamp) && n.UserId.Equals(userId));
return cursor.ToList();
return Collection.CountDocumentsAsync(n => (n.DateSynced > timestamp) && n.UserId.Equals(userId));
}
public Task<IAsyncCursor<SyncItem>> FindItemsSyncedAfter(string userId, long timestamp, int batchSize)
{
return Collection.FindAsync(n => (n.DateSynced > timestamp) && n.UserId.Equals(userId), new FindOptions<SyncItem>
{
BatchSize = batchSize,
AllowDiskUse = true,
AllowPartialResults = false,
NoCursorTimeout = true,
Sort = new SortDefinitionBuilder<SyncItem>().Ascending((a) => a.Id)
});
}
// public async Task DeleteIdsAsync(string[] ids, string userId, CancellationToken token = default(CancellationToken))
// {
// await Collection.DeleteManyAsync<T>((i) => ids.Contains(i.Id) && i.UserId == userId, token);
@@ -63,10 +72,10 @@ namespace Notesnook.API.Repositories
public void DeleteByUserId(string userId)
{
dbContext.AddCommand((handle, ct) => Collection.DeleteManyAsync<T>(handle, (i) => i.UserId == userId, cancellationToken: ct));
dbContext.AddCommand((handle, ct) => Collection.DeleteManyAsync(handle, (i) => i.UserId == userId, cancellationToken: ct));
}
public async Task UpsertAsync(T item, string userId, long dateSynced)
public async Task UpsertAsync(SyncItem item, string userId, long dateSynced)
{
if (item.Length > 15 * 1024 * 1024)