SslStream.ReadAsync从不调用InnerStream.ReadAsync
本文关键字:ReadAsync InnerStream 调用 SslStream | 更新日期: 2023-09-27 17:50:11
我不确定是否我错过了什么,或者确实有一个设计缺陷在SslStream(和,可能,其他流类可以包装内部流)。
考虑以下伪代码:
MyStream
{
Task<int> ReadAsync()
{ ... }
}
...
{
MyStream my = new MyStream();
SslStream ssl = new SslStream(my);
ssl.ReadAsync(...);
}
虽然可以预期SslStream。ReadAsync最终会调用MyStream。ReadAsync,它不会。相反,它会调用MyStream。BeginRead(如果有定义的话)。如果MyStream。如果没有定义BeginRead,其行为将很难预测(它将取决于MyStream是从哪个类派生的等等)。
简而言之,要使SslStream的async/await方法按预期工作,需要实现内部流类的BeginXXX/EndXXX(非async/await方法)。
BeginXXX/EndXXX模式对于开发来说比async/await模式要复杂得多(对我来说,这就是引入async/await的原因——使异步编程更容易)。但是仍然要求开发BeginXXX/EndXXX方法违背了async/await的目的。
此外,需要知道SslStream类的内部实现(因为它可以直接调用InnerStream)。ReadAsync(如果实现方式不同)。我的意思是SslStream的公共签名并没有清楚地为我提供足够的信息,我是否应该在我的内部流类中实现ReadAsync或BeginRead。
为此,我需要使用试错方法或检查SslStream的源代码(以及它的Stream父类,因为SslStream继承了基本流类的ReadAsync)。这似乎不是一种可靠和直接的编写代码的方法。
当前实现的异步/等待方法,如ReadAsync在SslStream/Stream类的原因?
是的,特别是Stream
有点混乱,因为它早在async
/await
之前就存在了。例如,ReadAsync
的默认实现实际上将在线程池线程上执行阻塞读取。
我建议您将ReadAsync
重写为常规TAP方法,并将BeginRead
/EndRead
重写为该TAP方法的APM包装器。MSDN文档对此有最好的模式(正确处理callback
和state
),除了我更喜欢稍微调整一下,这样EndRead
的任何异常都不会包装在AggregateException
中:
public static IAsyncResult ToBegin<T>(
Task<T> task, AsyncCallback callback, object state)
{
var tcs = new TaskCompletionSource<T>(state);
task.ContinueWith(t =>
{
if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions)
else if (t.IsCanceled) tcs.TrySetCanceled();
else tcs.TrySetResult(t.Result);
if (callback != null) callback(tcs.Task);
}, TaskScheduler.Default);
return tcs.Task;
}
public static T ToEnd<T>(IAsyncResult result)
{
// Original MSDN code uses Task<T>.Result
return ((Task<T>)result).GetAwaiter().GetResult();
}
Task
实现了IAsyncResult
,所以你应该能够摆脱
return ReadAsync(...);
作为BeginRead
的实现。可能会更复杂一些,比如你需要将回调作为那个任务的延续连接起来。但是你可以重用这些代码。
顺便说一句,你打破了流API契约,因为BeginRead
做了一些与ReadAsync
不同的事情。严格来说,这是你的错,而不是框架的问题。
我的意思是SslStream的公共签名没有清楚地为我提供足够的信息,我是否应该在我的内部流类中实现ReadAsync或BeginRead。
不需要知道。当你从一个类继承时,你继承了这个类的所有义务。你必须实现所有东西,因为这是你对该类用户的承诺。明显的例子:如果你从IEnumerator
派生,你不能只实现MoveNext
而忽略Current
,并期望调用者工作。
ReadAsync
只是对更深层次异步api的帮助包装。
如果你想覆盖异步行为,最好的选择是覆盖BeginRead
和EndRead
——ReadAsync
的默认实现将在它们之上构建任务(其他BeginRead
将调用底层流的BeginRead
)。
这只是因为ReadAsync
是后来添加的-事实上,在一些平台上,它们还不可用。但最终,期望ReadAsync
作为重载工作,与期望它自动使用您重载的Read
非常相似——它们并不真正适合。
但是不要担心-这并不会使使用基于Task
的api变得更加困难!Task
本身实现了IAsyncResult
。因此,在重写BeginRead
方法时,只需让它返回ReadAsync
:)
简化样例实现:
class MyStream : Stream
{
public override async Task<int> ReadAsync
(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
const string str = "Hi there!'r'n";
await Task.Delay(1000);
return Encoding.UTF8.GetBytes(str, 0, str.Length, buffer, offset);
}
public override IAsyncResult BeginRead
(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return ReadAsync(buffer, offset, count).ContinueWith(t => callback(t));
}
public override int EndRead(IAsyncResult asyncResult)
{
// Stolen from Stephen. Nicer than rethrowing the inner exception manually :)
return ((Task<int>)asyncResult).GetAwaiter().GetResult();
}
}
显然,把它做成一个基类是很容易的——只允许在子类中重写ReadAsync
方法。如果这不是一个选项,你可以在扩展方法中管理功能(在简化的代码中看起来并不那么重要,但你可能希望在实际代码中有一些检查和错误处理;更重要的是,您确实想要处理state
—对于我的测试用例,实现它是不必要的,但它只是违反API的另一件事)。
正如@usr指出的那样,您忽略的一点是,确保流的行为一致是您的责任。这就像如果您只覆盖==
操作符而不覆盖Equals
或GetHashCode
操作符一样——导致的不一致是您的错,因为您应该确保一致性。另一方面,如果您只覆盖BeginRead
,它将工作得很好,因为ReadAsync
的默认实现调用BeginRead
—代码有保持向后兼容。但出于完全相同的原因,反过来它就不能同样工作了。
好吧,我已经写了一些东西,应该工作良好,即使传递一个state
(现在Task
和Task<T>
都更新了):
static class Extensions
{
struct Unit { }
public static IAsyncResult Apmize<T>(this Task<T> @this, AsyncCallback callback, object state)
{
return @this.ApmizeInternal<T>(callback, state);
}
public static IAsyncResult Apmize(this Task @this, AsyncCallback callback, object state)
{
return @this.ApmizeInternal<Unit>(callback, state);
}
private static IAsyncResult ApmizeInternal<T>(this Task @this, AsyncCallback callback, object state)
{
if (@this.AsyncState == state)
return @this.ContinueWith(t => callback(t));
var tcs = new TaskCompletionSource<T>(state);
@this.ContinueWith
(
t =>
{
if (t.IsFaulted) tcs.TrySetException(t.Exception.InnerExceptions);
else if (t.IsCanceled) tcs.TrySetCanceled();
else
{
if (t is Task<T>)
{
tcs.TrySetResult(((Task<T>)t).Result);
}
else
{
tcs.TrySetResult(default(T));
}
}
if (callback != null) callback(tcs.Task);
},
CancellationToken.None, TaskContinuationOptions.None, TaskScheduler.Default
);
return tcs.Task;
}
}
用法是这样的:
public override IAsyncResult BeginRead
(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return ReadAsync(buffer, offset, count).Apmize(callback, state);
}
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
{
return WriteAsync(buffer, offset, count).Apmize(callback, state);
}
它有点长,但它只是一个可重用的扩展方法。并且它可以正确地处理状态:)