StreamInsight:为RemoteObservable使用本地观察者
本文关键字:观察者 RemoteObservable StreamInsight | 更新日期: 2023-09-27 18:18:13
我一直在使用StreamInsight v2.3和它提供的新Rx功能。我正在研究事件溯源实现中SI的使用。我已经调整了一些MSDN示例代码,以获得以下内容:
服务器进程的代码:
using (var server = Server.Create("Default"))
{
var host = new ServiceHost(server.CreateManagementService());
host.AddServiceEndpoint(typeof(IManagementService), new WSHttpBinding(SecurityMode.Message), "http://localhost/SIDemo");
host.Open();
var myApp = server.CreateApplication("SIDemoApp");
var mySource = myApp.DefineObservable(() => Observable.Interval(TimeSpan.FromSeconds(1))).ToPointStreamable(x => PointEvent.CreateInsert(DateTimeOffset.Now, x), AdvanceTimeSettings.StrictlyIncreasingStartTime);
mySource.Deploy("demoSource");
Console.WriteLine("Hit enter to stop.");
Console.ReadLine();
host.Close();
}
客户端进程代码:
using (var server = Server.Connect(new System.ServiceModel.EndpointAddress(@"http://localhost/SIDemo")))
{
var myApp = server.Applications["SIDemoApp"];
var mySource = myApp.GetObservable<long>("demoSource");
using (var mySink = mySource.Subscribe(x => Console.WriteLine("Output - {0}", x)))
{
Console.WriteLine("Hit enter to stop.");
Console.ReadLine();
}
}
尝试运行此命令会产生以下错误:
我开始使用的示例代码定义了一个观察者和接收器,并将其绑定到StreamInsight服务器中。我试着在客户端过程中保持观察者。是否有一种方法可以在客户端应用程序中为远程StreamInsight源设置观察者?这是否必须通过客户机观察到的服务器中的WCF端点之类的东西来完成?从远程读取System.Reactive.Linq.IQbservable 1(系统。不支持Int64]'。使用'Microsoft.ComplexEventProcessing.Linq.RemoteProvider.Bind'方法使用远程观察器从源代码读取。
实际上错误是指向解决方案的。你需要'bind'到源代码。
请检查下面的代码片段:
//Get SOURCE from server
var serverSource = myApp.GetStreamable<long>("demoSource");
//SINK for 'demoSource'
var sinkToBind = myApp.DefineObserver<long>( ()=> Observer.Create<long>( value => Console.WriteLine( "From client : " + value)));
//BINDING
var processForSink = serverSource.Bind(sinkToBind).Run("processForSink");
还请注意,sink将在服务器上运行,而不是像我最初猜测的那样,它将在客户端上运行。如果你同时查看服务器和客户端的控制台应用程序,控制台输出将写入服务器应用程序。
如果甚至有一种方法在客户端运行sink,我不知道,我也想知道。