保证变压器块输出序列

本文关键字:输出 变压器 | 更新日期: 2023-09-27 17:56:47

来自 TPL 文档

ActionBlock<TInput> 一样,TransformBlock<TInput,TOutput>默认值 一次处理一条消息,保持严格的FIFO排序。

但是,在多线程场景中,即如果多个线程"同时"执行SendAsync,然后通过调用ReceiveAsync"等待"结果,我们如何保证将某些内容发布到TransformBlock<TInput,TOutput>中的线程实际上得到了它正在等待的预期结果?

在我的实验中,似乎"保证"我想要的结果的方法是添加选项BoundedCapacity = 1 .至少线程在发送和接收时仍然不会被阻止。

如果我不这样做,某些线程将收到用于另一个线程的结果。

在这个特定用例中,这是正确的方法吗?

以下是一些说明我担忧的代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ConsoleTransformBlock
{
    class Program
    {
        private readonly static TransformBlock<int, int> _pipeline;
        static Program()
        {
            _pipeline = new TransformBlock<int, int>(async (input) =>
            {
                await Task.Delay(RandomGen2.Next(5, 100)).ConfigureAwait(false);
                return input;
            }, 
            new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); // this is the fix???
        }

        static void Main(string[] args)
        {
            var dop = System.Environment.ProcessorCount;// 8-core

            Parallel.For(0, dop, new ParallelOptions() { MaxDegreeOfParallelism = dop },
                 (d) =>
                 {
                     DoStuff().Wait();
                 });
            Console.WriteLine("Parallel For Done ...");
            var tasks = new Task[dop];
            for (var i = 0; i < dop; i++)
            {
             var temp = i;
             tasks[temp] = Task.Factory.StartNew
                (async () => await DoStuff().ConfigureAwait(false),
                CancellationToken.None,
                TaskCreationOptions.LongRunning,
                TaskScheduler.Default).Unwrap();
            }
            Task.WaitAll(tasks);

        }
        private static async Task DoStuff()
        {
            for (var i = 0; i < 100; i++)
            {
                var temp = RandomGen2.Next();
                await _pipeline.SendAsync(temp).ConfigureAwait(false);
                Console.WriteLine("Just sent {0}, now waiting {1}...", new object[] { temp, System.Threading.Thread.CurrentThread.ManagedThreadId });
                await Task.Delay(RandomGen2.Next(5, 50)).ConfigureAwait(false);
                var result = await _pipeline.ReceiveAsync().ConfigureAwait(false);
                Console.WriteLine("Received {0}... {1}", new object[] { result, System.Threading.Thread.CurrentThread.ManagedThreadId });
                if (result != temp)
                {
                    var error = string.Format("************** Sent {0} But Received {1}", temp, result, System.Threading.Thread.CurrentThread.ManagedThreadId);
                    Console.WriteLine(error);
                    break;
                }
            }
        }
        /// <summary>
        /// Thread-Safe Random Generator
        /// </summary>
        public static class RandomGen2
        {
            private static Random _global = new Random();
            [ThreadStatic]
            private static Random _local;
            public static int Next()
            {
                return Next(0, int.MaxValue);
            }
            public static int Next(int max)
            {
                return Next(0, max);
            }
            public static int Next(int min, int max)
            {
                Random inst = _local;
                if (inst == null)
                {
                    int seed;
                    lock (_global) seed = _global.Next();
                    _local = inst = new Random(seed);
                }
                return inst.Next(min, max);
            }
        }
    }
}

保证变压器块输出序列

TransformBlock已经保持了FIFO顺序。将项目发布到块的顺序是项目从块返回的确切顺序。

如果指定的最大并行度大于 1,则会同时处理多条消息,因此,可能不会按照接收消息的顺序处理消息。但是,从块输出消息的顺序将正确排序。

从数据流(任务并行库)

你可以从这个例子中看到这一点:

private static async Task MainAsync()
{
    var transformBlock = new TransformBlock<int, int>(async input =>
    {
        await Task.Delay(RandomGen2.Next(5, 100));
        return input;
    }, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 10});
    foreach (var number in Enumerable.Range(0,100))
    {
        await transformBlock.SendAsync(number);
    }
    for (int i = 0; i < 100; i++)
    {
        var result = await transformBlock.ReceiveAsync();
        Console.WriteLine(result);
    }
}

其中订单将订购 0-99。

但是,您似乎想要的是与线程的一些相关性,因此线程会将项目发布到块中,然后接收其结果。这并不适合TPL数据流,TPL数据流应该更像是一个块的管道。你可以用BoundedCapacity = 1破解它,但你可能不应该。