TPL数据流块

本文关键字:数据流 TPL | 更新日期: 2023-09-27 18:19:10

问题:为什么使用WriteOnceBlock(或BufferBlock)从另一个BufferBlock<Action>(返回答案发生在发布的Action)导致死锁(在此代码中)?

我认为类中的方法可以被认为是我们发送给对象的消息(就像Alan Kay提出的关于OOP的原始观点一样)。因此,我编写了这个通用的Actor类,它有助于将普通对象转换为Actor(当然,由于可变性和其他原因,这里有许多看不见的漏洞,但这不是这里的主要关注点)。

我们有这些定义:

public class Actor<T>
{
    private readonly T _processor;
    private readonly BufferBlock<Action<T>> _messageBox = new BufferBlock<Action<T>>();
    public Actor(T processor)
    {
        _processor = processor;
        Run();
    }
    public event Action<T> Send
    {
        add { _messageBox.Post(value); }
        remove { }
    }
    private async void Run()
    {
        while (true)
        {
            var action = await _messageBox.ReceiveAsync();
            action(_processor);
        }
    }
}
public interface IIdGenerator
{
    long Next();
}

;为什么这段代码工作:

static void Main(string[] args)
{
    var idGenerator1 = new IdInt64();
    var idServer1 = new Actor<IIdGenerator>(idGenerator1);
    const int n = 1000;
    for (var i = 0; i < n; i++)
    {
        var t = new Task(() =>
        {
            var answer = new WriteOnceBlock<long>(null);
            Action<IIdGenerator> action = x =>
            {
                var buffer = x.Next();
                answer.Post(buffer);
            };
            idServer1.Send += action;
            Trace.WriteLine(answer.Receive());
        }, TaskCreationOptions.LongRunning); // Runs on a separate new thread
        t.Start();
    }
    Console.WriteLine("press any key you like! :)");
    Console.ReadKey();
    Trace.Flush();
}

这段代码不能工作:

static void Main(string[] args)
{
    var idGenerator1 = new IdInt64();
    var idServer1 = new Actor<IIdGenerator>(idGenerator1);
    const int n = 1000;
    for (var i = 0; i < n; i++)
    {
        var t = new Task(() =>
        {
            var answer = new WriteOnceBlock<long>(null);
            Action<IIdGenerator> action = x =>
            {
                var buffer = x.Next();
                answer.Post(buffer);
            };
            idServer1.Send += action;
            Trace.WriteLine(answer.Receive());
        }, TaskCreationOptions.PreferFairness); // Runs and is managed by Task Scheduler 
        t.Start();
    }
    Console.WriteLine("press any key you like! :)");
    Console.ReadKey();
    Trace.Flush();
}

不同的TaskCreationOptions在这里用来创建Task s。也许我在这里对TPL数据流的概念是错误的,刚刚开始使用它(一个[ThreadStatic]隐藏在某个地方?)。

TPL数据流块

您的代码的问题是这一部分:answer.Receive()。当你将它移动到动作内部时,死锁不会发生:

var t = new Task(() =>
{
    var answer = new WriteOnceBlock<long>(null);
    Action<IIdGenerator> action = x =>
    {
        var buffer = x.Next();
        answer.Post(buffer);
        Trace.WriteLine(answer.Receive());
    };
    idServer1.Send += action;
});
t.Start();

为什么呢?与await answer.ReceiveAsnyc();相反,answer.Receive();阻塞线程,直到返回答案。当您使用TaskCreationOptions.LongRunning时,每个任务都有自己的线程,所以没有问题,但是没有它(TaskCreationOptions.PreferFairness无关紧要),所有线程池线程都在忙着等待,所以一切都慢得多。它实际上不会死锁,正如您在使用15而不是1000时所看到的那样。

还有其他的解决方案可以帮助理解这个问题:

  • 在原始代码之前使用ThreadPool.SetMinThreads(1000, 0);增加线程池
  • 使用ReceiveAsnyc:

Task.Run(async () =>
{
    var answer = new WriteOnceBlock<long>(null);
    Action<IIdGenerator> action = x =>
    {
        var buffer = x.Next();
        answer.Post(buffer);
    };
    idServer1.Send += action;          
    Trace.WriteLine(await answer.ReceiveAsync());
});