用响应式扩展观察传入的websocket消息
本文关键字:websocket 消息 观察 响应 扩展 | 更新日期: 2023-09-27 18:03:37
我想使用linq来处理通过websocket连接接收的事件。这是我目前所看到的:
private static void Main()
{
string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
using (WebSocket ws = new WebSocket(WsEndpoint))
{
ws.OnMessage += Ws_OnMessage;
ws.Connect();
Console.ReadKey();
ws.Close();
}
}
private static void Ws_OnMessage(object sender, MessageEventArgs e)
{
Console.WriteLine(e.Data);
}
第一个让我困惑的想法是如何把ws.OnMessage
变成某种事件流。我在网上找不到任何例子来观察带有响应式扩展的外部事件源。我打算将消息解析为json对象,然后过滤和聚合它们。
有人可以提供一个例子,从websocket消息创建一个可观察对象,并订阅它?
编辑:最终工作代码
与所选答案的唯一区别是,我在将websocket传递给Observable.Using
之前初始化了它
//-------------------------------------------------------
// Create websocket connection
//-------------------------------------------------------
const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
WebSocket socket = new WebSocket(wsEndpoint);
//-------------------------------------------------------
// Create an observable by wrapping ws.OnMessage
//-------------------------------------------------------
var globalEventStream = Observable
.Using(
() => socket,
ws =>
Observable
.FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
handler => ws.OnMessage += handler,
handler => ws.OnMessage -= handler));
//---------------------------------------------------------
// Subscribe to globalEventStream
//---------------------------------------------------------
IDisposable subscription = globalEventStream.Subscribe(ep =>
{
Console.WriteLine("Event Recieved");
Console.WriteLine(ep.EventArgs.Data);
});
//----------------------------------------------------------
// Send message over websocket
//----------------------------------------------------------
socket.Connect();
socket.Send("test message");
// When finished, close the connection.
socket.Close();
你应该这样设置你的observable:
var observable =
Observable
.Using(
() => new WebSocket(WsEndpoint),
ws =>
Observable
.FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
handler => ws.OnMessage += handler,
handler => ws.OnMessage -= handler));
这将正确创建套接字,然后在可观察对象被订阅时观察事件。当订阅被处置时,它将正确地从事件分离并处置套接字。
observable
的类型为IObservable<EventPattern<MessageEventArgs>>
。你可以这样使用这个可观察对象:
IDisposable subscription = observable.Subscribe(ep =>
{
Console.WriteLine(ep.EventArgs.Data);
});
感谢您发布NuGet参考。
下面是工作代码:
const string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
Console.WriteLine("Defining Observable:");
IObservable<EventPattern<WebSocketSharp.MessageEventArgs>> observable =
Observable
.Using(
() =>
{
var ws = new WebSocketSharp.WebSocket(WsEndpoint);
ws.Connect();
return ws;
},
ws =>
Observable
.FromEventPattern<EventHandler<WebSocketSharp.MessageEventArgs>, WebSocketSharp.MessageEventArgs>(
handler => ws.OnMessage += handler,
handler => ws.OnMessage -= handler));
Console.WriteLine("Subscribing to Observable:");
IDisposable subscription = observable.Subscribe(ep =>
{
Console.WriteLine("Event Recieved");
Console.WriteLine(ep.EventArgs.Data);
});
Console.WriteLine("Writing to Source:");
using (var source = new WebSocketSharp.WebSocket(WsEndpoint))
{
source.Connect();
source.Send("test");
}