在Azure服务结构服务中设置信号和服务总线

本文关键字:服务 信号 设置 总线 Azure 结构 | 更新日期: 2023-09-27 18:08:17

我正在将现有的云服务WorkerRole作为无状态服务移植到Service Fabric。原始的云服务使用SignalR和服务总线(作为SignalR背板),向任何侦听的客户端发送通知。有一个Startup类可以做一些设置:

class Startup
{
    public void Configuration(IAppBuilder app)
    {
        String connectionString = "Endpoint=sb://[name].servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=[key]";
        GlobalHost.DependencyResolver.UseServiceBus(connectionString, "InSys");
        app.MapSignalR();
        Notifications.Hub = GlobalHost.ConnectionManager.GetHubContext<MyHub>();
    }
}

在WorkerRole的OnStart()方法中,我用:

var endpoint = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["HttpEndpoint"];
var baseUri = $"{endpoint.Protocol}://{endpoint.IPEndpoint}";
var app = WebApp.Start<Startup>(new StartOptions(url: baseUri));

对于服务结构内的无状态服务,这(即连接到SignalR服务总线背板)是如何完成的?

在Azure服务结构服务中设置信号和服务总线

在https://github.com/marcinbudny/SignalRSelfHostScaleOut的帮助下(这是使用Redis的scaleout的一个例子),我想我已经搞定了。

在ServiceManifest.xml中,我添加了以下端点:

<Endpoint Protocol="http" Name="ServiceEndpoint" Type="Input" Port="8322" />

我还添加了一个Startup类:

public static class Startup
{
    public static void ConfigureApp(IAppBuilder app)
    {
        String connectionString = "Endpoint=sb://[name].servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=[value]";
        GlobalHost.DependencyResolver.UseServiceBus(connectionString, "InSys");
        app.MapSignalR();
        Notifications.Hub = GlobalHost.ConnectionManager.GetHubContext<InSysMainHub>();
    }
}

还添加了一个OwinCommunicationListener类:

public class OwinCommunicationListener : ICommunicationListener
{
    private readonly ServiceEventSource eventSource;
    private readonly Action<IAppBuilder> startup;
    private readonly ServiceContext serviceContext;
    private readonly string endpointName;
    private readonly string appRoot;
    private IDisposable webApp;
    private string publishAddress;
    private string listeningAddress;
    public OwinCommunicationListener(Action<IAppBuilder> startup, ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName)
        : this(startup, serviceContext, eventSource, endpointName, null)
    {
    }
    public OwinCommunicationListener(Action<IAppBuilder> startup, ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName, string appRoot)
    {
        if (startup == null)
        {
            throw new ArgumentNullException(nameof(startup));
        }
        if (serviceContext == null)
        {
            throw new ArgumentNullException(nameof(serviceContext));
        }
        if (endpointName == null)
        {
            throw new ArgumentNullException(nameof(endpointName));
        }
        if (eventSource == null)
        {
            throw new ArgumentNullException(nameof(eventSource));
        }
        this.startup = startup;
        this.serviceContext = serviceContext;
        this.endpointName = endpointName;
        this.eventSource = eventSource;
        this.appRoot = appRoot;
    }

    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        var serviceEndpoint = this.serviceContext.CodePackageActivationContext.GetEndpoint(this.endpointName);
        var protocol = serviceEndpoint.Protocol;
        int port = serviceEndpoint.Port;
        if (this.serviceContext is StatefulServiceContext)
        {
            StatefulServiceContext statefulServiceContext = (StatefulServiceContext) serviceContext;
            listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "{0}://+:{1}/{2}{3}/{4}/{5}",
                protocol,
                port,
                string.IsNullOrWhiteSpace(appRoot)
                    ? string.Empty
                    : appRoot.TrimEnd('/') + '/',
                statefulServiceContext.PartitionId,
                statefulServiceContext.ReplicaId,
                Guid.NewGuid());
        }
        else if (serviceContext is StatelessServiceContext)
        {
            listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "{0}://+:{1}/{2}",
                protocol,
                port,
                string.IsNullOrWhiteSpace(appRoot)
                    ? string.Empty
                    : appRoot.TrimEnd('/') + '/');
        }
        else
        {
            throw new InvalidOperationException();
        }
        publishAddress = listeningAddress.Replace("+", FabricRuntime.GetNodeContext().IPAddressOrFQDN);
        try
        {
            eventSource.Message("Starting web server on " + listeningAddress);
            webApp = WebApp.Start(listeningAddress, appBuilder => startup.Invoke(appBuilder));
            eventSource.Message("Listening on " + this.publishAddress);
            return Task.FromResult(this.publishAddress);
        }
        catch (Exception ex)
        {
            eventSource.Message("Web server failed to open endpoint {0}. {1}", this.endpointName, ex.ToString());
            StopWebServer();
            throw;
        }
    }
    public Task CloseAsync(CancellationToken cancellationToken)
    {
        this.eventSource.Message("Closing web server on endpoint {0}", this.endpointName);
        this.StopWebServer();
        return Task.FromResult(true);
    }
    public void Abort()
    {
        this.eventSource.Message("Aborting web server on endpoint {0}", this.endpointName);
        this.StopWebServer();
    }
    private void StopWebServer()
    {
        if (this.webApp != null)
        {
            try
            {
                this.webApp.Dispose();
            }
            catch (ObjectDisposedException)
            {
                // no-op
            }
        }
    }
}

最后,我将无状态服务代码中的createserviceinstancelistener方法更改为:

protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        return new[]
        {
            new ServiceInstanceListener(serviceContext => new OwinCommunicationListener(Startup.ConfigureApp, serviceContext, ServiceEventSource.Current, "ServiceEndpoint"))
        };
    }

使用Owin侦听器创建无状态服务。然后在启动时配置信号r和背板(服务总线或sql)。理想情况下,您将面临的问题是协商(Signalr客户端与服务器之间的握手)。此时尝试配置跨域请求,持久连接的示例代码如下所示。

还要注意appBuilder这一行。使用aesdataprotectorprovider ("Your Key"),因为它很重要。这样做的结果是,大多数情况下您都无法获得HTTP 400连接。这是因为SignalR在握手时至少会发出2个请求,而这些请求通常会到达两台不同的机器。

感谢marcin budny的解释。

var config = new HttpConfiguration();
// Configure your origins as required.
var cors = new EnableCorsAttribute("*", "*", "*");
config.EnableCors(cors);
FormatterConfig.ConfigureFormatters(config.Formatters);
RouteConfig.RegisterRoutes(config.Routes);
appBuilder.UseWebApi(config);
GlobalHost.DependencyResolver.UseServiceBus("yourconnection string comes here", "signalrbackplaneserver");
appBuilder.UseAesDataProtectorProvider("some password");
appBuilder.Map("/echo", map =>
{
                map.UseCors(CorsOptions.AllowAll).RunSignalR<MyEndPoint>();
});