sync: migrate sync devices from fs to mongodb
This commit is contained in:
@@ -46,12 +46,14 @@ namespace Notesnook.API.Hubs
|
||||
Task<bool> SendMonographs(IEnumerable<MonographMetadata> monographs);
|
||||
Task<bool> SendInboxItems(IEnumerable<InboxSyncItem> inboxItems);
|
||||
Task PushCompleted();
|
||||
Task PushCompletedV2(string deviceId);
|
||||
}
|
||||
|
||||
[Authorize]
|
||||
public class SyncV2Hub : Hub<ISyncV2HubClient>
|
||||
{
|
||||
private ISyncItemsRepositoryAccessor Repositories { get; }
|
||||
private SyncDeviceService SyncDeviceService { get; }
|
||||
private readonly IUnitOfWork unit;
|
||||
private static readonly string[] CollectionKeys = [
|
||||
"settingitem",
|
||||
@@ -67,14 +69,15 @@ namespace Notesnook.API.Hubs
|
||||
"relation", // relations must sync at the end to prevent invalid state
|
||||
];
|
||||
private readonly FrozenDictionary<string, Action<IEnumerable<SyncItem>, string, long>> UpsertActionsMap;
|
||||
private readonly Func<string, string[], bool, int, Task<IAsyncCursor<SyncItem>>>[] Collections;
|
||||
private readonly Func<string, IEnumerable<string>, bool, int, Task<IAsyncCursor<SyncItem>>>[] Collections;
|
||||
ILogger<SyncV2Hub> Logger { get; }
|
||||
|
||||
public SyncV2Hub(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor, IUnitOfWork unitOfWork, ILogger<SyncV2Hub> logger)
|
||||
public SyncV2Hub(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor, IUnitOfWork unitOfWork, SyncDeviceService syncDeviceService, ILogger<SyncV2Hub> logger)
|
||||
{
|
||||
Logger = logger;
|
||||
Repositories = syncItemsRepositoryAccessor;
|
||||
unit = unitOfWork;
|
||||
SyncDeviceService = syncDeviceService;
|
||||
|
||||
Collections = [
|
||||
Repositories.Settings.FindItemsById,
|
||||
@@ -133,7 +136,7 @@ namespace Notesnook.API.Hubs
|
||||
|
||||
if (!await unit.Commit()) return 0;
|
||||
|
||||
new SyncDeviceService(new SyncDevice(userId, deviceId)).AddIdsToOtherDevices(pushItem.Items.Select((i) => $"{i.ItemId}:{pushItem.Type}").ToList());
|
||||
await SyncDeviceService.AddIdsToOtherDevicesAsync(userId, deviceId, pushItem.Items.Select((i) => new ItemKey(i.ItemId, pushItem.Type)));
|
||||
return 1;
|
||||
}
|
||||
finally
|
||||
@@ -149,14 +152,22 @@ namespace Notesnook.API.Hubs
|
||||
return true;
|
||||
}
|
||||
|
||||
private async IAsyncEnumerable<SyncTransferItemV2> PrepareChunks(string userId, string[] ids, int size, bool resetSync, long maxBytes)
|
||||
public async Task<bool> PushCompletedV2(string deviceId)
|
||||
{
|
||||
var userId = Context.User?.FindFirstValue("sub") ?? throw new HubException("User not found.");
|
||||
await Clients.OthersInGroup(userId).PushCompleted();
|
||||
await Clients.OthersInGroup(userId).PushCompletedV2(deviceId);
|
||||
return true;
|
||||
}
|
||||
|
||||
private async IAsyncEnumerable<SyncTransferItemV2> PrepareChunks(string userId, HashSet<ItemKey> ids, int size, bool resetSync, long maxBytes)
|
||||
{
|
||||
var itemsProcessed = 0;
|
||||
for (int i = 0; i < Collections.Length; i++)
|
||||
{
|
||||
var type = CollectionKeys[i];
|
||||
|
||||
var filteredIds = ids.Where((id) => id.EndsWith($":{type}")).Select((id) => id.Split(":")[0]).ToArray();
|
||||
var filteredIds = ids.Where((id) => id.Type == type).Select((id) => id.ItemId).ToArray();
|
||||
if (!resetSync && filteredIds.Length == 0) continue;
|
||||
|
||||
using var cursor = await Collections[i](userId, filteredIds, resetSync, size);
|
||||
@@ -220,61 +231,47 @@ namespace Notesnook.API.Hubs
|
||||
|
||||
SyncEventCounterSource.Log.FetchV2();
|
||||
|
||||
var device = new SyncDevice(userId, deviceId);
|
||||
var deviceService = new SyncDeviceService(device);
|
||||
if (!deviceService.IsDeviceRegistered()) deviceService.RegisterDevice();
|
||||
|
||||
device.LastAccessTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
|
||||
|
||||
var isResetSync = deviceService.IsSyncReset();
|
||||
if (!deviceService.IsUnsynced() &&
|
||||
!deviceService.IsSyncPending() &&
|
||||
!isResetSync)
|
||||
return new SyncV2Metadata { Synced = true };
|
||||
var device = await SyncDeviceService.GetDeviceAsync(userId, deviceId);
|
||||
if (device == null)
|
||||
device = await SyncDeviceService.RegisterDeviceAsync(userId, deviceId);
|
||||
else
|
||||
await SyncDeviceService.UpdateLastAccessTimeAsync(userId, deviceId);
|
||||
|
||||
var stopwatch = Stopwatch.StartNew();
|
||||
try
|
||||
{
|
||||
string[] ids = deviceService.FetchUnsyncedIds();
|
||||
var ids = await SyncDeviceService.FetchUnsyncedIdsAsync(userId, deviceId);
|
||||
if (!device.IsSyncReset && ids.Count == 0)
|
||||
return new SyncV2Metadata { Synced = true };
|
||||
|
||||
var chunks = PrepareChunks(
|
||||
userId,
|
||||
ids,
|
||||
size: 1000,
|
||||
resetSync: isResetSync,
|
||||
resetSync: device.IsSyncReset,
|
||||
maxBytes: 7 * 1024 * 1024
|
||||
);
|
||||
|
||||
var userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId.Equals(userId));
|
||||
if (userSettings.VaultKey != null)
|
||||
{
|
||||
if (!await Clients.Caller.SendVaultKey(userSettings.VaultKey).WaitAsync(TimeSpan.FromMinutes(10))) throw new HubException("Client rejected vault key.");
|
||||
}
|
||||
|
||||
|
||||
await foreach (var chunk in chunks)
|
||||
{
|
||||
if (!await Clients.Caller.SendItems(chunk).WaitAsync(TimeSpan.FromMinutes(10))) throw new HubException("Client rejected sent items.");
|
||||
|
||||
if (!isResetSync)
|
||||
if (!device.IsSyncReset)
|
||||
{
|
||||
var syncedIds = chunk.Items.Select((i) => $"{i.ItemId}:{chunk.Type}").ToHashSet();
|
||||
ids = ids.Where((id) => !syncedIds.Contains(id)).ToArray();
|
||||
deviceService.WritePendingIds(ids);
|
||||
ids.ExceptWith(chunk.Items.Select(i => new ItemKey(i.ItemId, chunk.Type)));
|
||||
await SyncDeviceService.WritePendingIdsAsync(userId, deviceId, ids);
|
||||
}
|
||||
}
|
||||
|
||||
if (includeMonographs)
|
||||
{
|
||||
var isSyncingMonographsForFirstTime = !device.HasInitialMonographsSync;
|
||||
var unsyncedMonographs = ids.Where((id) => id.EndsWith(":monograph")).ToHashSet();
|
||||
var unsyncedMonographIds = unsyncedMonographs.Select((id) => id.Split(":")[0]).ToArray();
|
||||
FilterDefinition<Monograph> filter = isResetSync || isSyncingMonographsForFirstTime
|
||||
? Builders<Monograph>.Filter.Eq("UserId", userId)
|
||||
var unsyncedMonographIds = ids.Where(k => k.Type == "monograph").Select(k => k.ItemId);
|
||||
FilterDefinition<Monograph> filter = device.IsSyncReset
|
||||
? Builders<Monograph>.Filter.Eq(m => m.UserId, userId)
|
||||
: Builders<Monograph>.Filter.And(
|
||||
Builders<Monograph>.Filter.Eq("UserId", userId),
|
||||
Builders<Monograph>.Filter.Eq(m => m.UserId, userId),
|
||||
Builders<Monograph>.Filter.Or(
|
||||
Builders<Monograph>.Filter.In("ItemId", unsyncedMonographIds),
|
||||
Builders<Monograph>.Filter.In(m => m.ItemId, unsyncedMonographIds),
|
||||
Builders<Monograph>.Filter.In("_id", unsyncedMonographIds)
|
||||
)
|
||||
);
|
||||
@@ -285,30 +282,26 @@ namespace Notesnook.API.Hubs
|
||||
Password = m.Password,
|
||||
SelfDestruct = m.SelfDestruct,
|
||||
Title = m.Title,
|
||||
ItemId = m.ItemId ?? m.Id.ToString(),
|
||||
ViewCount = m.ViewCount
|
||||
ItemId = m.ItemId ?? m.Id.ToString()
|
||||
}).ToListAsync();
|
||||
|
||||
if (userMonographs.Count > 0 && !await Clients.Caller.SendMonographs(userMonographs).WaitAsync(TimeSpan.FromMinutes(10)))
|
||||
throw new HubException("Client rejected monographs.");
|
||||
|
||||
device.HasInitialMonographsSync = true;
|
||||
}
|
||||
|
||||
if (includeInboxItems)
|
||||
{
|
||||
var unsyncedInboxItems = ids.Where((id) => id.EndsWith(":inboxItems")).ToHashSet();
|
||||
var unsyncedInboxItemIds = unsyncedInboxItems.Select((id) => id.Split(":")[0]).ToArray();
|
||||
var userInboxItems = isResetSync
|
||||
var unsyncedInboxItemIds = ids.Where(k => k.Type == "inbox_item").Select(k => k.ItemId);
|
||||
var userInboxItems = device.IsSyncReset
|
||||
? await Repositories.InboxItems.FindAsync(m => m.UserId == userId)
|
||||
: await Repositories.InboxItems.FindAsync(m => m.UserId == userId && unsyncedInboxItemIds.Contains(m.ItemId));
|
||||
: await Repositories.InboxItems.FindAsync(m => m.UserId == userId && unsyncedInboxItemIds.Contains(m.ItemId ?? m.Id.ToString()));
|
||||
if (userInboxItems.Any() && !await Clients.Caller.SendInboxItems(userInboxItems).WaitAsync(TimeSpan.FromMinutes(10)))
|
||||
{
|
||||
throw new HubException("Client rejected inbox items.");
|
||||
}
|
||||
}
|
||||
|
||||
deviceService.Reset();
|
||||
await SyncDeviceService.ResetAsync(userId, deviceId);
|
||||
|
||||
return new SyncV2Metadata
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user