异步代码中的标志、循环和锁定

本文关键字:循环 锁定 标志 代码 异步 | 更新日期: 2023-09-27 18:28:39

我要做的是创建一个'侦听器',它一次侦听几个不同的Tcp端口,并将消息通过管道传送给任何观察者。

伪代码:

private bool _Listen = false;
public void Start()
{
    _Listen = true;
    Task.Factory.StartNew(() => Listen(1);
    Task.Factory.StartNew(() => Listen(2);
}
public void Stop()
{
    _Listen = false;
}
private async void Listen(int port)
{
     var tcp = new TcpClient();
     while(_Listen)
     {
          await tcp.ConnectAsync(ip, port);
          using (/*networkStream, BinaryReader, etc*/)
          {
               while(_Listen)
               {
                   //Read from binary reader and OnNext to IObservable
               }
          }
     }
}

(为简洁起见,我在两个 while 中省略了 try/catch,这两个 while 也检查了标志(

我的问题是:我应该锁定标志吗,如果是这样,它如何与异步/等待位绑定?

异步代码中的标志、循环和锁定

首先,您应该将返回类型更改为 Task,而不是 void。 async void方法本质上是即发即弃,不能等待或取消。它们的存在主要是为了允许创建异步事件处理程序或类似事件的代码。它们绝不应用于正常的异步操作。

协作取消/中止/停止异步操作的 TPL 方法是使用 CancelToken。可以检查令牌的 IsCancelRequest 属性,以查看是否需要取消操作并停止。

更好的是,框架提供的大多数异步方法都接受 CancelToken,因此您可以立即停止它们,而无需等待它们返回。您可以使用 NetworkStream 的 ReadAsync(Byte[]、Int32、Int32、CancelToken(来读取数据,并在有人调用您的 Stop 方法时立即取消。

您可以将代码更改为如下所示的内容:

    CancellationTokenSource _source;
    public void Start()
    {
        _source = new CancellationTokenSource();            
        Task.Factory.StartNew(() => Listen(1, _source.Token),_source.Token);
        Task.Factory.StartNew(() => Listen(2, _source.Token), _source.Token);
    }
    public void Stop()
    {
        _source.Cancel();
    }

    private async Task Listen(int port,CancellationToken token)
    {
        var tcp = new TcpClient();
        while(!token.IsCancellationRequested)
        {
            await tcp.ConnectAsync(ip, port);
            using (var stream=tcp.GetStream())
            {
                ...
                try
                {
                    await stream.ReadAsync(buffer, offset, count, token);
                }
                catch (OperationCanceledException ex)
                {
                    //Handle Cancellation
                }
                ...
            }
        }
    }

您可以在托管线程中的取消中阅读有关取消的更多信息,包括有关如何轮询、注册取消回调、侦听多个令牌等的建议。

try/catch块之所以存在,是因为await如果任务被取消,则会引发异常。您可以通过对 ReadAsync 返回的任务调用 ContinueWith 并检查 IsCanceled 标志来避免这种情况:

    private async Task Listen(int port,CancellationToken token)
    {
        var tcp = new TcpClient();
        while(!token.IsCancellationRequested)
        {
            await tcp.ConnectAsync(ip, port);
            using (var stream=tcp.GetStream())
            {
                ///...
                await stream.ReadAsync(buffer, offset, count, token)
                    .ContinueWith(t =>
                    {
                        if (t.IsCanceled)
                        {
                            //Do some cleanup?
                        }
                        else
                        {
                            //Process the buffer and send notifications
                        }
                    });
                ///...
            }
        }
    }

await现在等待一个简单的Task,该在延续完成时完成

您可能最好一直坚持使用 RX,而不是使用 Task。这是我编写的一些代码,用于使用 RX 连接到 UDP 套接字。

public IObservable<UdpReceiveResult> StreamObserver
(int localPort, TimeSpan? timeout = null)
{

    return Linq.Observable.Create<UdpReceiveResult>(observer =>
    {
        UdpClient client = new UdpClient(localPort);
        var o = Linq.Observable.Defer(() => client.ReceiveAsync().ToObservable());
        IDisposable subscription = null;
        if ((timeout != null)) {
            subscription = Linq.Observable.Timeout(o.Repeat(), timeout.Value).Subscribe(observer);
        } else {
            subscription = o.Repeat().Subscribe(observer);
        }
        return Disposable.Create(() =>
        {
            client.Close();
            subscription.Dispose();
            // Seems to take some time to close a socket so
            // when we resubscribe there is an error. I
            // really do NOT like this hack. TODO see if
            // this can be improved
            Thread.Sleep(TimeSpan.FromMilliseconds(200));
        });
    });
}

我应该锁定标志吗,如果是这样,它如何与异步/等待位绑定?

您需要以某种方式同步对标志的访问。如果不这样做,则允许编译器进行以下优化:

bool compilerGeneratedLocal = _Listen;
while (compilerGeneratedLocal)
{
    // body of the loop
}

这会让你的代码出错。

一些选项可以解决这个问题:

  1. bool标志标记为volatile。这将确保始终读取标志的当前值。
  2. 使用CancellationToken(如Panagiotis Kanavos建议的那样(。这将确保以线程安全的方式访问基础标志。它还具有许多异步方法支持的优点 CancellationToken ,因此您也可以取消它们。

当您可能处理多个线程时,某种形式的事件(例如ManualResetEventSlim(将是一个更明显的选择。

private ManualResetEventSlim _Listen;
public void Start()
{
    _Listen = new ManualResetEventSlim(true);
    Task.Factory.StartNew(() => Listen(1);
    Task.Factory.StartNew(() => Listen(2);
}
public void Stop()
{
    _Listen.Reset();
}
private async void Listen(int port)
{
     var tcp = new TcpClient();
     while(_Listen.IsSet)
     {