从字符串创建一个空流,当没有数据时stream . read()等待

本文关键字:数据 stream 等待 read 创建 字符串 一个 | 更新日期: 2023-09-27 17:50:25

我正试图用我从字符串创建的流替换我从TcpClient.GetStream()获得的流。

我正在使用以下方法来创建所述流:

public Stream GenerateStreamFromString(string s)
{
    MemoryStream stream = new MemoryStream();
    StreamWriter writer = new StreamWriter(stream);
    writer.Write(s);
    writer.Flush();
    stream.Position = 0;
    return stream;
}

然而,这个流是通过使用库中的Stream.Read()来读取的,我不想改变。问题是我用一个空字符串创建这个流,因为对象需要一个流来启动,通常当使用TcpClient流时,它会停止在Stream.Read(),直到它有东西要读,但不是用我从字符串创建的流。

所以我的问题,我如何创建一个空流,我可以以后从字符串添加数据?

从字符串创建一个空流,当没有数据时stream . read()等待

在内部使用BlockingCollection<>作为队列,您可以这样写:

public class WatitingStream : Stream
{
    private BlockingCollection<byte[]> Packets = new BlockingCollection<byte[]>();
    private byte[] IncompletePacket;
    private int IncompletePacketOffset;
    public WatitingStream()
    {
    }
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            Packets.CompleteAdding();
        }
        base.Dispose(disposing);
    }
    public override bool CanRead
    {
        get { return Packets.IsCompleted; }
    }
    public override bool CanSeek
    {
        get { return false; }
    }
    public override bool CanWrite
    {
        get { return Packets.IsAddingCompleted; }
    }
    public override void Flush()
    {
    }
    public override long Length
    {
        get
        {
            throw new NotSupportedException();
        }
    }
    public override long Position
    {
        get
        {
            throw new NotSupportedException();
        }
        set
        {
            throw new NotSupportedException();
        }
    }
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count == 0)
        {
            return 0;
        }
        byte[] packet;
        int packetOffset;
        if (IncompletePacket != null)
        {
            packet = IncompletePacket;
            packetOffset = IncompletePacketOffset;
        }
        else
        {
            if (Packets.IsCompleted)
            {
                return 0;
            }
            packet = Packets.Take();
            packetOffset = 0;
        }
        int read = Math.Min(packet.Length - packetOffset, count);
        Buffer.BlockCopy(packet, packetOffset, buffer, offset, read);
        packetOffset += read;
        if (packetOffset < packet.Length)
        {
            IncompletePacket = packet;
            IncompletePacketOffset = packetOffset;
        }
        else
        {
            IncompletePacket = null;
            IncompletePacketOffset = 0;
        }
        return read;
    }
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0)
        {
            return;
        }
        byte[] packet = new byte[count];
        Buffer.BlockCopy(buffer, offset, packet, 0, count);
        Packets.Add(packet);
    }
}

作为普通流使用。Write不阻塞。Read

必须做出一些决定:这个Stream是基于"数据包"的。它不会Write零长度的数据包,并且Read将返回一个数据包的数据。Read将不会在下一个数据包上继续。如果在Read之后还有数据留在数据包中,那么这些数据将被保存到下一个ReadDispose()将停止Write(因此,如果"客户端"在"服务器"之前执行Dispose(),如果服务器试图执行Write,则服务器将获得异常)。如果"服务器"先执行Dispose(),则"客户端"可以完成仍然存在的数据包的读取。显然,可以(很容易)将这个类分成两个类(一个Server和一个Client),其中Server保留BlockingCollection<>,而客户端有对"服务器"的引用。这将解决" Dispose() "异常/问题(但会使代码大小加倍:-))