使用RX创建IO绑定的可观察对象
本文关键字:观察 对象 绑定 RX 创建 IO 使用 | 更新日期: 2023-09-27 18:00:53
我有一个工作线程正在进行阻塞调用(ReadFrame
(以从套接字读取传入数据(IO绑定(。线程运行将数据馈送到CCD_,消费者可以观察到。
private void ReadLoop()
{
while (!IsDisposed)
{
var frame = _Socket.ReadFrame();
_ReceivedFrames.OnNext(frame);
}
}
我想知道是否有一种更RX的方式可以做到这一点。
以下是我的一个尝试(玩具示例(:
var src = Observable
.Repeat(Unit.Default)
.Select(_ =>
{
Thread.Sleep(1000); // simulated blocking ReadFrame call
return "data read from socket";
})
.SubscribeOn(ThreadPoolScheduler.Instance) // avoid blocking when subscribing
.ObserveOn(NewThreadScheduler.Default) // spin up new thread (?)
.Publish()
.RefCount();
var d = src.Subscribe(s => s.Dump()); // simulated consumer
Console.ReadLine(); // simulated main running code
d.Dispose(); // tear down
我正在努力正确使用ObserveOn
、SubscribeOn
和调度器。这个玩具示例似乎有效,但我不确定线程的生存期是否得到了正确的管理。
读卡器线程是否已通过d.Dispose()
调用关闭
我需要创建一个新线程吗
我应该使用Observable.Create
吗?怎样
以下是@Enigmatility:要求的额外信息
ReadLoop()
方法是符合以下接口的类的一部分:
public interface ICanSocket : IDisposable
{
IObservable<CanFrame> ReceivedFrames { get; }
IObserver<CanFrame> FramesToSend { get; }
}
其成员_Socket
在父ICanSocket
被布置时被布置(闭合(。
实现这一点的最"Rxy"的方法是使用Rxx,它有可观察的异步I/O方法。
看来你主要关心的是:
- 订阅时,不要阻塞订阅线程(也就是在后台线程上运行I/O线程(
- 当调用方取消订阅时,停止I/O线程
解决这些问题的一种方法是只使用异步创建方法:
// just use Task.Run to "background" the work
var src = Observable
.Create<CanFrame>((observer, cancellationToken) => Task.Run(() =>
{
while (!cancellationToken.IsCancellationRequested)
{
var frame = _Socket.ReadFrame();
if (frame == null) // end of stream?
{
// will send a Completed event
return;
}
observer.OnNext(frame);
}
}));
var d = src.Subscribe(s => s.Dump());
Console.ReadLine();
d.Dispose();