使用Rx恢复异步读取

本文关键字:读取 异步 恢复 Rx 使用 | 更新日期: 2023-09-27 18:04:41

今天又是一个Rx问题

简单地说,我正在使用异步IO来操作流。然而,我们都知道,当使用异步读取时,我们不一定得到我们想要的所有字节——因此在XAsync方法上返回int。我想知道我怎么能告诉Rx Observable重试读取,没有从流中读取正确的字节数,并由正确的量偏移?

目前,我有这个,但不知道如何设置偏移参数在ReadAsync.

    private IDisposable _streamMessageContract;
    private readonly byte[] _readBuffer = new byte[8024];
    public void Start()
    {
        // Subscribe to the stream for dataz
        _streamMessageContract = Observable.FromAsync<int>(() => _stream.ReadAsync(_readBuffer, 0, _readBuffer.Length))
            .Repeat()
            .Subscribe(
                y => _RawBytesReceived(_readBuffer, y),
               ex => _Exception(ex),
               () => _StreamClosed());
    }
    #region Helpers
    private void _RawBytesReceived(byte[] bytes, int actualBytesRead)
    {
    }
    private void _StreamClosed()
    {
    }
    private void _Exception(Exception e)
    {
    }
    #endregion

使用Rx恢复异步读取

最简单的方法是在闭包中使用局部变量,加上Defer来强制可观察对象在每次迭代时重新计算其函数。

假设您想在当前块结束后继续读取下一个块,您将以这样的内容结束…

// An observable that will yield the next block in the stream.
// If the resulting block length is less than blockSize, then the
// end of the stream was reached.
private IObservable<byte[]> ReadBlock(Stream stream, int blockSize)
{
    // Wrap the whole thing in Defer() so that we
    // get a new memory block each time this observable is subscribed.
    return Observable.Defer(() =>
    {
        var block = new byte[blockSize];
        int numRead = 0;
        return Observable
            .Defer(() => Observable.FromAsync<int>(() =>
                stream.ReadAsync(block, numRead, block.Length - numRead)))
            .Do(y -=> numRead += y)
            .Repeat()
            .TakeWhile(y => y > 0 && numRead != blockSize) // take until EOF or entire block is read
            .LastOrDefaultAsync()  // only emit the final result of this operation
            .Select(_ =>
            {
                // If we hit EOF, then resize our result array to
                // the number of bytes actually read
                if (numRead < blockSize)
                {
                    block = block.Take(numRead).ToArray();
                }
                return block;
            });
    });
}
public void Start()
{
    // Subscribe to the stream for dataz.
    // Just keep reading blocks until
    // we get the final (under-sized) block
    _streamMessageContract = ReadBlock(stream, 8024)
        .Repeat()
        .TakeWhile(block => block.Length == 8024) // if block is small then that means we hit the end of the stream
        .Subscribe(
           block => _RawBytesReceived(block),
           ex => _Exception(ex),
           () => _StreamClosed());
}