Reading a string as a stream without copying

Keywords: copy, read, string | Updated: 2023-09-27 18:25:32

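I have some data in a string, and a function that takes a stream as input. I want to pass my data to that function without copying the whole string into a stream. Essentially, I am looking for a stream class that can wrap a string and read from it.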

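The only suggestions I have found online are StringReader, which is not a stream, or creating a MemoryStream and writing to it, which means copying the data. I could write my own stream object, but the tricky part is handling the encoding, because streams deal in bytes. Is there a way to do this without writing a new stream class?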

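I am implementing a pipeline component in BizTalk. BizTalk handles everything through streams, so you always hand things to BizTalk as a stream. BizTalk always reads from that stream in small chunks, so if I can serve whatever BizTalk asks for directly from the string, copying the entire string into a stream (especially when the string is large) makes no sense.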


Here is a proper StringReaderStream, with the following drawbacks:

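  • The buffer passed to Read must be at least maxBytesPerChar bytes long. Read could be made to work with smaller buffers by keeping an internal one-character buffer, buff = new byte[maxBytesPerChar], but most uses do not need that.
  • There is no Seek. Seeking could be implemented, but in general it would be very tricky. (Some cases, such as seeking to the beginning or to the end, are simple to implement.)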
using System;
using System.IO;
using System.Text;

/// <summary>
/// Convert string to byte stream.
/// <para>
/// Slower than <see cref="Encoding.GetBytes()"/>, but saves memory for a large string.
/// </para>
/// </summary>
public class StringReaderStream : Stream
{
    private string input;
    private readonly Encoding encoding;
    private int maxBytesPerChar;
    private int inputLength;
    private int inputPosition;
    private readonly long length;
    private long position;
    public StringReaderStream(string input)
        : this(input, Encoding.UTF8)
    { }
    public StringReaderStream(string input, Encoding encoding)
    {
        this.encoding = encoding ?? throw new ArgumentNullException(nameof(encoding));
        this.input = input;
        inputLength = input == null ? 0 : input.Length;
        if (!string.IsNullOrEmpty(input))
            length = encoding.GetByteCount(input);
        // Always set, whether or not the input is empty.
        maxBytesPerChar = encoding == Encoding.ASCII ? 1 : encoding.GetMaxByteCount(1);
    }
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => length;
    public override long Position
    {
        get => position;
        set => throw new NotSupportedException(); // the stream is not seekable
    }
    public override void Flush()
    {
    }
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (inputPosition >= inputLength)
            return 0;
        if (count < maxBytesPerChar)
            throw new ArgumentException("count has to be greater or equal to max encoding byte count per char");
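        int charCount = Math.Min(inputLength - inputPosition, count / maxBytesPerChar);
        // Encoding.GetBytes is stateless, so a chunk must not end between the two halves
        // of a surrogate pair. Shrinking the chunk by one char avoids that whenever
        // count >= 2 * maxBytesPerChar; a one-char chunk can still split a pair.
        if (charCount > 1 && inputPosition + charCount < inputLength
            && char.IsHighSurrogate(input[inputPosition + charCount - 1]))
            charCount--;
        int byteCount = encoding.GetBytes(input, inputPosition, charCount, buffer, offset);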
        inputPosition += charCount;
        position += byteCount;
        return byteCount;
    }
    // Unsupported operations on a read-only, non-seekable stream throw NotSupportedException.
    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
}

Although this question was originally tagged c#-4.0, in .NET 5 this can be done easily with the newly introduced Encoding.CreateTranscodingStream:

Creates a Stream that serves to transcode data between an inner Encoding and an outer Encoding, similar to Convert(Encoding, Encoding, Byte[]).
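As a minimal sketch of that API on its own, the snippet below wraps a stream of UTF-16 bytes and reads it back as UTF-8. The class and method names (TranscodingDemo, Utf16ToUtf8) are made up for illustration, and the MemoryStream input is only a stand-in for any inner stream; it does copy the data, so it shows the transcoding step rather than the zero-copy goal.

```csharp
using System;
using System.IO;
using System.Text;

public static class TranscodingDemo
{
    // Reads UTF-16 (little-endian) bytes from an inner stream and exposes them as UTF-8.
    public static byte[] Utf16ToUtf8(byte[] utf16Bytes)
    {
        using var inner = new MemoryStream(utf16Bytes);
        using var transcoding = Encoding.CreateTranscodingStream(
            inner,
            innerStreamEncoding: Encoding.Unicode,
            outerStreamEncoding: Encoding.UTF8,
            leaveOpen: false);
        using var result = new MemoryStream();
        transcoding.CopyTo(result); // the reader only ever sees UTF-8 bytes
        return result.ToArray();
    }

    public static void Main()
    {
        byte[] utf8 = Utf16ToUtf8(Encoding.Unicode.GetBytes("héllo"));
        Console.WriteLine(utf8.Length); // 6 bytes: 'é' takes two bytes in UTF-8
    }
}
```

Requires .NET 5 or later, since Encoding.CreateTranscodingStream does not exist in earlier versions.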

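The trick is to define an underlying UnicodeStream that directly accesses the bytes of the string, then wrap it in a transcoding stream that presents the streamed content in the required encoding.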

The following classes and extension methods do the job:

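using System;
using System.IO;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static partial class TextExtensions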
{
    public static Encoding PlatformCompatibleUnicode => BitConverter.IsLittleEndian ? Encoding.Unicode : Encoding.BigEndianUnicode;
    static bool IsPlatformCompatibleUnicode(this Encoding encoding) => BitConverter.IsLittleEndian ? encoding.CodePage == 1200 : encoding.CodePage == 1201;
    
    public static Stream AsStream(this string @string, Encoding encoding = default) => 
        (@string ?? throw new ArgumentNullException(nameof(@string))).AsMemory().AsStream(encoding);
    public static Stream AsStream(this ReadOnlyMemory<char> charBuffer, Encoding encoding = default) =>
        ((encoding ??= Encoding.UTF8).IsPlatformCompatibleUnicode())
            ? new UnicodeStream(charBuffer)
            : Encoding.CreateTranscodingStream(new UnicodeStream(charBuffer), PlatformCompatibleUnicode, encoding, false);
}
sealed class UnicodeStream : Stream
{
    const int BytesPerChar = 2;
    // By sealing UnicodeStream we avoid a lot of the complexity of MemoryStream.
    ReadOnlyMemory<char> charMemory;
    int position = 0;
    Task<int> _cachedResultTask; // For async reads, avoid allocating a Task.FromResult<int>(nRead) every time we read.
    public UnicodeStream(string @string) : this((@string ?? throw new ArgumentNullException(nameof(@string))).AsMemory()) { }
    public UnicodeStream(ReadOnlyMemory<char> charMemory) => this.charMemory = charMemory;
    public override int Read(Span<byte> buffer)
    {
        EnsureOpen();
        var charPosition = position / BytesPerChar;
        // MemoryMarshal.AsBytes will throw on strings longer than int.MaxValue / 2, so only slice what we need. 
        var byteSlice = MemoryMarshal.AsBytes(charMemory.Slice(charPosition, Math.Min(charMemory.Length - charPosition, 1 + buffer.Length / BytesPerChar)).Span);
        var slicePosition = position % BytesPerChar;
        var nRead = Math.Min(buffer.Length, byteSlice.Length - slicePosition);
        byteSlice.Slice(slicePosition, nRead).CopyTo(buffer);
        position += nRead;
        return nRead;
    }
    public override int Read(byte[] buffer, int offset, int count) 
    {
        ValidateBufferArgs(buffer, offset, count);
        return Read(buffer.AsSpan(offset, count));
    }
    public override int ReadByte()
    {
        // Could be optimized.
        Span<byte> span = stackalloc byte[1];
        return Read(span) == 0 ? -1 : span[0];
    }
    public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        EnsureOpen();
        if (cancellationToken.IsCancellationRequested) 
            return ValueTask.FromCanceled<int>(cancellationToken);
        try
        {
            return new ValueTask<int>(Read(buffer.Span));
        }
        catch (Exception exception)
        {
            return ValueTask.FromException<int>(exception);
        }   
    }
    
    public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        ValidateBufferArgs(buffer, offset, count);
        // Forward the token so a cancelled read is reported as cancelled.
        var valueTask = ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
        if (!valueTask.IsCompletedSuccessfully)
            return valueTask.AsTask();
        var lastResultTask = _cachedResultTask;
        return (lastResultTask != null && lastResultTask.Result == valueTask.Result) ? lastResultTask : (_cachedResultTask = Task.FromResult<int>(valueTask.Result));
    }
    void EnsureOpen()
    {
        if (position == -1)
            throw new ObjectDisposedException(GetType().Name);
    }
    
    // https://learn.microsoft.com/en-us/dotnet/api/system.io.stream.flush?view=net-5.0
    // In a class derived from Stream that doesn't support writing, Flush is typically implemented as an empty method to ensure full compatibility with other Stream types since it's valid to flush a read-only stream.
    public override void Flush() { }
    public override Task FlushAsync(CancellationToken cancellationToken) => cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : Task.CompletedTask;
    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) =>  throw new NotSupportedException();
    
    protected override void Dispose(bool disposing)
    {
        try 
        {
            if (disposing) 
            {
                _cachedResultTask = null;
                charMemory = default;
                position = -1;
            }
        }
        finally 
        {
            base.Dispose(disposing);
        }
    }   
    
    static void ValidateBufferArgs(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException(nameof(buffer));
        if (offset < 0 || count < 0)
            throw new ArgumentOutOfRangeException();
        if (count > buffer.Length - offset)
            throw new ArgumentException();
    }
}   

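Notes:

  • You can stream a string, a char[] array, or a slice of either by converting it to a ReadOnlyMemory<char> buffer. The conversion simply wraps the underlying string or array memory and allocates nothing.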

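  • Solutions that encode chunks of the string with Encoding.GetBytes() are broken, because they do not handle surrogate pairs that are split between chunks. To handle surrogate pairs correctly, call Encoding.GetEncoder() once and keep the resulting Encoder. Encoder.GetBytes(ReadOnlySpan<Char>, Span<Byte>, flush: false) can then be used to encode in chunks while remembering state between calls.

    (Microsoft's TranscodingStream does this.)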

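  • Encoding.Unicode gives the best performance because (on almost all .NET platforms) this encoding is identical to the encoding of the String type itself.

    When a platform-compatible Unicode encoding is supplied, no TranscodingStream is used and the returned Stream reads directly from the character data buffer.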

  • To do:

    • Test on big-endian platforms (which are rare).
    • Test on strings longer than int.MaxValue / 2.
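The surrogate-pair point in the notes above can be sketched as follows. This is an illustration under assumptions, not the code from the answer: ChunkedEncodingDemo and its two methods are hypothetical names, and the chunk size of one char is chosen only to force a split inside a pair.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Text;

public static class ChunkedEncodingDemo
{
    // Stateful: the Encoder remembers a trailing high surrogate between calls.
    public static byte[] EncodeWithEncoder(string text, Encoding encoding, int chunkChars)
    {
        Encoder encoder = encoding.GetEncoder();
        byte[] buffer = new byte[encoding.GetMaxByteCount(chunkChars)];
        using var output = new MemoryStream();
        for (int i = 0; i < text.Length; i += chunkChars)
        {
            ReadOnlySpan<char> chunk = text.AsSpan(i, Math.Min(chunkChars, text.Length - i));
            bool isLast = i + chunk.Length >= text.Length;
            // flush: false keeps pending state; flush only on the final chunk.
            int written = encoder.GetBytes(chunk, buffer, flush: isLast);
            output.Write(buffer, 0, written);
        }
        return output.ToArray();
    }

    // Stateless: each chunk is encoded in isolation, so a split pair is corrupted.
    public static byte[] EncodeStateless(string text, Encoding encoding, int chunkChars)
    {
        byte[] buffer = new byte[encoding.GetMaxByteCount(chunkChars)];
        using var output = new MemoryStream();
        for (int i = 0; i < text.Length; i += chunkChars)
        {
            int n = Math.Min(chunkChars, text.Length - i);
            int written = encoding.GetBytes(text, i, n, buffer, 0);
            output.Write(buffer, 0, written);
        }
        return output.ToArray();
    }

    public static void Main()
    {
        string text = "a\U0001F600b"; // the emoji is one surrogate pair (two chars)
        byte[] expected = Encoding.UTF8.GetBytes(text);
        Console.WriteLine(expected.SequenceEqual(EncodeWithEncoder(text, Encoding.UTF8, 1))); // True
        Console.WriteLine(expected.SequenceEqual(EncodeStateless(text, Encoding.UTF8, 1)));   // False
    }
}
```

With one-char chunks the stateless version encodes each half of the pair as a replacement character, while the Encoder holds the high surrogate back until the low surrogate arrives.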

Demo fiddle, including some basic tests.

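You can avoid maintaining a copy of the whole content, but you are then forced to use an encoding that produces the same number of bytes for every character. That way you can serve chunks of data through Encoding.GetBytes(str, strIndex, byteCount, byte[], byteIndex), written straight into the read buffer as they are requested.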

Every Stream.Read() operation will always involve one copy, because the API lets the caller supply the destination buffer.
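A rough sketch of that chunked approach, assuming ASCII so that every char maps to exactly one byte and a byte position equals a char position. FixedWidthChunks, ReadAscii, and ReadAll are hypothetical names used only for this illustration.

```csharp
using System;
using System.IO;
using System.Text;

public static class FixedWidthChunks
{
    // Encodes at most count chars starting at position directly into the caller's buffer.
    // Returns the number of bytes written, or 0 at the end of the string.
    public static int ReadAscii(string source, int position, byte[] buffer, int offset, int count)
    {
        int charCount = Math.Min(count, source.Length - position);
        if (charCount <= 0)
            return 0;
        return Encoding.ASCII.GetBytes(source, position, charCount, buffer, offset);
    }

    // Drains the string through a small buffer, the way a stream consumer would.
    public static byte[] ReadAll(string source, int bufferSize)
    {
        byte[] buffer = new byte[bufferSize];
        using var output = new MemoryStream();
        int position = 0;
        int read;
        while ((read = ReadAscii(source, position, buffer, 0, buffer.Length)) > 0)
        {
            output.Write(buffer, 0, read);
            position += read; // valid only because ASCII is one byte per char
        }
        return output.ToArray();
    }

    public static void Main()
    {
        byte[] bytes = ReadAll("hello world", 4);
        Console.WriteLine(Encoding.ASCII.GetString(bytes)); // hello world
    }
}
```

Only one chunk is encoded at a time, so the full byte representation never exists in memory unless the consumer chooses to collect it, as ReadAll does here for demonstration.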

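A Stream can only copy data. It also deals in bytes, not chars, so the data has to be copied through an encoding step. However, if you want to view a string as a stream of ASCII bytes, you can create a class that implements Stream to do so. For example: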

public class ReadOnlyStreamStringWrapper : Stream
{
    private readonly string theString;
    public ReadOnlyStreamStringWrapper(string theString)
    {
        this.theString = theString;
    }
    public override void Flush()
    {
        // Nothing to flush; flushing a read-only stream is valid and should not throw.
    }
    public override long Seek(long offset, SeekOrigin origin)
    {
        // ASCII uses one byte per char, so byte positions equal char positions.
        long target;
        switch (origin)
        {
            case SeekOrigin.Begin:
                target = offset;
                break;
            case SeekOrigin.Current:
                target = Position + offset;
                break;
            case SeekOrigin.End:
                target = theString.Length + offset;
                break;
            default:
                throw new ArgumentException("Unknown seek origin.", nameof(origin));
        }
        // Seeking exactly to Length (end of stream) is valid; anything outside [0, Length] is not.
        if (target < 0 || target > theString.Length)
            throw new InvalidOperationException();
        Position = target;
        return Position;
    }
    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }
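    public override int Read(byte[] buffer, int offset, int count)
    {
        // Clamp to the chars that remain; returning 0 signals end of stream.
        int charCount = (int)Math.Min(count, theString.Length - Position);
        if (charCount <= 0)
            return 0;
        int bytesRead = Encoding.ASCII.GetBytes(theString, (int)Position, charCount, buffer, offset);
        Position += bytesRead; // advance so the next Read continues where this one stopped
        return bytesRead;
    }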
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
    public override bool CanRead
    {
        get { return true; }
    }
    public override bool CanSeek
    {
        get { return true; }
    }
    public override bool CanWrite
    {
        get { return false; }
    }
    public override long Length
    {
        get { return theString.Length; }
    }
    public override long Position { get; set; }
}

But that is a lot of work just to avoid "copying" the data...