/* This file is part of the Notesnook Sync Server project (https://notesnook.com/) Copyright (C) 2023 Streetwriters (Private) Limited This program is free software: you can redistribute it and/or modify it under the terms of the Affero GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the Affero GNU General Public License for more details. You should have received a copy of the Affero GNU General Public License along with this program. If not, see . */ using System; using System.Collections.Frozen; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using System.Linq.Expressions; using System.Security.Claims; using System.Text.Json.Serialization; using System.Threading.Tasks; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; using MongoDB.Driver; using Notesnook.API.Authorization; using Notesnook.API.Interfaces; using Notesnook.API.Models; using Notesnook.API.Services; using Streetwriters.Data.Interfaces; namespace Notesnook.API.Hubs { public interface ISyncV2HubClient { Task SendItems(SyncTransferItemV2 transferItem); Task SendVaultKey(EncryptedData vaultKey); Task SendMonographs(IEnumerable monographs); Task SendInboxItems(IEnumerable inboxItems); Task PushCompleted(); } [Authorize] public class SyncV2Hub : Hub { private ISyncItemsRepositoryAccessor Repositories { get; } private readonly IUnitOfWork unit; private static readonly string[] CollectionKeys = [ "settingitem", "attachment", "note", "notebook", "content", "shortcut", "reminder", "color", "tag", "vault", "relation", // relations must sync at the end to prevent invalid state ]; private readonly FrozenDictionary, string, long>> UpsertActionsMap; private readonly Func>>[] Collections; ILogger Logger { get; } public SyncV2Hub(ISyncItemsRepositoryAccessor syncItemsRepositoryAccessor, IUnitOfWork unitOfWork, ILogger logger) { Logger = logger; Repositories = syncItemsRepositoryAccessor; unit = unitOfWork; Collections = [ Repositories.Settings.FindItemsById, Repositories.Attachments.FindItemsById, Repositories.Notes.FindItemsById, Repositories.Notebooks.FindItemsById, Repositories.Contents.FindItemsById, Repositories.Shortcuts.FindItemsById, Repositories.Reminders.FindItemsById, Repositories.Colors.FindItemsById, Repositories.Tags.FindItemsById, Repositories.Vaults.FindItemsById, Repositories.Relations.FindItemsById, ]; UpsertActionsMap = new Dictionary, string, long>> { { "settingitem", Repositories.Settings.UpsertMany }, { "attachment", Repositories.Attachments.UpsertMany }, { "note", Repositories.Notes.UpsertMany }, { "notebook", Repositories.Notebooks.UpsertMany }, { "content", Repositories.Contents.UpsertMany }, { "shortcut", Repositories.Shortcuts.UpsertMany }, { "reminder", Repositories.Reminders.UpsertMany }, { "relation", Repositories.Relations.UpsertMany }, { "color", Repositories.Colors.UpsertMany }, { "vault", Repositories.Vaults.UpsertMany }, { "tag", Repositories.Tags.UpsertMany }, }.ToFrozenDictionary(); } public override async Task OnConnectedAsync() { var result = new SyncRequirement().IsAuthorized(Context.User, new PathString("/hubs/sync/v2")); if (!result.Succeeded) { var reason = result.AuthorizationFailure?.FailureReasons.FirstOrDefault(); throw new HubException(reason?.Message ?? "Unauthorized"); } var id = Context.User?.FindFirstValue("sub") ?? throw new HubException("User not found."); await Groups.AddToGroupAsync(Context.ConnectionId, id); await base.OnConnectedAsync(); } public async Task PushItems(string deviceId, SyncTransferItemV2 pushItem) { var userId = Context.User?.FindFirstValue("sub") ?? throw new HubException("Please login to sync."); SyncEventCounterSource.Log.PushV2(); var stopwatch = Stopwatch.StartNew(); try { var UpsertItems = UpsertActionsMap[pushItem.Type] ?? throw new Exception($"Invalid item type: {pushItem.Type}."); UpsertItems(pushItem.Items, userId, 1); if (!await unit.Commit()) return 0; new SyncDeviceService(new SyncDevice(userId, deviceId)).AddIdsToOtherDevices(pushItem.Items.Select((i) => $"{i.ItemId}:{pushItem.Type}").ToList()); return 1; } finally { SyncEventCounterSource.Log.RecordPushDuration(stopwatch.ElapsedMilliseconds); } } public async Task PushCompleted() { var userId = Context.User?.FindFirstValue("sub") ?? throw new HubException("User not found."); await Clients.OthersInGroup(userId).PushCompleted(); return true; } private async IAsyncEnumerable PrepareChunks(string userId, string[] ids, int size, bool resetSync, long maxBytes) { var itemsProcessed = 0; for (int i = 0; i < Collections.Length; i++) { var type = CollectionKeys[i]; var filteredIds = ids.Where((id) => id.EndsWith($":{type}")).Select((id) => id.Split(":")[0]).ToArray(); if (!resetSync && filteredIds.Length == 0) continue; using var cursor = await Collections[i](userId, filteredIds, resetSync, size); var chunk = new List(); long totalBytes = 0; long METADATA_BYTES = 5 * 1024; while (await cursor.MoveNextAsync()) { foreach (var item in cursor.Current) { chunk.Add(item); totalBytes += item.Length + METADATA_BYTES; if (totalBytes >= maxBytes) { itemsProcessed += chunk.Count; yield return new SyncTransferItemV2 { Items = chunk, Type = type, Count = itemsProcessed }; totalBytes = 0; chunk.Clear(); } } } if (chunk.Count > 0) { itemsProcessed += chunk.Count; yield return new SyncTransferItemV2 { Items = chunk, Type = type, Count = itemsProcessed }; } } } public async Task RequestFetch(string deviceId) { return await HandleRequestFetch(deviceId, false, false); } public async Task RequestFetchV2(string deviceId) { return await HandleRequestFetch(deviceId, true, false); } public async Task RequestFetchV3(string deviceId) { return await HandleRequestFetch(deviceId, true, true); } private async Task HandleRequestFetch(string deviceId, bool includeMonographs, bool includeInboxItems) { var userId = Context.User?.FindFirstValue("sub") ?? throw new HubException("Please login to sync."); SyncEventCounterSource.Log.FetchV2(); var device = new SyncDevice(userId, deviceId); var deviceService = new SyncDeviceService(device); if (!deviceService.IsDeviceRegistered()) deviceService.RegisterDevice(); device.LastAccessTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); var isResetSync = deviceService.IsSyncReset(); if (!deviceService.IsUnsynced() && !deviceService.IsSyncPending() && !isResetSync) return new SyncV2Metadata { Synced = true }; var stopwatch = Stopwatch.StartNew(); try { string[] ids = deviceService.FetchUnsyncedIds(); var chunks = PrepareChunks( userId, ids, size: 1000, resetSync: isResetSync, maxBytes: 7 * 1024 * 1024 ); var userSettings = await Repositories.UsersSettings.FindOneAsync((u) => u.UserId.Equals(userId)); if (userSettings.VaultKey != null) { if (!await Clients.Caller.SendVaultKey(userSettings.VaultKey).WaitAsync(TimeSpan.FromMinutes(10))) throw new HubException("Client rejected vault key."); } await foreach (var chunk in chunks) { if (!await Clients.Caller.SendItems(chunk).WaitAsync(TimeSpan.FromMinutes(10))) throw new HubException("Client rejected sent items."); if (!isResetSync) { var syncedIds = chunk.Items.Select((i) => $"{i.ItemId}:{chunk.Type}").ToHashSet(); ids = ids.Where((id) => !syncedIds.Contains(id)).ToArray(); deviceService.WritePendingIds(ids); } } if (includeMonographs) { var isSyncingMonographsForFirstTime = !device.HasInitialMonographsSync; var unsyncedMonographs = ids.Where((id) => id.EndsWith(":monograph")).ToHashSet(); var unsyncedMonographIds = unsyncedMonographs.Select((id) => id.Split(":")[0]).ToArray(); FilterDefinition filter = isResetSync || isSyncingMonographsForFirstTime ? Builders.Filter.Eq("UserId", userId) : Builders.Filter.And( Builders.Filter.Eq("UserId", userId), Builders.Filter.Or( Builders.Filter.In("ItemId", unsyncedMonographIds), Builders.Filter.In("_id", unsyncedMonographIds) ) ); var userMonographs = await Repositories.Monographs.Collection.Find(filter).Project((m) => new MonographMetadata { DatePublished = m.DatePublished, Deleted = m.Deleted, Password = m.Password, SelfDestruct = m.SelfDestruct, Title = m.Title, ItemId = m.ItemId ?? m.Id.ToString(), ViewCount = m.ViewCount }).ToListAsync(); if (userMonographs.Count > 0 && !await Clients.Caller.SendMonographs(userMonographs).WaitAsync(TimeSpan.FromMinutes(10))) throw new HubException("Client rejected monographs."); device.HasInitialMonographsSync = true; } if (includeInboxItems) { var unsyncedInboxItems = ids.Where((id) => id.EndsWith(":inboxItems")).ToHashSet(); var unsyncedInboxItemIds = unsyncedInboxItems.Select((id) => id.Split(":")[0]).ToArray(); var userInboxItems = isResetSync ? await Repositories.InboxItems.FindAsync(m => m.UserId == userId) : await Repositories.InboxItems.FindAsync(m => m.UserId == userId && unsyncedInboxItemIds.Contains(m.ItemId)); if (userInboxItems.Any() && !await Clients.Caller.SendInboxItems(userInboxItems).WaitAsync(TimeSpan.FromMinutes(10))) { throw new HubException("Client rejected inbox items."); } } deviceService.Reset(); return new SyncV2Metadata { Synced = true, }; } finally { SyncEventCounterSource.Log.RecordFetchDuration(stopwatch.ElapsedMilliseconds); } } } [MessagePack.MessagePackObject] public struct SyncV2Metadata { [MessagePack.Key("synced")] [JsonPropertyName("synced")] public bool Synced { get; set; } } [MessagePack.MessagePackObject] public struct SyncTransferItemV2 { [MessagePack.Key("items")] [JsonPropertyName("items")] public IEnumerable Items { get; set; } [MessagePack.Key("type")] [JsonPropertyName("type")] public string Type { get; set; } [MessagePack.Key("count")] [JsonPropertyName("count")] public int Count { get; set; } } }