C# 在生产者/使用者中等待多个事件
本文关键字:等待 事件 使用者 生产者 | 更新日期: 2023-09-27 18:37:06
我正在使用生产者/消费者模式实现数据链路层。数据链路层有自己的线程和状态机,用于通过线路(以太网、RS-232...)传输数据链路协议。物理层的接口表示为 System.IO.Stream。另一个线程向数据链接对象写入消息并从中读取消息。
数据链接对象具有空闲状态,必须等待以下四种情况之一:
- 接收到一个字节
- 消息可从网络线程获得
- 保持活动状态计时器已过期
- 网络层取消了所有通信
我很难在不将通信拆分为读/写线程的情况下找出最佳方法(从而显着增加复杂性)。以下是我如何获得 3 分中的 4 分:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
或
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
我应该怎么做才能阻塞线程,等待所有四个条件?欢迎所有建议。我没有设置任何体系结构或数据结构。
编辑:********感谢大家的帮助。这是我的解决方案********
首先,我认为没有生产者/消费者队列的异步实现。所以我实现了类似于这个堆栈溢出帖子的东西。
我需要一个外部和内部取消源来停止使用者线程并分别取消中间任务,类似于本文。
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
CancellationToken internalToken = internalTokenSource.Token;
CancellationToken stopToken = stopTokenSource.Token;
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
{
CancellationToken ct = linkedCts.Token;
Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);
// Wait for at least one task to complete
await Task.WhenAny(readTask, msgTask, keepAliveTask);
// Next cancel the other tasks
internalTokenSource.Cancel();
try {
await Task.WhenAll(readTask, msgTask, keepAliveTask);
} catch (OperationCanceledException e) {
if (e.CancellationToken == stopToken)
throw;
}
if (msgTask.IsCompleted)
// Send the network layer message
else if (readTask.IsCompleted)
// Process the byte from the physical layer
else
Contract.Assert(keepAliveTask.IsCompleted);
// Send a keep alive message
}
}
我会选择你的选项二,等待 4 个条件中的任何一个发生。假设您已经有 4 个任务作为可等待的方法:
var task1 = WaitForByteReceivedAsync();
var task2 = WaitForMessageAvailableAsync();
var task3 = WaitForKeepAliveTimerAsync();
var task4 = WaitForCommunicationCancelledAsync();
// now gather them
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{
task1, task2, task3, task4
};
// Wait for any of the things to complete
var result = await Task.WhenAny(theTasks);
上面的代码将在第一个任务完成后立即恢复,并忽略其他 3 个任务。
注意:
在WhenAny的文档中,它说:
返回的任务将始终以 RanToComplete(RanToComplete)状态结束,其结果设置为要完成的第一个任务。即使要完成的第一个任务以"已取消"或"故障"状态结束,也是如此。
因此,在相信发生的事情之前,请务必进行最后的检查:
if(result.Result.Result == true)
... // First Result is the task, the second is the bool that the task returns
在这种情况下,我只会使用取消令牌进行取消。像保持活动状态计时器这样的重复超时最好表示为计时器。
因此,我会将其建模为三个可取消的任务。一、取消令牌:
网络层取消了所有通信
CancellationToken token = ...;
然后,三个并发操作:
接收到一个字节
var readByteTask = stream.ReadAsync(buf, 0, 1, token);
保持活动状态计时器已过期
var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
消息可从网络线程获得
这个有点棘手。您当前的代码使用 BlockingCollection<T>
,它不兼容异步。我建议切换到TPL Dataflow的BufferBlock<T>
或我自己的AsyncProducerConsumerQueue<T>
,其中任何一个都可以用作异步兼容的生产者/消费者队列(这意味着生产者可以是同步或异步的,消费者可以是同步或异步的)。
BufferBlock<byte[]> SendQueue = new ...;
...
var messageTask = SendQueue.ReceiveAsync(token);
然后,您可以使用Task.WhenAny
来确定已完成以下哪些任务:
var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask);
现在,您可以通过将completedTask
与其他进行比较并对其进行await
来检索结果:
if (completedTask == readByteTask)
{
// Throw an exception if there was a read error or cancellation.
await readByteTask;
var byte = buf[0];
...
// Continue reading
readByteTask = stream.ReadAsync(buf, 0, 1, token);
}
else if (completedTask == keepAliveTimerTask)
{
// Throw an exception if there was a cancellation.
await keepAliveTimerTask;
...
// Restart keepalive timer.
keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token);
}
else if (completedTask == messageTask)
{
// Throw an exception if there was a cancellation (or the SendQueue was marked as completed)
byte[] message = await messageTask;
...
// Continue reading
messageTask = SendQueue.ReceiveAsync(token);
}
取消读取后,您将无法知道数据是否已读取。 取消和读取彼此之间不是原子的。这种方法仅在取消后关闭直播时才有效。
队列方法更好。您可以创建一个链接CancellationTokenSource
,该随时被取消。不是传递cts.Token
而是传递您控制的令牌。
然后,您可以根据时间、另一个令牌和您喜欢的任何其他事件向该令牌发出信号。如果使用内置超时,队列将在内部执行相同的操作,以将传入令牌与超时链接。