SslStream.ReadAsync从不调用InnerStream.ReadAsync

本文关键字:ReadAsync InnerStream 调用 SslStream | 更新日期: 2023-09-27 17:50:11

我不确定是否我错过了什么,或者确实有一个设计缺陷在SslStream(和,可能,其他流类可以包装内部流)。

考虑以下伪代码:

MyStream
{
  Task<int> ReadAsync()
  { ... }
}
...
{
  MyStream my = new MyStream();
  SslStream ssl = new SslStream(my);
  ssl.ReadAsync(...);
}

虽然可以预期SslStream。ReadAsync最终会调用MyStream。ReadAsync,它不会。相反,它会调用MyStream。BeginRead(如果有定义的话)。如果MyStream。如果没有定义BeginRead,其行为将很难预测(它将取决于MyStream是从哪个类派生的等等)。

简而言之,要使SslStream的async/await方法按预期工作,需要实现内部流类的BeginXXX/EndXXX(非async/await方法)。

BeginXXX/EndXXX模式对于开发来说比async/await模式要复杂得多(对我来说,这就是引入async/await的原因——使异步编程更容易)。但是仍然要求开发BeginXXX/EndXXX方法违背了async/await的目的。

此外,需要知道SslStream类的内部实现(因为它可以直接调用InnerStream)。ReadAsync(如果实现方式不同)。我的意思是SslStream的公共签名并没有清楚地为我提供足够的信息,我是否应该在我的内部流类中实现ReadAsync或BeginRead。

为此,我需要使用试错方法或检查SslStream的源代码(以及它的Stream父类,因为SslStream继承了基本流类的ReadAsync)。这似乎不是一种可靠和直接的编写代码的方法。

当前实现的异步/等待方法,如ReadAsync在SslStream/Stream类的原因?

SslStream.ReadAsync从不调用InnerStream.ReadAsync

是的,特别是Stream有点混乱,因为它早在async/await之前就存在了。例如,ReadAsync的默认实现实际上将在线程池线程上执行阻塞读取。

我建议您将ReadAsync重写为常规TAP方法,并将BeginRead/EndRead重写为该TAP方法的APM包装器。MSDN文档对此有最好的模式(正确处理callbackstate),除了我更喜欢稍微调整一下,这样EndRead的任何异常都不会包装在AggregateException中:

public static IAsyncResult ToBegin<T>(
    Task<T> task, AsyncCallback callback, object state)
{
  var tcs = new TaskCompletionSource<T>(state);
  task.ContinueWith(t =>
  {
    if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions)
    else if (t.IsCanceled) tcs.TrySetCanceled();
    else tcs.TrySetResult(t.Result);
    if (callback != null) callback(tcs.Task);
  }, TaskScheduler.Default);
  return tcs.Task;
}
public static T ToEnd<T>(IAsyncResult result)
{
  // Original MSDN code uses Task<T>.Result
  return ((Task<T>)result).GetAwaiter().GetResult();
}

Task实现了IAsyncResult,所以你应该能够摆脱

return ReadAsync(...);

作为BeginRead的实现。可能会更复杂一些,比如你需要将回调作为那个任务的延续连接起来。但是你可以重用这些代码。

顺便说一句,你打破了流API契约,因为BeginRead做了一些与ReadAsync不同的事情。严格来说,这是你的错,而不是框架的问题。

我的意思是SslStream的公共签名没有清楚地为我提供足够的信息,我是否应该在我的内部流类中实现ReadAsync或BeginRead。

不需要知道。当你从一个类继承时,你继承了这个类的所有义务。你必须实现所有东西,因为这是你对该类用户的承诺。明显的例子:如果你从IEnumerator派生,你不能只实现MoveNext而忽略Current,并期望调用者工作。

ReadAsync只是对更深层次异步api的帮助包装。

如果你想覆盖异步行为,最好的选择是覆盖BeginReadEndRead——ReadAsync的默认实现将在它们之上构建任务(其他BeginRead将调用底层流的BeginRead)。

这只是因为ReadAsync是后来添加的-事实上,在一些平台上,它们还不可用。但最终,期望ReadAsync作为重载工作,与期望它自动使用您重载的Read非常相似——它们并不真正适合。

但是不要担心-这并不会使使用基于Task的api变得更加困难!Task本身实现了IAsyncResult。因此,在重写BeginRead方法时,只需让它返回ReadAsync:)

简化样例实现:

class MyStream : Stream
{
    public override async Task<int> ReadAsync
      (byte[] buffer, int offset, int count, CancellationToken cancellationToken)
    {
        const string str = "Hi there!'r'n";
        await Task.Delay(1000);
        return Encoding.UTF8.GetBytes(str, 0, str.Length, buffer, offset);
    }
    public override IAsyncResult BeginRead
      (byte[] buffer, int offset, int count, AsyncCallback callback, object state)
    {
        return ReadAsync(buffer, offset, count).ContinueWith(t => callback(t));
    }
    public override int EndRead(IAsyncResult asyncResult)
    {
        // Stolen from Stephen. Nicer than rethrowing the inner exception manually :)
        return ((Task<int>)asyncResult).GetAwaiter().GetResult();
    }
}

显然,把它做成一个基类是很容易的——只允许在子类中重写ReadAsync方法。如果这不是一个选项,你可以在扩展方法中管理功能(在简化的代码中看起来并不那么重要,但你可能希望在实际代码中有一些检查和错误处理;更重要的是,您确实想要处理state—对于我的测试用例,实现它是不必要的,但它只是违反API的另一件事)。

正如@usr指出的那样,您忽略的一点是,确保流的行为一致是您的责任。这就像如果您只覆盖==操作符而不覆盖EqualsGetHashCode操作符一样——导致的不一致是您的错,因为您应该确保一致性。另一方面,如果您只覆盖BeginRead,它将工作得很好,因为ReadAsync的默认实现调用BeginRead—代码保持向后兼容。但出于完全相同的原因,反过来它就不能同样工作了。

编辑:

好吧,我已经写了一些东西,应该工作良好,即使传递一个state(现在TaskTask<T>都更新了):

static class Extensions
{
    struct Unit { }
    public static IAsyncResult Apmize<T>(this Task<T> @this, AsyncCallback callback, object state)
    {
        return @this.ApmizeInternal<T>(callback, state);
    }
    public static IAsyncResult Apmize(this Task @this, AsyncCallback callback, object state)
    {
        return @this.ApmizeInternal<Unit>(callback, state);
    }
    private static IAsyncResult ApmizeInternal<T>(this Task @this, AsyncCallback callback, object state)
    {
        if (@this.AsyncState == state)
            return @this.ContinueWith(t => callback(t));
        var tcs = new TaskCompletionSource<T>(state);
        @this.ContinueWith
            (
                t =>
                {
                    if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
                    else if (t.IsCanceled) tcs.TrySetCanceled();
                    else
                    {
                        if (t is Task<T>) 
                        {
                            tcs.TrySetResult(((Task<T>)t).Result);
                        }
                        else
                        {
                            tcs.TrySetResult(default(T));
                        }
                    }
                    if (callback != null) callback(tcs.Task);
                },
                CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default
            );
        return tcs.Task;
    }
}

用法是这样的:

public override IAsyncResult BeginRead
  (byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
    return ReadAsync(buffer, offset, count).Apmize(callback, state);
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
    return WriteAsync(buffer, offset, count).Apmize(callback, state);
}

它有点长,但它只是一个可重用的扩展方法。并且它可以正确地处理状态:)