sync: add upsertmany for faster bulk upserts

This commit is contained in:
Abdullah Atta
2024-06-07 11:16:06 +05:00
parent 98c5f0c96f
commit 292f2d4ece
2 changed files with 51 additions and 14 deletions

View File

@@ -80,21 +80,21 @@ namespace Notesnook.API.Hubs
await base.OnConnectedAsync();
}
private Action<SyncItem, string, long> MapTypeToUpsertAction(string type)
private Action<IEnumerable<SyncItem>, string, long> MapTypeToUpsertAction(string type)
{
return type switch
{
"settingitem" => Repositories.Settings.Upsert,
"attachment" => Repositories.Attachments.Upsert,
"note" => Repositories.Notes.Upsert,
"notebook" => Repositories.Notebooks.Upsert,
"content" => Repositories.Contents.Upsert,
"shortcut" => Repositories.Shortcuts.Upsert,
"reminder" => Repositories.Reminders.Upsert,
"relation" => Repositories.Relations.Upsert,
"color" => Repositories.Colors.Upsert,
"vault" => Repositories.Vaults.Upsert,
"tag" => Repositories.Tags.Upsert,
"settingitem" => Repositories.Settings.UpsertMany,
"attachment" => Repositories.Attachments.UpsertMany,
"note" => Repositories.Notes.UpsertMany,
"notebook" => Repositories.Notebooks.UpsertMany,
"content" => Repositories.Contents.UpsertMany,
"shortcut" => Repositories.Shortcuts.UpsertMany,
"reminder" => Repositories.Reminders.UpsertMany,
"relation" => Repositories.Relations.UpsertMany,
"color" => Repositories.Colors.UpsertMany,
"vault" => Repositories.Vaults.UpsertMany,
"tag" => Repositories.Tags.UpsertMany,
_ => null,
};
}

View File

@@ -130,8 +130,45 @@ namespace Notesnook.API.Repositories
);
dbContext.AddCommand((handle, ct) => Collection.ReplaceOneAsync(handle, filter, item, new ReplaceOptions { IsUpsert = true }, ct));
// await base.UpsertAsync(item, (x) => (x.ItemId == item.ItemId) && x.UserId == userId);
base.Upsert(item, (x) => x.UserId == userId && x.ItemId == item.ItemId);
}
public void UpsertMany(IEnumerable<SyncItem> items, string userId, long dateSynced)
{
var userIdFilter = Builders<SyncItem>.Filter.Eq("UserId", userId);
var writes = new List<WriteModel<SyncItem>>();
foreach (var item in items)
{
if (item.Length > 15 * 1024 * 1024)
{
throw new Exception($"Size of item \"{item.ItemId}\" is too large. Maximum allowed size is 15 MB.");
}
if (!IsValidAlgorithm(item.Algorithm))
{
throw new Exception($"Invalid alg identifier {item.Algorithm}");
}
// Handle case where the cipher is corrupted.
if (!IsBase64String(item.Cipher))
{
Slogger<SyncHub>.Error("Upsert", "Corrupted", item.ItemId, item.Length.ToString(), item.Cipher);
throw new Exception($"Corrupted item \"{item.ItemId}\" in collection \"{this.collectionName}\". Please report this error to support@streetwriters.co.");
}
var filter = Builders<SyncItem>.Filter.And(
userIdFilter,
Builders<SyncItem>.Filter.Eq("ItemId", item.ItemId)
);
item.DateSynced = dateSynced;
item.UserId = userId;
writes.Add(new ReplaceOneModel<SyncItem>(filter, item)
{
IsUpsert = true
});
}
dbContext.AddCommand((handle, ct) => Collection.BulkWriteAsync(handle, writes, options: new BulkWriteOptions { IsOrdered = false }, ct));
}
private static bool IsBase64String(string value)