可观察对象,它读取流,直到流结束或出现错误
本文关键字:结束 错误 观察 读取 对象 | 更新日期: 2023-09-27 17:54:25
使用响应式扩展,我如何创建一个可观察对象,它将不断调用流上的Read方法并将结果传播给它的观察者?
或者这是完全错误的处理事情的方式?我应该实现我自己的IObservable吗?
我从来没有遇到过实现我自己的可观察对象有意义的情况。
试试这个:
public static IObservable<byte[]> ObservableRead(Stream stream, int bufferSize)
{
return Observable.Create<byte[]>(o =>
{
var buffer = new byte[bufferSize];
var read = 0;
try
{
while (true)
{
read = stream.Read(buffer, 0, buffer.Length);
if (read == 0)
{
break;
}
var results = buffer.Take(read).ToArray();
//Always return a copy
//never the buffer for concurrency's sake.
o.OnNext(results);
}
}
catch (Exception ex)
{
o.OnError(ex);
}
finally
{
o.OnCompleted();
}
return Disposable.Empty;
});
}