TPL数据流块
本文关键字:数据流 TPL | 更新日期: 2023-09-27 18:19:10
问题:为什么使用WriteOnceBlock
(或BufferBlock
)从另一个BufferBlock<Action>
(返回答案发生在发布的Action
)导致死锁(在此代码中)?
我认为类中的方法可以被认为是我们发送给对象的消息(就像Alan Kay提出的关于OOP的原始观点一样)。因此,我编写了这个通用的Actor
类,它有助于将普通对象转换为Actor
(当然,由于可变性和其他原因,这里有许多看不见的漏洞,但这不是这里的主要关注点)。
我们有这些定义:
public class Actor<T>
{
private readonly T _processor;
private readonly BufferBlock<Action<T>> _messageBox = new BufferBlock<Action<T>>();
public Actor(T processor)
{
_processor = processor;
Run();
}
public event Action<T> Send
{
add { _messageBox.Post(value); }
remove { }
}
private async void Run()
{
while (true)
{
var action = await _messageBox.ReceiveAsync();
action(_processor);
}
}
}
public interface IIdGenerator
{
long Next();
}
;为什么这段代码工作:
static void Main(string[] args)
{
var idGenerator1 = new IdInt64();
var idServer1 = new Actor<IIdGenerator>(idGenerator1);
const int n = 1000;
for (var i = 0; i < n; i++)
{
var t = new Task(() =>
{
var answer = new WriteOnceBlock<long>(null);
Action<IIdGenerator> action = x =>
{
var buffer = x.Next();
answer.Post(buffer);
};
idServer1.Send += action;
Trace.WriteLine(answer.Receive());
}, TaskCreationOptions.LongRunning); // Runs on a separate new thread
t.Start();
}
Console.WriteLine("press any key you like! :)");
Console.ReadKey();
Trace.Flush();
}
这段代码不能工作:
static void Main(string[] args)
{
var idGenerator1 = new IdInt64();
var idServer1 = new Actor<IIdGenerator>(idGenerator1);
const int n = 1000;
for (var i = 0; i < n; i++)
{
var t = new Task(() =>
{
var answer = new WriteOnceBlock<long>(null);
Action<IIdGenerator> action = x =>
{
var buffer = x.Next();
answer.Post(buffer);
};
idServer1.Send += action;
Trace.WriteLine(answer.Receive());
}, TaskCreationOptions.PreferFairness); // Runs and is managed by Task Scheduler
t.Start();
}
Console.WriteLine("press any key you like! :)");
Console.ReadKey();
Trace.Flush();
}
不同的TaskCreationOptions
在这里用来创建Task
s。也许我在这里对TPL数据流的概念是错误的,刚刚开始使用它(一个[ThreadStatic]
隐藏在某个地方?)。
您的代码的问题是这一部分:answer.Receive()
。当你将它移动到动作内部时,死锁不会发生:
var t = new Task(() =>
{
var answer = new WriteOnceBlock<long>(null);
Action<IIdGenerator> action = x =>
{
var buffer = x.Next();
answer.Post(buffer);
Trace.WriteLine(answer.Receive());
};
idServer1.Send += action;
});
t.Start();
为什么呢?与await answer.ReceiveAsnyc();
相反,answer.Receive();
阻塞线程,直到返回答案。当您使用TaskCreationOptions.LongRunning
时,每个任务都有自己的线程,所以没有问题,但是没有它(TaskCreationOptions.PreferFairness
无关紧要),所有线程池线程都在忙着等待,所以一切都慢得多。它实际上不会死锁,正如您在使用15而不是1000时所看到的那样。
还有其他的解决方案可以帮助理解这个问题:
- 在原始代码之前使用
ThreadPool.SetMinThreads(1000, 0);
增加线程池 - 使用
ReceiveAsnyc
:
Task.Run(async () =>
{
var answer = new WriteOnceBlock<long>(null);
Action<IIdGenerator> action = x =>
{
var buffer = x.Next();
answer.Post(buffer);
};
idServer1.Send += action;
Trace.WriteLine(await answer.ReceiveAsync());
});