ActionBlock有时似乎忽略了MaxDegreeOfParallelism

本文关键字:MaxDegreeOfParallelism ActionBlock | 更新日期: 2023-09-27 18:22:03

当发布到ActionBlock时,我们看到了意外的行为,在ActionBlock中,即使MaxDegreeOfParallelism为1,并行处理似乎也在发生。下面是场景。

发布到ActionBlock的类如下所示:

public class ByteHandler {
...
   public ByteHandler(ByteHandlingCallback delegateCallback){
       _byteHandlingActionBlock = new ActionBlock<byte[]>(bytes => delegateCallback.HandleBytes(bytes));
   }

   public void HandleBytes(byte[] bytes){
       _byteHandlingActionBlock.Post(bytes);
   }

在下游,我们将字节反序列化为对象,并根据它们的类型将这些对象(我们称之为Notifications)传递给处理程序:

public class NotificationHandler{
   private readonly Dictionary<string, AutoResetEvent> _secondNoticeReceivedEvents;
   public void HandleInitialNotification(InitialNotification notification){
       var secondNoticeReceivedEvent = new AutoResetEvent(false);
       if (!_secondNoticeReceivedEvents.TryAdd(notification.ID, secondNoticeReceivedEvent)) return;
       DoSomethingDownstream(notification);
       if (secondNoticeReceivedEvent.WaitOne(_timeout))
            DoSomethingElseDownstream();
       else
            throw new Exception("Second notification not received within timeout!");
       _secondNoticeReceivedEvents.TryRemove(notification.ID, out secondNoticeReceivedEvent);
   }
   public void HandleSecondNotification(SecondNotification notification){
       AutoResetEvent secondNoticeReceivedEvent;
       if (_secondNoticeReceivedEvents.TryRemove(notification.ID, out secondNoticeReceivedEvent))
            secondNoticeReceivedEvent.Set();
   }

此处理程序有一个致命错误:InitialNotifications在其相应的SecondNotifications之前进入,但HandleInitialNotification在退出之前等待HandleSecondNotification,因此线程永远不会到达HandleSecondNotification。

通常,我们会看到HandleInitialNotification被阻塞,直到等待HandleSecondNotification超时,然后在同一线程上继续执行挂起的SecondNotification。这是我们通常在日志中看到的:

2013-07-05 13:27:25,755 [13] INFO  Received InitialNotification for: XX
2013-07-05 13:27:35,758 [13] WARN  Second notification not not received for XX within timeout!
2013-07-05 13:27:35,761 [13] INFO  Received SecondNotification for: XX

这不是代码的工作方式,但考虑到它的编写方式,它应该总是超时等待SecondNotification。然而,我们偶尔也会看到HandleInitialNotification在超时之前完成,HandleSecondNotification在另一个线程上被及时处理:

2013-07-05 13:38:13,232 [56] INFO  Received InitialNotification for: YY
2013-07-05 13:38:13,258 [11] INFO  Received SecondNotification for: YY

由于我们使用默认的ActionBlock,MaxDegreeOfParallelism应该是1。那么,当发布到ActionBlock的原始线程正在阻塞时,第二个线程(源自ActionBlock)是如何接收SecondNotification的呢?

ActionBlock有时似乎忽略了MaxDegreeOfParallelism

请注意,您的超时/警告几乎正好在10秒内发生,而所有其他事件都在毫秒内发生。很可能您正在以某种方式同步调用通知处理程序方法(HandleInitialNotification&HandleSecondNotification),以便在操作块获取其缓冲区中的下一项之前,操作块等待HandleInitialNotification中的超时完成。您需要异步执行超时等待。

相关文章:
  • 没有找到相关文章