TPL DataFlow Workflow
本文关键字:Workflow DataFlow TPL | 更新日期: 2023-09-27 18:08:34
我刚刚开始阅读TPL数据流,这对我来说真的很困惑。我读过很多关于这个话题的文章,但我不能很容易地消化它。可能是很难,也可能是我还没有开始领会这个意思。
我开始研究这个的原因是我想实现一个并行任务可以按顺序运行的场景,并发现TPL数据流可以这样使用。
我正在练习TPL和TPL数据流,我处于非常初学者的水平,所以我需要专家的帮助,他们可以指导我走向正确的方向。在我编写的测试方法中,我做了以下事情,
private void btnTPLDataFlow_Click(object sender, EventArgs e)
{
Stopwatch watch = new Stopwatch();
watch.Start();
txtOutput.Clear();
ExecutionDataflowBlockOptions execOptions = new ExecutionDataflowBlockOptions();
execOptions.MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded;
ActionBlock<string> actionBlock = new ActionBlock<string>(async v =>
{
await Task.Delay(200);
await Task.Factory.StartNew(
() => txtOutput.Text += v + Environment.NewLine,
CancellationToken.None,
TaskCreationOptions.None,
scheduler
);
}, execOptions);
for (int i = 1; i < 101; i++)
{
actionBlock.Post(i.ToString());
}
actionBlock.Complete();
watch.Stop();
lblTPLDataFlow.Text = Convert.ToString(watch.ElapsedMilliseconds / 1000);
}
现在这个过程是并行的,并且都是异步的(不冻结我的UI),但是生成的输出不是按顺序的,而我已经读到TPL数据流默认情况下保持元素的顺序。所以我的猜测是,然后我创建的任务是罪魁祸首,它没有以正确的顺序输出字符串。我说的对吗?
如果是这种情况,那么我如何使其异步和顺序?
我试图分离代码并试图将代码分发到不同的方法中,但我的这个尝试失败了,因为只有字符串输出到文本框,没有其他任何事情发生。
private async void btnTPLDataFlow_Click(object sender, EventArgs e)
{
Stopwatch watch = new Stopwatch();
watch.Start();
await TPLDataFlowOperation();
watch.Stop();
lblTPLDataFlow.Text = Convert.ToString(watch.ElapsedMilliseconds / 1000);
}
public async Task TPLDataFlowOperation()
{
var actionBlock = new ActionBlock<int>(async values => txtOutput.Text += await ProcessValues(values) + Environment.NewLine,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded, TaskScheduler = scheduler });
for (int i = 1; i < 101; i++)
{
actionBlock.Post(i);
}
actionBlock.Complete();
await actionBlock.Completion;
}
private async Task<string> ProcessValues(int i)
{
await Task.Delay(200);
return "Test " + i;
}
我知道我写了一段糟糕的代码,但这是我第一次尝试使用TPL数据流
我如何使它异步和有序?
这有点矛盾。你可以让并发任务按顺序开始,但是你不能保证它们会按顺序运行或完成。
让我们检查一下你的代码,看看发生了什么。
首先,您选择了DataflowBlockOptions.Unbounded
。这告诉TPL Dataflow,它不应该限制允许并发运行的任务数量。因此,您的每个任务将按顺序在几乎相同的时间开始。
异步操作从await Task.Delay(200)
开始。这将导致你的方法被挂起,然后在大约 200毫秒后恢复。但是,此延迟不是精确的,并且在每次调用之间会有所不同。此外,在延迟之后恢复代码的机制可能会花费可变的时间。由于实际延迟的这种随机变化,那么下一个要运行的代码位现在是而不是顺序-导致您看到的差异。
您可能会觉得这个例子很有趣。这是一个控制台应用程序,可以简化一些操作。
class Program
{
static void Main(string[] args)
{
OutputNumbersWithDataflow();
OutputNumbersWithParallelLinq();
Console.ReadLine();
}
private static async Task HandleStringAsync(string s)
{
await Task.Delay(200);
Console.WriteLine("Handled {0}.", s);
}
private static void OutputNumbersWithDataflow()
{
var block = new ActionBlock<string>(
HandleStringAsync,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
for (int i = 0; i < 20; i++)
{
block.Post(i.ToString());
}
block.Complete();
block.Completion.Wait();
}
private static string HandleString(string s)
{
// Perform some computation on s...
Thread.Sleep(200);
return s;
}
private static void OutputNumbersWithParallelLinq()
{
var myNumbers = Enumerable.Range(0, 20).AsParallel()
.AsOrdered()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithMergeOptions(ParallelMergeOptions.NotBuffered);
var processed = from i in myNumbers
select HandleString(i.ToString());
foreach (var s in processed)
{
Console.WriteLine(s);
}
}
}
第一组数字是使用与您的方法非常相似的方法计算的-使用TPL数据流。这些数字顺序乱了。
OutputNumbersWithParallelLinq()
输出的第二组数字根本不使用Dataflow。它依赖于。net中内置的Parallel LINQ特性。这在后台线程上运行我的HandleString()
方法,但是将数据按顺序保存到结束。
这里的限制是PLINQ不允许您提供异步方法。(好吧,你可以,但它不会给你想要的行为。)HandleString()
是传统的同步方法;它只是在后台线程中执行。
这里有一个更复杂的数据流示例,它保留了正确的顺序:
private static void OutputNumbersWithDataflowTransformBlock()
{
Random r = new Random();
var transformBlock = new TransformBlock<string, string>(
async s =>
{
// Make the delay extra random, just to be sure.
await Task.Delay(160 + r.Next(80));
return s;
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
// For a GUI application you should also set the
// scheduler here to make sure the output happens
// on the correct thread.
var outputBlock = new ActionBlock<string>(
s => Console.WriteLine("Handled {0}.", s),
new ExecutionDataflowBlockOptions
{
SingleProducerConstrained = true,
MaxDegreeOfParallelism = 1
});
transformBlock.LinkTo(outputBlock, new DataflowLinkOptions { PropagateCompletion = true });
for (int i = 0; i < 20; i++)
{
transformBlock.Post(i.ToString());
}
transformBlock.Complete();
outputBlock.Completion.Wait();
}