TcpListener:如何在等待 AcceptTcpClientAsync() 时停止侦听
本文关键字:在等待 AcceptTcpClientAsync TcpListener | 更新日期: 2023-09-27 18:34:07
如何在异步方法等待传入连接时正确关闭TcpListener。我在SO上找到了这段代码,这里是代码:
public class Server
{
private TcpListener _Server;
private bool _Active;
public Server()
{
_Server = new TcpListener(IPAddress.Any, 5555);
}
public async void StartListening()
{
_Active = true;
_Server.Start();
await AcceptConnections();
}
public void StopListening()
{
_Active = false;
_Server.Stop();
}
private async Task AcceptConnections()
{
while (_Active)
{
var client = await _Server.AcceptTcpClientAsync();
DoStuffWithClient(client);
}
}
private void DoStuffWithClient(TcpClient client)
{
// ...
}
}
和主要:
static void Main(string[] args)
{
var server = new Server();
server.StartListening();
Thread.Sleep(5000);
server.StopListening();
Console.Read();
}
此行上引发异常
await AcceptConnections();
当我调用Server.StopListening((时,对象被删除。
所以我的问题是,如何取消 AcceptTcpClientAsync(( 以正确关闭 TcpListener。
由于这里没有合适的工作示例,因此这里有一个:
假设您同时具有 cancellationToken
和 tcpListener
的范围,则可以执行以下操作:
using (cancellationToken.Register(() => tcpListener.Stop()))
{
try
{
var tcpClient = await tcpListener.AcceptTcpClientAsync();
// … carry on …
}
catch (InvalidOperationException)
{
// Either tcpListener.Start wasn't called (a bug!)
// or the CancellationToken was cancelled before
// we started accepting (giving an InvalidOperationException),
// or the CancellationToken was cancelled after
// we started accepting (giving an ObjectDisposedException).
//
// In the latter two cases we should surface the cancellation
// exception, or otherwise rethrow the original exception.
cancellationToken.ThrowIfCancellationRequested();
throw;
}
}
虽然有一个基于 Stephen Toub 博客文章的相当复杂的解决方案,但使用内置 .NET API 的解决方案要简单得多:
var cancellation = new CancellationTokenSource();
await Task.Run(() => listener.AcceptTcpClientAsync(), cancellation.Token);
// somewhere in another thread
cancellation.Cancel();
此解决方案不会终止挂起的接受调用。但其他解决方案也没有这样做,而且这个解决方案至少更短。
更新:一个更完整的示例,显示了取消信号后应该发生的情况:
var cancellation = new CancellationTokenSource();
var listener = new TcpListener(IPAddress.Any, 5555);
listener.Start();
try
{
while (true)
{
var client = await Task.Run(
() => listener.AcceptTcpClientAsync(),
cancellation.Token);
// use the client, pass CancellationToken to other blocking methods too
}
}
finally
{
listener.Stop();
}
// somewhere in another thread
cancellation.Cancel();
更新 2:Task.Run
仅在任务启动时检查取消令牌。为了加快接受循环的终止,您可能希望注册取消操作:
cancellation.Token.Register(() => listener.Stop());
为我工作:创建一个本地虚拟客户端以连接到侦听器,并且在连接被接受后,不要执行另一个异步接受(使用活动标志(。
// This is so the accept callback knows to not
_Active = false;
TcpClient dummyClient = new TcpClient();
dummyClient.Connect(m_listener.LocalEndpoint as IPEndPoint);
dummyClient.Close();
这可能是一个黑客,但它似乎比这里的其他选项更漂亮:)
StopListening
(释放套接字(是正确的。只要吞下那个特定的错误。您无法避免这种情况,因为无论如何您都需要以某种方式停止挂起的调用。否则,您将泄漏套接字和挂起的异步 IO,并且端口将继续使用。
定义此扩展方法:
public static class Extensions
{
public static async Task<TcpClient> AcceptTcpClientAsync(this TcpListener listener, CancellationToken token)
{
try
{
return await listener.AcceptTcpClientAsync();
}
catch (Exception ex) when (token.IsCancellationRequested)
{
throw new OperationCanceledException("Cancellation was requested while awaiting TCP client connection.", ex);
}
}
}
在使用扩展方法接受客户端连接之前,请执行以下操作:
token.Register(() => listener.Stop());
在不断侦听新的连接客户端时,我使用了以下解决方案:
public async Task ListenAsync(IPEndPoint endPoint, CancellationToken cancellationToken)
{
TcpListener listener = new TcpListener(endPoint);
listener.Start();
// Stop() typically makes AcceptSocketAsync() throw an ObjectDisposedException.
cancellationToken.Register(() => listener.Stop());
// Continually listen for new clients connecting.
try
{
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
Socket clientSocket = await listener.AcceptSocketAsync();
}
}
catch (OperationCanceledException) { throw; }
catch (Exception) { cancellationToken.ThrowIfCancellationRequested(); }
}
- 当
CancellationToken
被取消时,我注册了一个回调以在TcpListener
实例上调用Stop()
。 -
AcceptSocketAsync
通常会立即抛出ObjectDisposedException
。 - 除了
OperationCanceledException
之外,我抓住了除Exception
,尽管向外部调用者抛出"理智"OperationCanceledException
。
我对async
编程很陌生,所以如果这种方法有问题,请原谅我 - 我很高兴看到它被指出并从中学习!
取消令牌有一个可用于停止服务器的委托。当服务器停止时,任何侦听连接调用都将引发套接字异常。
请参阅以下代码:
public class TcpListenerWrapper
{
// helper class would not be necessary if base.Active was public, c'mon Microsoft...
private class TcpListenerActive : TcpListener, IDisposable
{
public TcpListenerActive(IPEndPoint localEP) : base(localEP) {}
public TcpListenerActive(IPAddress localaddr, int port) : base(localaddr, port) {}
public void Dispose() { Stop(); }
public new bool Active => base.Active;
}
private TcpListenerActive server
public async Task StartAsync(int port, CancellationToken token)
{
if (server != null)
{
server.Stop();
}
server = new TcpListenerActive(IPAddress.Any, port);
server.Start(maxConnectionCount);
token.Register(() => server.Stop());
while (server.Active)
{
try
{
await ProcessConnection();
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
}
private async Task ProcessConnection()
{
using (TcpClient client = await server.AcceptTcpClientAsync())
{
// handle connection
}
}
}
https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.beginaccept?view=net-5.0
若要取消对 BeginAccept 方法的挂起调用,请关闭套接字。当异步操作处于进度,则调用提供给 BeginAccept 方法的回调。一个对 EndAccept 方法的后续调用将引发ObjectDisposedException,以指示操作已被取消。
在这里,TcpListner.cs反编译了。
[HostProtection(SecurityAction.LinkDemand, ExternalThreading = true)]
public Task<TcpClient> AcceptTcpClientAsync()
{
return Task<TcpClient>.Factory.FromAsync(BeginAcceptTcpClient, EndAcceptTcpClient, null);
}
/// <summary>Asynchronously accepts an incoming connection attempt and creates a new <see cref="T:System.Net.Sockets.TcpClient" /> to handle remote host communication.</summary>
/// <returns>A <see cref="T:System.Net.Sockets.TcpClient" />.</returns>
/// <param name="asyncResult">An <see cref="T:System.IAsyncResult" /> returned by a call to the <see cref="M:System.Net.Sockets.TcpListener.BeginAcceptTcpClient(System.AsyncCallback,System.Object)" /> method.</param>
/// <PermissionSet>
/// <IPermission class="System.Security.Permissions.EnvironmentPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
/// <IPermission class="System.Security.Permissions.FileIOPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
/// <IPermission class="System.Security.Permissions.SecurityPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Flags="UnmanagedCode, ControlEvidence" />
/// <IPermission class="System.Diagnostics.PerformanceCounterPermission, System, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
/// </PermissionSet>
public TcpClient EndAcceptTcpClient(IAsyncResult asyncResult)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "EndAcceptTcpClient", null);
}
if (asyncResult == null)
{
throw new ArgumentNullException("asyncResult");
}
LazyAsyncResult lazyResult = asyncResult as LazyAsyncResult;
Socket asyncSocket = (lazyResult == null) ? null : (lazyResult.AsyncObject as Socket);
if (asyncSocket == null)
{
throw new ArgumentException(SR.GetString("net_io_invalidasyncresult"), "asyncResult");
}
Socket socket = asyncSocket.EndAccept(asyncResult);
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "EndAcceptTcpClient", socket);
}
return new TcpClient(socket);
}
/// <summary>Begins an asynchronous operation to accept an incoming connection attempt.</summary>
/// <returns>An <see cref="T:System.IAsyncResult" /> that references the asynchronous creation of the <see cref="T:System.Net.Sockets.TcpClient" />.</returns>
/// <param name="callback">An <see cref="T:System.AsyncCallback" /> delegate that references the method to invoke when the operation is complete.</param>
/// <param name="state">A user-defined object containing information about the accept operation. This object is passed to the <paramref name="callback" /> delegate when the operation is complete.</param>
/// <exception cref="T:System.Net.Sockets.SocketException">An error occurred while attempting to access the socket. See the Remarks section for more information. </exception>
/// <exception cref="T:System.ObjectDisposedException">The <see cref="T:System.Net.Sockets.Socket" /> has been closed. </exception>
/// <PermissionSet>
/// <IPermission class="System.Security.Permissions.EnvironmentPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
/// <IPermission class="System.Security.Permissions.FileIOPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
/// <IPermission class="System.Security.Permissions.SecurityPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Flags="UnmanagedCode, ControlEvidence" />
/// <IPermission class="System.Diagnostics.PerformanceCounterPermission, System, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
/// </PermissionSet>
[HostProtection(SecurityAction.LinkDemand, ExternalThreading = true)]
public IAsyncResult BeginAcceptTcpClient(AsyncCallback callback, object state)
{
if (Logging.On)
{
Logging.Enter(Logging.Sockets, this, "BeginAcceptTcpClient", null);
}
if (!m_Active)
{
throw new InvalidOperationException(SR.GetString("net_stopped"));
}
IAsyncResult result = m_ServerSocket.BeginAccept(callback, state);
if (Logging.On)
{
Logging.Exit(Logging.Sockets, this, "BeginAcceptTcpClient", null);
}
return result;
}
和套接字.cs反编译。
/// <summary>Asynchronously accepts an incoming connection attempt and creates a new <see cref="T:System.Net.Sockets.Socket" /> to handle remote host communication.</summary>
/// <returns>A <see cref="T:System.Net.Sockets.Socket" /> to handle communication with the remote host.</returns>
/// <param name="asyncResult">An <see cref="T:System.IAsyncResult" /> that stores state information for this asynchronous operation as well as any user defined data. </param>
/// <exception cref="T:System.ArgumentNullException">
/// <paramref name="asyncResult" /> is null. </exception>
/// <exception cref="T:System.ArgumentException">
/// <paramref name="asyncResult" /> was not created by a call to <see cref="M:System.Net.Sockets.Socket.BeginAccept(System.AsyncCallback,System.Object)" />. </exception>
/// <exception cref="T:System.Net.Sockets.SocketException">An error occurred when attempting to access the socket. See the Remarks section for more information. </exception>
/// <exception cref="T:System.ObjectDisposedException">The <see cref="T:System.Net.Sockets.Socket" /> has been closed. </exception>
/// <exception cref="T:System.InvalidOperationException">
/// <see cref="M:System.Net.Sockets.Socket.EndAccept(System.IAsyncResult)" /> method was previously called. </exception>
/// <exception cref="T:System.NotSupportedException">Windows NT is required for this method. </exception>
/// <PermissionSet>
/// <IPermission class="System.Security.Permissions.EnvironmentPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
/// <IPermission class="System.Security.Permissions.FileIOPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
/// <IPermission class="System.Security.Permissions.SecurityPermission, mscorlib, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Flags="UnmanagedCode, ControlEvidence" />
/// <IPermission class="System.Diagnostics.PerformanceCounterPermission, System, Version=2.0.3600.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" version="1" Unrestricted="true" />
/// </PermissionSet>
public Socket EndAccept(IAsyncResult asyncResult)
{
if (s_LoggingEnabled)
{
Logging.Enter(Logging.Sockets, this, "EndAccept", asyncResult);
}
if (CleanedUp)
{
throw new ObjectDisposedException(GetType().FullName);
}
byte[] buffer;
int bytesTransferred;
if (asyncResult != null && asyncResult is AcceptOverlappedAsyncResult)
{
return EndAccept(out buffer, out bytesTransferred, asyncResult);
}
if (asyncResult == null)
{
throw new ArgumentNullException("asyncResult");
}
AcceptAsyncResult castedAsyncResult = asyncResult as AcceptAsyncResult;
if (castedAsyncResult == null || castedAsyncResult.AsyncObject != this)
{
throw new ArgumentException(SR.GetString("net_io_invalidasyncresult"), "asyncResult");
}
if (castedAsyncResult.EndCalled)
{
throw new InvalidOperationException(SR.GetString("net_io_invalidendcall", "EndAccept"));
}
object result = castedAsyncResult.InternalWaitForCompletion();
castedAsyncResult.EndCalled = true;
Exception exception = result as Exception;
if (exception != null)
{
throw exception;
}
if (castedAsyncResult.ErrorCode != 0)
{
SocketException socketException = new SocketException(castedAsyncResult.ErrorCode);
UpdateStatusAfterSocketError(socketException);
if (s_LoggingEnabled)
{
Logging.Exception(Logging.Sockets, this, "EndAccept", socketException);
}
throw socketException;
}
Socket acceptedSocket = (Socket)result;
if (s_LoggingEnabled)
{
Logging.PrintInfo(Logging.Sockets, acceptedSocket, SR.GetString("net_log_socket_accepted", acceptedSocket.RemoteEndPoint, acceptedSocket.LocalEndPoint));
Logging.Exit(Logging.Sockets, this, "EndAccept", result);
}
return acceptedSocket;
}
似乎 AcceptTcpClientAsync(( 在内部使用了类似 BeginAccept(( 和 EndAccept(( 的东西。在 Socket 中.cs您可以查看 CleanedUp 是否为 true 抛出 ObjectDisposedException,这意味着侦听套接字已关闭。因此,关闭侦听套接字会导致 AcceptTcpClientAsync(( 抛出 ObjectDisposedException。
namespace TestTcpListenStop {
class Program {
static TcpListener listner;
static void Main(string[] args) {
for (int i = 0; i < 100; ++i) {
StartStopTest();
}
Console.ReadKey();
return;
}
static void StartStopTest() {
// start listner
listner = new TcpListener(IPAddress.Any, 17000);
listner.Start();
// start accept
Task tk = AcceptAsync();
// do other things
Task.Delay(1).Wait();
// close listen socket
listner.Stop();
tk.Wait();
return;
}
static async Task AcceptAsync() {
Console.WriteLine("Accepting client...");
TcpClient client;
while (true) {
try {
// Closing listen socket causes
// AcceptTcpClientAsync() throw ObjectDisposedException
client = await listner.AcceptTcpClientAsync().ConfigureAwait(false);
Console.WriteLine("A client has been accepted.");
}
catch (ObjectDisposedException) {
Console.WriteLine("This exception means listening socket closed.");
break;
}
// we just close.
client.Client.Shutdown(SocketShutdown.Both);
client.Close();
}
Console.WriteLine("AcceptAsync() terminated.");
}
}
}
https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task.wait?view=net-5.0
取消取消令牌取消令牌对正在运行的任务没有影响,除非它也已传递取消令牌并准备处理取消。将 cancelToken 对象传递给此方法只是允许取消等待。
而且我认为使用取消令牌实际上不会阻止AcceptTcpClientAsync((。我们只是取消等待,而不是 AcceptTcpClientAsync((,因为 AcceptTcpClientAsync(( 没有接收取消令牌作为参数。只有关闭侦听套接字才能取消 AcceptTcpClientAsync((。请参阅来自 msdn 的以下内容。
public class Example {
public static void Main() {
CancellationTokenSource ts = new CancellationTokenSource();
Task t = Task.Run(() => {
Console.WriteLine("Calling Cancel...");
ts.Cancel();
Task.Delay(5000).Wait();
Console.WriteLine("Task ended delay...");
});
try {
Console.WriteLine("About to wait for the task to complete...");
t.Wait(ts.Token);
}
catch (OperationCanceledException e) {
Console.WriteLine("{0}: The wait has been canceled. Task status: {1:G}",
e.GetType().Name, t.Status);
Thread.Sleep(6000);
Console.WriteLine("After sleeping, the task status: {0:G}", t.Status);
}
ts.Dispose();
}
}
// The example displays output like the following:
// About to wait for the task to complete...
// Calling Cancel...
// OperationCanceledException: The wait has been canceled. Task status: Running
// Task ended delay...
// After sleeping, the task status: RanToCompletion