Microsoft TPL数据流-同步处理相关请求
本文关键字:请求 处理 同步 TPL 数据流 Microsoft | 更新日期: 2023-09-27 18:12:03
我提前为标题道歉,但这是我能想到的最好的描述动作的方式。
需求是处理消息总线的请求。传入的请求可能与关联或分组这些请求的id有关。我想要的行为是让请求流同步处理相关的id。然而,不同的id可以异步处理。
我正在使用concurrentdictionary来跟踪正在处理的请求和链接中的谓词。
这是为了提供对相关请求的同步处理。
然而,我得到的行为是,第一个请求得到处理,第二个请求被丢弃。
我已经附上了控制台应用程序的示例代码来模拟这个问题。
任何指导或反馈都将不胜感激。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace ConsoleApplication2
{
class Program
{
static void Main(string[] args)
{
var requestTracker = new ConcurrentDictionary<string, string>();
var bufferBlock = new BufferBlock<Request>();
var actionBlock = new ActionBlock<Request>(x =>
{
Console.WriteLine("processing item {0}",x.Name);
Thread.Sleep(5000);
string itemOut = null;
requestTracker.TryRemove(x.Id, out itemOut);
});
bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id,x.Name));
var publisher = Task.Run(() =>
{
var request = new Request("item_1", "first item");
bufferBlock.SendAsync(request);
var request_1 = new Request("item_1", "second item");
bufferBlock.SendAsync(request_1);
});
publisher.Wait();
Console.ReadLine();
}
}
public class Request
{
public Request(string id, string name)
{
this.Id = id;
this.Name = name;
}
public string Id { get; set; }
public string Name { get; set; }
}
}
-
你说你想并行处理一些请求(至少我认为这就是你所说的"异步"的意思(,但默认情况下
ActionBlock
不是并行的。要更改此设置,请设置MaxDegreeOfParallelism
。 -
您正尝试使用
TryAdd()
作为过滤器,但由于以下两个原因无法使用:- 过滤器只调用一次,不会自动重试或类似的操作。这意味着,如果一个项目没有通过,它就永远不会通过,即使在阻止它的项目完成之后也是如此
- 如果一个项目被卡在块的输出队列中,那么没有其他项目会从该块中出来。这可能会显著降低并行度,即使您以某种方式解决了前一个问题
-
我认为这里最简单的解决方案是为每组设置一个块,这样,每组的项目将按顺序处理,但不同组的项目将并行处理。在代码中,它可能看起来像:
var processingBlocks = new Dictionary<string, ActionBlock<Request>>(); var splitterBlock = new ActionBlock<Request>(request => { ActionBlock<Request> processingBlock; if (!processingBlocks.TryGetValue(request.Id, out processingBlock)) { processingBlock = processingBlocks[request.Id] = new ActionBlock<Request>(r => /* process the request here */); } processingBlock.Post(request); });
这种方法的问题在于,组的处理块永远不会消失。如果你负担不起(这是内存泄漏(,因为你将有大量的组,那么I3arnon建议的哈希方法是可行的。
我相信这是因为您的LinkTo()
设置不正确。通过使用LinkTo()
并将函数作为参数传递,可以在条件中添加。所以这行:
bufferBlock.LinkTo(actionBlock, x => requestTracker.TryAdd(x.Id, x.Name));
本质上是说,如果你能够添加到并发字典中,就把数据从bufferBlock传递到actionBlock,这不一定有意义(至少在你的示例代码中(
相反,您应该将您的bufferBlock链接到没有lambda的actionblock,因为在这种情况下您不需要条件链接(至少根据您的示例代码,我不这么认为(。
此外,看看这个SO问题,看看你应该使用SendAsync()
还是Post()
,因为Post()
可以更容易地处理,只需将数据添加到管道中:TPL数据流,Post((和SendAsync((之间的功能区别是什么。SendAsync将返回一个任务,而Post将根据成功进入管道返回true/false。
因此,为了从根本上找出问题所在,您需要处理块的延续。MSDN上有一个很好的教程,在他们的TPL数据流介绍中:创建一个数据流管道它基本上看起来是这样的:
//link to section
bufferBlock.LinkTo(actionBlock);
//continuations
bufferBlock.Completion.ContinueWith(t =>
{
if(t.IsFaulted) ((IDataFlowBlock).actionBlock).Fault(t.Exception); //send the exception down the pipeline
else actionBlock.Complete(); //tell the next block that we're done with the bufferblock
});
然后,您可以在等待管道时捕获异常(AggregateException
(。您真的需要在实际代码中使用concurrentdictionary进行跟踪吗?因为当它无法添加时,这可能会导致问题,因为当linkto谓词返回false时,它不会将数据传递到管道的下一个块中。