为什么我的Inject方法不将数据注入流

本文关键字:数据 注入 我的 Inject 方法 为什么 | 更新日期: 2023-09-27 18:28:41

我为 C# 的 Stream 编写了一个包装类,希望它既能向流中注入数据,又能在通过 ReadAsync() 读取数据时调用一个回调函数。然而,"注入"部分不起作用,我不知道为什么。

/// <summary>
/// Stream wrapper that forwards most operations to an inner stream, reports
/// asynchronous reads through a callback, and keeps a separate buffer of
/// text "injected" via <see cref="Inject"/>.
/// </summary>
/// <remarks>
/// Injected bytes are only handed out by the synchronous <see cref="Read"/>
/// override. <see cref="ReadAsync"/> goes straight to the inner stream and
/// never looks at the injected buffer.
/// </remarks>
class ExtendedStream : Stream
{
    private readonly Stream _originalStream;
    private readonly Action<byte[]> _readCallback;

    // Signalled by Inject; GetBytes parks on it while no unread injected bytes exist.
    private readonly ManualResetEvent dataInjected = new ManualResetEvent(false);

    // Every byte ever injected; pos is the index of the next one to hand out.
    private readonly List<byte> data = new List<byte>();
    private int pos;

    /// <summary>Wraps <paramref name="originalStream"/>.</summary>
    /// <param name="originalStream">Stream that all pass-through members delegate to.</param>
    /// <param name="readCallback">Invoked with the caller's buffer after each <see cref="ReadAsync"/>.</param>
    public ExtendedStream(Stream originalStream, Action<byte[]> readCallback)
    {
        _originalStream = originalStream;
        _readCallback = readCallback;
    }

    public override bool CanRead => _originalStream.CanRead;

    public override bool CanSeek => _originalStream.CanSeek;

    public override bool CanWrite => _originalStream.CanWrite;

    public override long Length => _originalStream.Length;

    public override long Position
    {
        get => _originalStream.Position;
        set => _originalStream.Position = value;
    }

    /// <summary>
    /// Reads from the inner stream, then passes the whole <paramref name="buffer"/>
    /// (not just the bytes read) to the read callback.
    /// </summary>
    /// <returns>The byte count reported by the inner stream.</returns>
    public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, System.Threading.CancellationToken cancellationToken)
    {
        int bytesRead = await _originalStream.ReadAsync(buffer, offset, count, cancellationToken);
        _readCallback(buffer);
        return bytesRead;
    }

    /// <summary>
    /// Appends the UTF-8 encoding (no BOM) of <paramref name="text"/> to the
    /// injected buffer and signals any waiting reader.
    /// </summary>
    public void Inject(string text)
    {
        byte[] encoded = new UTF8Encoding(false).GetBytes(text);
        data.AddRange(encoded);
        dataInjected.Set();
    }

    /// <summary>
    /// Lazily yields up to <paramref name="count"/> unread injected bytes,
    /// waiting on the injection signal while none are available. The sequence
    /// ends once at least one byte has been produced and either the buffer is
    /// exhausted or <paramref name="count"/> bytes were yielded.
    /// </summary>
    private IEnumerable<byte> GetBytes(int count)
    {
        int produced = 0;
        do
        {
            if (pos >= data.Count)
            {
                // Nothing unread: clear the signal and wait for the next Inject.
                dataInjected.Reset();
                dataInjected.WaitOne();
                continue;
            }

            while (pos < data.Count && produced < count)
            {
                yield return data[pos];
                pos++;
                produced++;
            }
        }
        while (produced == 0);
    }

    /// <summary>
    /// Copies injected bytes into <paramref name="buffer"/> starting at
    /// <paramref name="offset"/>. Does not touch the inner stream.
    /// </summary>
    /// <returns>The number of injected bytes taken from the buffer.</returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        byte[] injected = GetBytes(count).ToArray();

        int index = 0;
        while (offset + index < buffer.Length && index < injected.Length)
        {
            buffer[offset + index] = injected[index];
            index++;
        }

        return injected.Length;
    }

    /// <summary>Writes through to the inner stream.</summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        _originalStream.Write(buffer, offset, count);
    }

    /// <summary>Flushes the inner stream.</summary>
    public override void Flush()
    {
        _originalStream.Flush();
    }

    /// <summary>Seeks on the inner stream and returns its result.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        return _originalStream.Seek(offset, origin);
    }

    /// <summary>Sets the length of the inner stream.</summary>
    public override void SetLength(long value)
    {
        _originalStream.SetLength(value);
    }
}

然后我用XmlReader读取流,如下所示。

// Reads XML nodes from _extendedStream one at a time and logs whenever a
// closing </test> element is encountered.
// XmlReaderSettings.Async must be true for xmlReader.ReadAsync() to be usable.
using (XmlReader xmlReader = XmlReader.Create(_extendedStream, new XmlReaderSettings() { Async = true }))
{
    while (await xmlReader.ReadAsync())
    {
        switch (xmlReader.NodeType)
        {
            case XmlNodeType.EndElement:
                if (xmlReader.LocalName.Equals("test"))
                {
                    _log.Debug("</test> injected!");
                }
                break;
            default:
                break;
        }
    }
}

如果我调用_extendedStream.Inject("</test>"),则永远不会注入数据。有人知道为什么吗?

为什么我的Inject方法不将数据注入流

您重写的 ReadAsync 方法直接委托给 _originalStream.ReadAsync,因此我认为在您的示例代码中,真正执行注入逻辑的 Read 重写方法根本不会被调用。

与其重写 ReadAsync(这可能会变得复杂),不如把对原始流的读取移到 Read 方法中;基类 Stream 的 ReadAsync 默认实现会异步调用 Read。

// UNTESTED CODE, MAY CONTAIN OFF BY ONE ERRORS
/// <summary>
/// Serves pending injected bytes first; when none are pending, reads from the
/// wrapped stream instead. The read callback is invoked after either path.
/// </summary>
/// <param name="buffer">Destination buffer.</param>
/// <param name="offset">Index in <paramref name="buffer"/> at which to start writing.</param>
/// <param name="count">Maximum number of bytes to read.</param>
/// <returns>
/// The number of bytes written to <paramref name="buffer"/>. This may be less
/// than <paramref name="count"/>, which the Stream.Read contract permits.
/// </returns>
public override int Read(byte[] buffer, int offset, int count)
{
    int totalBytesRead;
    int pending = data.Count - pos;

    if (pending > 0 && count > 0)
    {
        // Copy directly from the injected buffer instead of calling GetBytes:
        // GetBytes waits on dataInjected whenever nothing is pending, which
        // would stop this method from ever reaching the original stream.
        // Returning right away (a short read) also means injected data is not
        // held back by a read on the original stream that might block.
        totalBytesRead = Math.Min(pending, count);
        data.CopyTo(pos, buffer, offset, totalBytesRead);
        pos += totalBytesRead;
    }
    else
    {
        // No injected data waiting: behave like the wrapped stream.
        totalBytesRead = _originalStream.Read(buffer, offset, count);
    }

    // NOTE(review): data and pos are accessed without synchronization here and
    // in Inject; confirm Inject is never called concurrently with Read.
    _readCallback(buffer);
    return totalBytesRead;
}

这可能需要一些调整,以防止注入的数据被一次又一次地注入,但至少注入逻辑会运行。