diff --git a/Streetwriters.Common/Accessors/WampServiceAccessor.cs b/Streetwriters.Common/Accessors/WampServiceAccessor.cs new file mode 100644 index 0000000..b60b4de --- /dev/null +++ b/Streetwriters.Common/Accessors/WampServiceAccessor.cs @@ -0,0 +1,48 @@ +/* +This file is part of the Notesnook Sync Server project (https://notesnook.com/) + +Copyright (C) 2023 Streetwriters (Private) Limited + +This program is free software: you can redistribute it and/or modify +it under the terms of the Affero GNU General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +Affero GNU General Public License for more details. + +You should have received a copy of the Affero GNU General Public License +along with this program. If not, see . +*/ + +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Streetwriters.Common.Interfaces; + +namespace Streetwriters.Common.Accessors +{ + public class WampServiceAccessor(Server server) : IHostedService + { + public IUserAccountService UserAccountService { get; set; } + public IUserSubscriptionService? UserSubscriptionService { get; set; } + + public async Task StartAsync(CancellationToken cancellationToken) + { + await InitAsync(); + } + + private async Task InitAsync() + { + this.UserAccountService = await WampServers.IdentityServer.GetServiceAsync(InitAsync); + if (!Constants.IS_SELF_HOSTED && server != Servers.SubscriptionServer) + { + this.UserSubscriptionService = await WampServers.SubscriptionServer.GetServiceAsync(InitAsync); + } + } + + public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask; + } +} \ No newline at end of file diff --git a/Streetwriters.Common/Extensions/AppBuilderExtensions.cs b/Streetwriters.Common/Extensions/AppBuilderExtensions.cs index b9da4b8..6b2121c 100644 --- a/Streetwriters.Common/Extensions/AppBuilderExtensions.cs +++ b/Streetwriters.Common/Extensions/AppBuilderExtensions.cs @@ -26,11 +26,11 @@ using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.HttpOverrides; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; using WampSharp.AspNetCore.WebSockets.Server; using WampSharp.Binding; using WampSharp.V2; using WampSharp.V2.Realm; -using Microsoft.Extensions.Hosting; namespace Streetwriters.Common.Extensions { @@ -55,9 +55,9 @@ namespace Streetwriters.Common.Extensions return app; } - public static IApplicationBuilder UseWamp(this IApplicationBuilder app, WampServer server, Action> action) where T : new() + public static IApplicationBuilder UseWamp(this IApplicationBuilder app, WampServer server, Action action) { - WampHost host = new WampHost(); + WampHost host = new(); app.Map(server.Endpoint, builder => { @@ -81,10 +81,8 @@ namespace Streetwriters.Common.Extensions public static T GetScopedService(this IApplicationBuilder app) where T : notnull { - using (var scope = app.ApplicationServices.CreateScope()) - { - return scope.ServiceProvider.GetRequiredService(); - } + using var scope = app.ApplicationServices.CreateScope(); + return scope.ServiceProvider.GetRequiredService(); } public static IApplicationBuilder UseForwardedHeadersWithKnownProxies(this IApplicationBuilder app, IWebHostEnvironment env, string forwardedForHeaderName = null) diff --git a/Streetwriters.Common/Extensions/ServiceCollectionServiceExtensions.cs b/Streetwriters.Common/Extensions/ServiceCollectionServiceExtensions.cs index be29d13..8635a62 100644 --- a/Streetwriters.Common/Extensions/ServiceCollectionServiceExtensions.cs +++ b/Streetwriters.Common/Extensions/ServiceCollectionServiceExtensions.cs @@ -18,6 +18,7 @@ along with this program. If not, see . */ using Microsoft.Extensions.DependencyInjection; +using Streetwriters.Common.Accessors; using Streetwriters.Data.DbContexts; using Streetwriters.Data.Repositories; @@ -25,6 +26,13 @@ namespace Streetwriters.Common.Extensions { public static class ServiceCollectionServiceExtensions { + public static IServiceCollection AddWampServiceAccessor(this IServiceCollection services, Server server) + { + services.AddSingleton((provider) => new WampServiceAccessor(server)); + services.AddHostedService(provider => provider.GetRequiredService()); + return services; + } + public static IServiceCollection AddRepository(this IServiceCollection services, string collectionName, string database) where T : class { services.AddSingleton((provider) => MongoDbContext.GetMongoCollection(provider.GetRequiredService(), database, collectionName)); diff --git a/Streetwriters.Common/Helpers/WampHelper.cs b/Streetwriters.Common/Helpers/WampHelper.cs index b641035..529c64a 100644 --- a/Streetwriters.Common/Helpers/WampHelper.cs +++ b/Streetwriters.Common/Helpers/WampHelper.cs @@ -28,15 +28,28 @@ namespace Streetwriters.Common.Helpers { public class WampHelper { - public static async Task OpenWampChannelAsync(string server, string realmName) + public static async Task OpenWampChannelAsync(string server, string realmName) { DefaultWampChannelFactory channelFactory = new(); IWampChannel channel = channelFactory.CreateJsonChannel(server, realmName); - await channel.Open(); + var isConnected = false; + while (!isConnected) + { + try + { + await channel.Open(); + isConnected = true; + } + catch + { + await Task.Delay(5000); + continue; + } + } - return channel.RealmProxy; + return channel; } public static void PublishMessage(IWampRealmProxy realm, string topicName, T message) diff --git a/Streetwriters.Common/WampServers.cs b/Streetwriters.Common/WampServers.cs index 4fab04e..bf182e1 100644 --- a/Streetwriters.Common/WampServers.cs +++ b/Streetwriters.Common/WampServers.cs @@ -24,72 +24,74 @@ using System.Reactive.Subjects; using System.Threading.Tasks; using Streetwriters.Common.Helpers; using Streetwriters.Common.Interfaces; +using WampSharp.V2; using WampSharp.V2.Client; namespace Streetwriters.Common { - public class WampServer where T : new() + public class WampServer { - private readonly ConcurrentDictionary Channels = new(); - public required string Endpoint { get; set; } public required string Address { get; set; } - public T Topics { get; set; } = new T(); public required string Realm { get; set; } - private async Task GetChannelAsync(string topic) + private IWampChannel? channel = null; + private async Task GetChannelAsync() { - if (!Channels.TryGetValue(topic, out IWampRealmProxy? channel) || channel == null || !channel.Monitor.IsConnected) - { - channel = await WampHelper.OpenWampChannelAsync(Address, Realm); - Channels.AddOrUpdate(topic, (key) => channel, (key, old) => channel); - } + if (channel != null && channel.RealmProxy.Monitor.IsConnected) + return channel; + channel = await WampHelper.OpenWampChannelAsync(Address, Realm); return channel; } - public async Task GetServiceAsync(string topic) where V : class + public async Task GetServiceAsync(Func? onDisconnect = null) where V : class { - var channel = await GetChannelAsync(topic); - return channel.Services.GetCalleeProxy(); + var channel = await GetChannelAsync(); + if (onDisconnect != null) + { + channel.RealmProxy.Monitor.ConnectionBroken += (s, e) => onDisconnect(); + channel.RealmProxy.Monitor.ConnectionError += (s, e) => onDisconnect(); + } + return channel.RealmProxy.Services.GetCalleeProxy(); } public async Task PublishMessageAsync(string topic, V message) { - IWampRealmProxy channel = await GetChannelAsync(topic); - WampHelper.PublishMessage(channel, topic, message); + IWampChannel channel = await GetChannelAsync(); + WampHelper.PublishMessage(channel.RealmProxy, topic, message); } public async Task PublishMessagesAsync(string topic, IEnumerable messages) { - IWampRealmProxy channel = await GetChannelAsync(topic); - WampHelper.PublishMessages(channel, topic, messages); + IWampChannel channel = await GetChannelAsync(); + WampHelper.PublishMessages(channel.RealmProxy, topic, messages); } } public class WampServers { - public static WampServer MessengerServer { get; } = new WampServer + public static WampServer MessengerServer { get; } = new WampServer { Endpoint = "/wamp", Address = $"{Servers.MessengerServer.WS()}/wamp", Realm = "messages", }; - public static WampServer SubscriptionServer { get; } = new WampServer + public static WampServer SubscriptionServer { get; } = new WampServer { Endpoint = "/wamp", Address = $"{Servers.SubscriptionServer.WS()}/wamp", Realm = "messages", }; - public static WampServer IdentityServer { get; } = new WampServer + public static WampServer IdentityServer { get; } = new WampServer { Endpoint = "/wamp", Address = $"{Servers.IdentityServer.WS()}/wamp", Realm = "messages", }; - public static WampServer NotesnookServer { get; } = new WampServer + public static WampServer NotesnookServer { get; } = new WampServer { Endpoint = "/wamp", Address = $"{Servers.NotesnookAPI.WS()}/wamp", @@ -97,28 +99,23 @@ namespace Streetwriters.Common }; } - public class MessengerServerTopics + public struct MessengerServerTopics { public const string SendSSETopic = "co.streetwriters.sse.send"; } - public class SubscriptionServerTopics + public struct SubscriptionServerTopics { public const string UserSubscriptionServiceTopic = "co.streetwriters.subscriptions.subscriptions"; - public const string CreateSubscriptionTopic = "co.streetwriters.subscriptions.create"; public const string CreateSubscriptionV2Topic = "co.streetwriters.subscriptions.v2.create"; public const string DeleteSubscriptionTopic = "co.streetwriters.subscriptions.delete"; } - public class IdentityServerTopics + public struct IdentityServerTopics { public const string UserAccountServiceTopic = "co.streetwriters.identity.users"; public const string ClearCacheTopic = "co.streetwriters.identity.clear_cache"; public const string DeleteUserTopic = "co.streetwriters.identity.delete_user"; } - - public class NotesnookServerTopics - { - } } \ No newline at end of file