将字符串作为流读取而不进行复制
本文关键字:复制 读取 字符串 | 更新日期: 2023-09-27 18:25:32
我在字符串中有一些数据。我有一个函数,它将流作为输入。我想将我的数据提供给我的函数,而不必将完整的字符串复制到流中。从本质上讲,我正在寻找一个可以包装字符串并从中读取的流类
我在网上看到的唯一建议是StringReader,它不是一个流,或者创建一个内存流并写入它,这意味着复制数据。我可以编写自己的流对象,但棘手的部分是处理编码,因为流处理字节。有没有一种方法可以做到这一点而不编写新的流类?
我正在BizTalk中实现管道组件。BizTalk完全通过流处理所有事务,因此您总是以流的形式将事务传递给BizTalk。BizTalk总是以小块的形式从该流中读取,因此,如果我可以从流中读取BizTalk想要的内容,则将整个字符串复制到流中(尤其是在字符串很大的情况下)是没有意义的。
这里有一个合适的StringReaderStream
,它有以下缺点:
Read
的缓冲区必须至少为maxBytesPerChar
长。通过保留内部一个字符的buff = new byte[maxBytesPerChar]
,可以为小缓冲区实现Read
。但对于大多数用法来说并不是必需的- 没有
Seek
,可以进行查找,但一般来说会非常棘手。(有些寻求案例,如寻求开始、寻求结束,执行起来很简单。)
/// <summary>
/// Convert string to byte stream.
/// <para>
/// Slower than <see cref="Encoding.GetBytes()"/>, but saves memory for a large string.
/// </para>
/// </summary>
public class StringReaderStream : Stream
{
private string input;
private readonly Encoding encoding;
private int maxBytesPerChar;
private int inputLength;
private int inputPosition;
private readonly long length;
private long position;
public StringReaderStream(string input)
: this(input, Encoding.UTF8)
{ }
public StringReaderStream(string input, Encoding encoding)
{
this.encoding = encoding ?? throw new ArgumentNullException(nameof(encoding));
this.input = input;
inputLength = input == null ? 0 : input.Length;
if (!string.IsNullOrEmpty(input))
length = encoding.GetByteCount(input);
maxBytesPerChar = encoding == Encoding.ASCII ? 1 : encoding.GetMaxByteCount(1);
}
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => length;
public override long Position
{
get => position;
set => throw new NotImplementedException();
}
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
{
if (inputPosition >= inputLength)
return 0;
if (count < maxBytesPerChar)
throw new ArgumentException("count has to be greater or equal to max encoding byte count per char");
int charCount = Math.Min(inputLength - inputPosition, count / maxBytesPerChar);
int byteCount = encoding.GetBytes(input, inputPosition, charCount, buffer, offset);
inputPosition += charCount;
position += byteCount;
return byteCount;
}
public override long Seek(long offset, SeekOrigin origin)
{
throw new NotImplementedException();
}
public override void SetLength(long value)
{
throw new NotImplementedException();
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotImplementedException();
}
}
虽然这个问题最初被标记为c#-4.0,但在.NET 5中引入Encoding.CreateTranscodingStream
:可以很容易地完成
创建一个流,用于在内部编码和外部编码之间对数据进行代码转换,类似于
Convert(Encoding, Encoding, Byte[])
。
诀窍是定义直接访问string
的字节的底层UnicodeStream
,然后将其封装在转码流中,以呈现具有所需编码的流式内容。
以下类和扩展方法完成了这项工作:
public static partial class TextExtensions
{
public static Encoding PlatformCompatibleUnicode => BitConverter.IsLittleEndian ? Encoding.Unicode : Encoding.BigEndianUnicode;
static bool IsPlatformCompatibleUnicode(this Encoding encoding) => BitConverter.IsLittleEndian ? encoding.CodePage == 1200 : encoding.CodePage == 1201;
public static Stream AsStream(this string @string, Encoding encoding = default) =>
(@string ?? throw new ArgumentNullException(nameof(@string))).AsMemory().AsStream(encoding);
public static Stream AsStream(this ReadOnlyMemory<char> charBuffer, Encoding encoding = default) =>
((encoding ??= Encoding.UTF8).IsPlatformCompatibleUnicode())
? new UnicodeStream(charBuffer)
: Encoding.CreateTranscodingStream(new UnicodeStream(charBuffer), PlatformCompatibleUnicode, encoding, false);
}
sealed class UnicodeStream : Stream
{
const int BytesPerChar = 2;
// By sealing UnicodeStream we avoid a lot of the complexity of MemoryStream.
ReadOnlyMemory<char> charMemory;
int position = 0;
Task<int> _cachedResultTask; // For async reads, avoid allocating a Task.FromResult<int>(nRead) every time we read.
public UnicodeStream(string @string) : this((@string ?? throw new ArgumentNullException(nameof(@string))).AsMemory()) { }
public UnicodeStream(ReadOnlyMemory<char> charMemory) => this.charMemory = charMemory;
public override int Read(Span<byte> buffer)
{
EnsureOpen();
var charPosition = position / BytesPerChar;
// MemoryMarshal.AsBytes will throw on strings longer than int.MaxValue / 2, so only slice what we need.
var byteSlice = MemoryMarshal.AsBytes(charMemory.Slice(charPosition, Math.Min(charMemory.Length - charPosition, 1 + buffer.Length / BytesPerChar)).Span);
var slicePosition = position % BytesPerChar;
var nRead = Math.Min(buffer.Length, byteSlice.Length - slicePosition);
byteSlice.Slice(slicePosition, nRead).CopyTo(buffer);
position += nRead;
return nRead;
}
public override int Read(byte[] buffer, int offset, int count)
{
ValidateBufferArgs(buffer, offset, count);
return Read(buffer.AsSpan(offset, count));
}
public override int ReadByte()
{
// Could be optimized.
Span<byte> span = stackalloc byte[1];
return Read(span) == 0 ? -1 : span[0];
}
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
EnsureOpen();
if (cancellationToken.IsCancellationRequested)
return ValueTask.FromCanceled<int>(cancellationToken);
try
{
return new ValueTask<int>(Read(buffer.Span));
}
catch (Exception exception)
{
return ValueTask.FromException<int>(exception);
}
}
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
ValidateBufferArgs(buffer, offset, count);
var valueTask = ReadAsync(buffer.AsMemory(offset, count));
if (!valueTask.IsCompletedSuccessfully)
return valueTask.AsTask();
var lastResultTask = _cachedResultTask;
return (lastResultTask != null && lastResultTask.Result == valueTask.Result) ? lastResultTask : (_cachedResultTask = Task.FromResult<int>(valueTask.Result));
}
void EnsureOpen()
{
if (position == -1)
throw new ObjectDisposedException(GetType().Name);
}
// https://learn.microsoft.com/en-us/dotnet/api/system.io.stream.flush?view=net-5.0
// In a class derived from Stream that doesn't support writing, Flush is typically implemented as an empty method to ensure full compatibility with other Stream types since it's valid to flush a read-only stream.
public override void Flush() { }
public override Task FlushAsync(CancellationToken cancellationToken) => cancellationToken.IsCancellationRequested ? Task.FromCanceled(cancellationToken) : Task.CompletedTask;
public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => throw new NotSupportedException();
public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
protected override void Dispose(bool disposing)
{
try
{
if (disposing)
{
_cachedResultTask = null;
charMemory = default;
position = -1;
}
}
finally
{
base.Dispose(disposing);
}
}
static void ValidateBufferArgs(byte[] buffer, int offset, int count)
{
if (buffer == null)
throw new ArgumentNullException(nameof(buffer));
if (offset < 0 || count < 0)
throw new ArgumentOutOfRangeException();
if (count > buffer.Length - offset)
throw new ArgumentException();
}
}
注:
您可以通过将
string
、char []
阵列或其切片转换为ReadOnlyMemory<char>
缓冲区来进行流式传输。这种转换只是封装底层字符串或数组内存,而不分配任何内容。使用
Encoding.GetBytes()
对字符串的块进行编码的解决方案会被破坏,因为它们不会处理在块之间分割的代理对。为了正确处理代理项对,必须调用Encoding.GetEncoder()
来初始保存Encoder
。稍后,Encoder.GetBytes(ReadOnlySpan<Char>, Span<Byte>, flush: false)
可以用于在chuck中进行编码,并记住调用之间的状态。(微软的
TranscodingStream
做到了这一点。)使用
Encoding.Unicode
将获得最佳性能,因为(在几乎所有.Net平台上)此编码与String
类型本身的编码相同。当提供平台兼容的Unicode编码时,不使用
TranscodingStream
,并且返回的Stream
直接从字符数据缓冲器读取。待办事项:
- 在big-endian平台上进行测试(这种情况很少见)
- 对长度超过
int.MaxValue / 2
的字符串进行测试
演示小提琴,包括一些基本测试。
您可以避免维护整个内容的副本,但您将被迫使用为每个字符产生相同字节数的编码。这样,您就可以通过Encoding.GetBytes(str, strIndex, byteCount, byte[], byteIndex)
提供数据块,因为它们被直接请求到读取缓冲区中。
每个Stream.Read()
操作将始终有一个复制操作,因为它允许调用者提供目标缓冲区。
Stream
只能复制数据。此外,它处理的是byte
s,而不是char
s,因此您必须通过解码过程复制数据。但是,如果您想将字符串视为ASCII字节流,则可以创建一个实现Stream
的类来完成此操作。例如:
public class ReadOnlyStreamStringWrapper : Stream
{
private readonly string theString;
public ReadOnlyStreamStringWrapper(string theString)
{
this.theString = theString;
}
public override void Flush()
{
throw new NotSupportedException();
}
public override long Seek(long offset, SeekOrigin origin)
{
switch (origin)
{
case SeekOrigin.Begin:
if(offset < 0 || offset >= theString.Length)
throw new InvalidOperationException();
Position = offset;
break;
case SeekOrigin.Current:
if ((Position + offset) < 0)
throw new InvalidOperationException();
if ((Position + offset) >= theString.Length)
throw new InvalidOperationException();
Position += offset;
break;
case SeekOrigin.End:
if ((theString.Length + offset) < 0)
throw new InvalidOperationException();
if ((theString.Length + offset) >= theString.Length)
throw new InvalidOperationException();
Position = theString.Length + offset;
break;
}
return Position;
}
public override void SetLength(long value)
{
throw new NotSupportedException();
}
public override int Read(byte[] buffer, int offset, int count)
{
return Encoding.ASCII.GetBytes(theString, (int)Position, count, buffer, offset);
}
public override void Write(byte[] buffer, int offset, int count)
{
throw new NotSupportedException();
}
public override bool CanRead
{
get { return true; }
}
public override bool CanSeek
{
get { return true; }
}
public override bool CanWrite
{
get { return false; }
}
public override long Length
{
get { return theString.Length; }
}
public override long Position { get; set; }
}
但是,要避免"复制"数据,还有很多工作要做。。。