可观察对象,它读取流,直到流结束或出现错误

本文关键字:结束 错误 观察 读取 对象 | 更新日期: 2023-09-27 17:54:25

使用响应式扩展,我如何创建一个可观察对象,它将不断调用流上的Read方法并将结果传播给它的观察者?

或者这是完全错误的处理事情的方式?我应该实现我自己的IObservable吗?

可观察对象,它读取流,直到流结束或出现错误

我从来没有遇到过实现我自己的可观察对象有意义的情况。

试试这个:

public static IObservable<byte[]> ObservableRead(Stream stream, int bufferSize)
{
    return Observable.Create<byte[]>(o =>
    {
        var buffer = new byte[bufferSize];
        var read = 0;
        try
        {
            while (true)
            {
                read = stream.Read(buffer, 0, buffer.Length);
                if (read == 0)
                {
                    break;
                }
                var results = buffer.Take(read).ToArray();
                //Always return a copy
                //never the buffer for concurrency's sake.
                o.OnNext(results);
            }
        }
        catch (Exception ex)
        {
            o.OnError(ex);
        }
        finally
        {
            o.OnCompleted();
        }
        return Disposable.Empty;
    });
}
相关文章: