异步.从NetworkStream块中读取60秒.(Rx)

本文关键字:60秒 Rx 读取 NetworkStream 异步 | 更新日期: 2023-09-27 18:15:46

当流中没有数据时,我尝试读取流块60秒。当有一些数据时,读取按预期完成。我如何重写下面的代码,使它只能读取时流。DataAvailable是真的吗?

我想我需要一些类似Observable的东西。而(dataAvailableObserver AsyncRead) . .

    public static IObservable<byte[]> AsyncRead(this NetworkStream stream, int bufferSize)
    {
        return Observable.Create<byte[]>(
            o => Observable.Defer(() => AsyncReadChunk(stream, bufferSize))
                     .Repeat()
                     .Subscribe(dataChunk =>
                                    {
                                        if (dataChunk.Length > 0)
                                        {
                                            o.OnNext(dataChunk);
                                            return;
                                        }
                                        Debug.Assert(!stream.DataAvailable);
                                        o.OnCompleted();
                                    }, o.OnError, o.OnCompleted));
    }
    public static IObservable<byte[]> AsyncReadChunk(this NetworkStream stream, int bufferSize)
    {
        var buffer = new byte[bufferSize];
        return Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead)(buffer, 0, bufferSize)
            .Select(cbRead =>
            {
                Console.WriteLine("Data chunk received.");
                var dataChunk = new byte[cbRead];
                Buffer.BlockCopy(buffer, 0, dataChunk, 0, cbRead);
                return dataChunk;
            });
    }

我发现读取较小的buffersize,因为较大的缓冲区会导致等待缓冲区被填满(例如在我的场景中,传入的数据是小包)。

异步.从NetworkStream块中读取60秒.(Rx)

由于使用的是Defer,因此必须检查Defer逻辑中的可用数据。最简单的方法是在AsyncReadChunk方法中执行检查,例如:

public static IObservable<byte[]> AsyncReadChunk(this NetworkStream stream, int bufferSize) 
{ 
    if (!stream.DataAvailable)
    {
        return Observable.Empty<byte[]>();
    }
    else
    {
        var buffer = new byte[bufferSize]; 
        return Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead)(buffer, 0, bufferSize) 
            .Select(cbRead => 

我有点不确定这是否有帮助,但这是我用来读取流的tooobservable()方法。

public static class ObservableApmExtensions
{
    public static IObservable<byte> ToObservable(this FileStream source)
    {
        return source.ToObservable(4096, Scheduler.CurrentThread);
    }
    public static IObservable<byte> ToObservable(this FileStream source, int buffersize, IScheduler scheduler)
    {
        return Observable.Create<byte>(o =>
        {
            var initialState = new StreamReaderState(source, buffersize);
            var subscription = new MultipleAssignmentDisposable();
            Action<StreamReaderState, Action<StreamReaderState>> action =
                (state, self) =>
                {
                    subscription.Disposable = state.ReadNext()
                        .Subscribe(
                            bytesRead =>
                            {
                                for (int i = 0; i < bytesRead; i++)
                                {
                                    o.OnNext(state.Buffer[i]);
                                }
                                if (bytesRead > 0)
                                    self(state);
                                else
                                    o.OnCompleted();
                            },
                            o.OnError);
                };
            var scheduledAction = scheduler.Schedule(initialState, action);
            return new CompositeDisposable(scheduledAction, subscription);
        });
    }
    private sealed class StreamReaderState
    {
        private readonly int _bufferSize;
        private readonly Func<byte[], int, int, IObservable<int>> _factory;
        public StreamReaderState(Stream source, int bufferSize)
        {
            _bufferSize = bufferSize;
            _factory = Observable.FromAsyncPattern<byte[], int, int, int>(source.BeginRead, source.EndRead);
            Buffer = new byte[bufferSize];
        }
        public IObservable<int> ReadNext()
        {
            return _factory(Buffer, 0, _bufferSize);
        }
        public byte[] Buffer { get; set; }
    }
}

我没有尝试过使用NetworkStream,但听起来你可以交换检查

if (bytesRead > 0)

if (source.DataAvailable)

您还需要将源类型更改为NetworkStream。

我认为我代码中的调度可以帮助你解决阻塞问题。另一种选择,如果合适的话(我仍然不太明白你的问题),你可以使用.切换并创建一个嵌套的可观察对象

这意味着当一些数据通过时,您将其全部读取,直到完成,然后完成。一旦你完成,你开始另一个序列,这将是任何进一步的数据。

s1 --1-0-1-1|
s2          ---1-0-0-1-|        
s3                     ---0-0-1-0-1|
etc..
out--1-0-1-1---1-0-0-1----0-0-1-0-1|

s1, s2, s3等是在stream.DataAvailable之前的数据突发序列。然后这些内部流将完成,并启动一个请求(创建另一个内部可观察序列s2, s3, sN)。Switch(或Merge或Concat)都将能够将这些多个序列扁平化为单个序列,供您的用户使用。

另一个可能更容易编码的替代方法是使用IEnumerable>。这些都很容易创建,只需像这样的方法

public IEnumerable<IObservable<byte>> ConstantRead(string path)
{
    while (true)
    {
        yield return Observable.Using(
                () => new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.None),
                stream => stream.ToObservable(4096, Scheduler.ThreadPool));
    }
}

更改您的网络流要求。

然后像这样压平

_subscription = ConstantRead(@"C:'Users'Lee'MyFile.zip")
                .Concat()
                .Subscribe(...

我希望这对你有帮助。

坎贝尔李