diff --git a/Streetwriters.Common/Extensions/WampRealmExtensions.cs b/Streetwriters.Common/Extensions/WampRealmExtensions.cs index 4b86694..0dea9bb 100644 --- a/Streetwriters.Common/Extensions/WampRealmExtensions.cs +++ b/Streetwriters.Common/Extensions/WampRealmExtensions.cs @@ -18,6 +18,8 @@ along with this program. If not, see . */ using System; +using System.Reactive.Disposables; +using System.Threading; using Microsoft.AspNetCore.Builder; using Streetwriters.Common.Interfaces; using WampSharp.AspNetCore.WebSockets.Server; @@ -38,5 +40,27 @@ namespace Streetwriters.Common.Extensions { return realm.Services.GetSubject(topicName).Subscribe(async (message) => await handler.Process(message)); } + + public static IDisposable SubscribeWithSemaphore(this IWampHostedRealm realm, string topicName, IMessageHandler handler) + { + var semaphore = new SemaphoreSlim(1, 1); + var subscriber = realm.Services.GetSubject(topicName).Subscribe(async (message) => + { + await semaphore.WaitAsync(); + try + { + await handler.Process(message); + } + finally + { + semaphore.Release(); + } + }); + return Disposable.Create(() => + { + subscriber.Dispose(); + semaphore.Dispose(); + }); + } } } \ No newline at end of file diff --git a/Streetwriters.Common/Helpers/WampHelper.cs b/Streetwriters.Common/Helpers/WampHelper.cs index 4d01fa0..b641035 100644 --- a/Streetwriters.Common/Helpers/WampHelper.cs +++ b/Streetwriters.Common/Helpers/WampHelper.cs @@ -1,47 +1,55 @@ -/* -This file is part of the Notesnook Sync Server project (https://notesnook.com/) - -Copyright (C) 2023 Streetwriters (Private) Limited - -This program is free software: you can redistribute it and/or modify -it under the terms of the Affero GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -Affero GNU General Public License for more details. - -You should have received a copy of the Affero GNU General Public License -along with this program. If not, see . -*/ - -using System.Reactive.Subjects; -using System.Threading.Tasks; -using Streetwriters.Common.Messages; -using WampSharp.V2; -using WampSharp.V2.Client; - -namespace Streetwriters.Common.Helpers -{ - public class WampHelper - { - public static async Task OpenWampChannelAsync(string server, string realmName) - { - DefaultWampChannelFactory channelFactory = new(); - - IWampChannel channel = channelFactory.CreateJsonChannel(server, realmName); - - await channel.Open(); - - return channel.RealmProxy; - } - - public static void PublishMessage(IWampRealmProxy realm, string topicName, T message) - { - var subject = realm.Services.GetSubject(topicName); - subject.OnNext(message); - } - } +/* +This file is part of the Notesnook Sync Server project (https://notesnook.com/) + +Copyright (C) 2023 Streetwriters (Private) Limited + +This program is free software: you can redistribute it and/or modify +it under the terms of the Affero GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +Affero GNU General Public License for more details. + +You should have received a copy of the Affero GNU General Public License +along with this program. If not, see . +*/ + +using System.Collections.Generic; +using System.Reactive.Subjects; +using System.Threading.Tasks; +using Streetwriters.Common.Messages; +using WampSharp.V2; +using WampSharp.V2.Client; + +namespace Streetwriters.Common.Helpers +{ + public class WampHelper + { + public static async Task OpenWampChannelAsync(string server, string realmName) + { + DefaultWampChannelFactory channelFactory = new(); + + IWampChannel channel = channelFactory.CreateJsonChannel(server, realmName); + + await channel.Open(); + + return channel.RealmProxy; + } + + public static void PublishMessage(IWampRealmProxy realm, string topicName, T message) + { + var subject = realm.Services.GetSubject(topicName); + subject.OnNext(message); + } + + public static void PublishMessages(IWampRealmProxy realm, string topicName, IEnumerable messages) + { + var subject = realm.Services.GetSubject(topicName); + foreach (var message in messages) + subject.OnNext(message); + } + } } \ No newline at end of file diff --git a/Streetwriters.Common/WampServers.cs b/Streetwriters.Common/WampServers.cs index c3269b9..69feedb 100644 --- a/Streetwriters.Common/WampServers.cs +++ b/Streetwriters.Common/WampServers.cs @@ -1,126 +1,140 @@ -/* -This file is part of the Notesnook Sync Server project (https://notesnook.com/) - -Copyright (C) 2023 Streetwriters (Private) Limited - -This program is free software: you can redistribute it and/or modify -it under the terms of the Affero GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -Affero GNU General Public License for more details. - -You should have received a copy of the Affero GNU General Public License -along with this program. If not, see . -*/ - -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Reactive.Subjects; -using System.Threading.Tasks; -using Streetwriters.Common.Helpers; -using Streetwriters.Common.Interfaces; -using WampSharp.V2.Client; - -namespace Streetwriters.Common -{ - public class WampServer where T : new() - { - private readonly ConcurrentDictionary Channels = new(); - - public string Endpoint { get; set; } - public string Address { get; set; } - public T Topics { get; set; } = new T(); - public string Realm { get; set; } - - private async Task GetChannelAsync(string topic) - { - if (!Channels.TryGetValue(topic, out IWampRealmProxy channel) || !channel.Monitor.IsConnected) - { - channel = await WampHelper.OpenWampChannelAsync(Address, Realm); - Channels.AddOrUpdate(topic, (key) => channel, (key, old) => channel); - } - return channel; - } - - public async Task GetServiceAsync(string topic) where V : class - { - var channel = await GetChannelAsync(topic); - return channel.Services.GetCalleeProxy(); - } - - public async Task PublishMessageAsync(string topic, V message) - { - try - { - IWampRealmProxy channel = await GetChannelAsync(topic); - WampHelper.PublishMessage(channel, topic, message); - } - catch (Exception ex) - { - await Slogger>.Error(nameof(PublishMessageAsync), ex.ToString()); - throw ex; - } - } - } - - public class WampServers - { - public static WampServer MessengerServer { get; } = new WampServer - { - Endpoint = "/wamp", - Address = $"{Servers.MessengerServer.WS()}/wamp", - Realm = "messages", - }; - - public static WampServer SubscriptionServer { get; } = new WampServer - { - Endpoint = "/wamp", - Address = $"{Servers.SubscriptionServer.WS()}/wamp", - Realm = "messages", - }; - - public static WampServer IdentityServer { get; } = new WampServer - { - Endpoint = "/wamp", - Address = $"{Servers.IdentityServer.WS()}/wamp", - Realm = "messages", - }; - - public static WampServer NotesnookServer { get; } = new WampServer - { - Endpoint = "/wamp", - Address = $"{Servers.NotesnookAPI.WS()}/wamp", - Realm = "messages", - }; - } - - public class MessengerServerTopics - { - public const string SendSSETopic = "co.streetwriters.sse.send"; - } - - public class SubscriptionServerTopics - { - public const string UserSubscriptionServiceTopic = "co.streetwriters.subscriptions.subscriptions"; - - public const string CreateSubscriptionTopic = "co.streetwriters.subscriptions.create"; +/* +This file is part of the Notesnook Sync Server project (https://notesnook.com/) + +Copyright (C) 2023 Streetwriters (Private) Limited + +This program is free software: you can redistribute it and/or modify +it under the terms of the Affero GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +Affero GNU General Public License for more details. + +You should have received a copy of the Affero GNU General Public License +along with this program. If not, see . +*/ + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Reactive.Subjects; +using System.Threading.Tasks; +using Streetwriters.Common.Helpers; +using Streetwriters.Common.Interfaces; +using WampSharp.V2.Client; + +namespace Streetwriters.Common +{ + public class WampServer where T : new() + { + private readonly ConcurrentDictionary Channels = new(); + + public string Endpoint { get; set; } + public string Address { get; set; } + public T Topics { get; set; } = new T(); + public string Realm { get; set; } + + private async Task GetChannelAsync(string topic) + { + if (!Channels.TryGetValue(topic, out IWampRealmProxy channel) || !channel.Monitor.IsConnected) + { + channel = await WampHelper.OpenWampChannelAsync(Address, Realm); + Channels.AddOrUpdate(topic, (key) => channel, (key, old) => channel); + } + return channel; + } + + public async Task GetServiceAsync(string topic) where V : class + { + var channel = await GetChannelAsync(topic); + return channel.Services.GetCalleeProxy(); + } + + public async Task PublishMessageAsync(string topic, V message) + { + try + { + IWampRealmProxy channel = await GetChannelAsync(topic); + WampHelper.PublishMessage(channel, topic, message); + } + catch (Exception ex) + { + await Slogger>.Error(nameof(PublishMessageAsync), ex.ToString()); + throw ex; + } + } + + public async Task PublishMessagesAsync(string topic, IEnumerable messages) + { + try + { + IWampRealmProxy channel = await GetChannelAsync(topic); + WampHelper.PublishMessages(channel, topic, messages); + } + catch (Exception ex) + { + await Slogger>.Error(nameof(PublishMessagesAsync), ex.ToString()); + throw ex; + } + } + } + + public class WampServers + { + public static WampServer MessengerServer { get; } = new WampServer + { + Endpoint = "/wamp", + Address = $"{Servers.MessengerServer.WS()}/wamp", + Realm = "messages", + }; + + public static WampServer SubscriptionServer { get; } = new WampServer + { + Endpoint = "/wamp", + Address = $"{Servers.SubscriptionServer.WS()}/wamp", + Realm = "messages", + }; + + public static WampServer IdentityServer { get; } = new WampServer + { + Endpoint = "/wamp", + Address = $"{Servers.IdentityServer.WS()}/wamp", + Realm = "messages", + }; + + public static WampServer NotesnookServer { get; } = new WampServer + { + Endpoint = "/wamp", + Address = $"{Servers.NotesnookAPI.WS()}/wamp", + Realm = "messages", + }; + } + + public class MessengerServerTopics + { + public const string SendSSETopic = "co.streetwriters.sse.send"; + } + + public class SubscriptionServerTopics + { + public const string UserSubscriptionServiceTopic = "co.streetwriters.subscriptions.subscriptions"; + + public const string CreateSubscriptionTopic = "co.streetwriters.subscriptions.create"; public const string CreateSubscriptionV2Topic = "co.streetwriters.subscriptions.v2.create"; - public const string DeleteSubscriptionTopic = "co.streetwriters.subscriptions.delete"; - } - - public class IdentityServerTopics - { - public const string UserAccountServiceTopic = "co.streetwriters.identity.users"; - public const string ClearCacheTopic = "co.streetwriters.identity.clear_cache"; - public const string DeleteUserTopic = "co.streetwriters.identity.delete_user"; - } - - public class NotesnookServerTopics - { - } + public const string DeleteSubscriptionTopic = "co.streetwriters.subscriptions.delete"; + } + + public class IdentityServerTopics + { + public const string UserAccountServiceTopic = "co.streetwriters.identity.users"; + public const string ClearCacheTopic = "co.streetwriters.identity.clear_cache"; + public const string DeleteUserTopic = "co.streetwriters.identity.delete_user"; + } + + public class NotesnookServerTopics + { + } } \ No newline at end of file