保证变压器块输出序列
本文关键字:输出 变压器 | 更新日期: 2023-09-27 17:56:47
来自 TPL 文档
与
ActionBlock<TInput>
一样,TransformBlock<TInput,TOutput>
默认值 一次处理一条消息,保持严格的FIFO排序。
但是,在多线程场景中,即如果多个线程"同时"执行SendAsync
,然后通过调用ReceiveAsync
"等待"结果,我们如何保证将某些内容发布到TransformBlock<TInput,TOutput>
中的线程实际上得到了它正在等待的预期结果?
在我的实验中,似乎"保证"我想要的结果的方法是添加选项BoundedCapacity = 1
.至少线程在发送和接收时仍然不会被阻止。
如果我不这样做,某些线程将收到用于另一个线程的结果。
在这个特定用例中,这是正确的方法吗?
以下是一些说明我担忧的代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ConsoleTransformBlock
{
class Program
{
private readonly static TransformBlock<int, int> _pipeline;
static Program()
{
_pipeline = new TransformBlock<int, int>(async (input) =>
{
await Task.Delay(RandomGen2.Next(5, 100)).ConfigureAwait(false);
return input;
},
new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); // this is the fix???
}
static void Main(string[] args)
{
var dop = System.Environment.ProcessorCount;// 8-core
Parallel.For(0, dop, new ParallelOptions() { MaxDegreeOfParallelism = dop },
(d) =>
{
DoStuff().Wait();
});
Console.WriteLine("Parallel For Done ...");
var tasks = new Task[dop];
for (var i = 0; i < dop; i++)
{
var temp = i;
tasks[temp] = Task.Factory.StartNew
(async () => await DoStuff().ConfigureAwait(false),
CancellationToken.None,
TaskCreationOptions.LongRunning,
TaskScheduler.Default).Unwrap();
}
Task.WaitAll(tasks);
}
private static async Task DoStuff()
{
for (var i = 0; i < 100; i++)
{
var temp = RandomGen2.Next();
await _pipeline.SendAsync(temp).ConfigureAwait(false);
Console.WriteLine("Just sent {0}, now waiting {1}...", new object[] { temp, System.Threading.Thread.CurrentThread.ManagedThreadId });
await Task.Delay(RandomGen2.Next(5, 50)).ConfigureAwait(false);
var result = await _pipeline.ReceiveAsync().ConfigureAwait(false);
Console.WriteLine("Received {0}... {1}", new object[] { result, System.Threading.Thread.CurrentThread.ManagedThreadId });
if (result != temp)
{
var error = string.Format("************** Sent {0} But Received {1}", temp, result, System.Threading.Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(error);
break;
}
}
}
/// <summary>
/// Thread-Safe Random Generator
/// </summary>
public static class RandomGen2
{
private static Random _global = new Random();
[ThreadStatic]
private static Random _local;
public static int Next()
{
return Next(0, int.MaxValue);
}
public static int Next(int max)
{
return Next(0, max);
}
public static int Next(int min, int max)
{
Random inst = _local;
if (inst == null)
{
int seed;
lock (_global) seed = _global.Next();
_local = inst = new Random(seed);
}
return inst.Next(min, max);
}
}
}
}
TransformBlock
已经保持了FIFO顺序。将项目发布到块的顺序是项目从块返回的确切顺序。
如果指定的最大并行度大于 1,则会同时处理多条消息,因此,可能不会按照接收消息的顺序处理消息。但是,从块输出消息的顺序将正确排序。
从数据流(任务并行库)
你可以从这个例子中看到这一点:
private static async Task MainAsync()
{
var transformBlock = new TransformBlock<int, int>(async input =>
{
await Task.Delay(RandomGen2.Next(5, 100));
return input;
}, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 10});
foreach (var number in Enumerable.Range(0,100))
{
await transformBlock.SendAsync(number);
}
for (int i = 0; i < 100; i++)
{
var result = await transformBlock.ReceiveAsync();
Console.WriteLine(result);
}
}
其中订单将订购 0-99。
但是,您似乎想要的是与线程的一些相关性,因此线程会将项目发布到块中,然后接收其结果。这并不适合TPL数据流,TPL数据流应该更像是一个块的管道。你可以用BoundedCapacity = 1
破解它,但你可能不应该。