foreach循环中task/await的最佳实践
本文关键字:最佳 await 循环 task foreach | 更新日期: 2023-09-27 18:05:23
我在一个使用task/await的foreach中有一些耗时的代码。它包括从数据库中提取数据,生成html,将其发送到API,并将回复保存到DB。
模型是这样的
List<label> labels = db.labels.ToList();
foreach (var x in list)
{
var myLabels = labels.Where(q => !db.filter.Where(y => x.userid ==y.userid))
.Select(y => y.ID)
.Contains(q.id))
//Render the HTML
//do some fast stuff with objects
List<response> res = await api.sendMessage(object); //POST
//put all the responses in the db
foreach (var r in res)
{
db.responses.add(r);
}
db.SaveChanges();
}
时间方面,生成Html并将其发布到API似乎占用了大部分时间。
理想情况下,如果我能生成下一个项目的HTML,并等待帖子完成,然后再发布下一个项目,那将是伟大的。
也欢迎其他想法。怎么做呢?
我首先想到在foreach
上面添加一个Task
,并等待它完成,然后再进行下一个POST,但是我如何处理最后一个循环…感觉很乱…
您可以并行执行,但每个任务需要不同的上下文。
实体框架不是线程安全的,所以如果你不能在并行任务中使用一个上下文。
var tasks = myLabels.Select( async label=>{
using(var db = new MyDbContext ()){
// do processing...
var response = await api.getresponse();
db.Responses.Add(response);
await db.SaveChangesAsync();
}
});
await Task.WhenAll(tasks);
在这种情况下,所有的任务都是并行运行的,并且每个任务都有自己的上下文。
如果你不为每个任务创建新的上下文,你会得到这个问题上提到的错误实体框架支持并行异步查询吗?
在我看来,这是一个架构问题,而不是代码问题。
你可以把你的工作分成两个独立的部分:
- 从数据库中获取数据并生成HTML
- 发送API请求并保存响应到数据库
你可以并行运行它们,并使用队列来协调:每当你的HTML准备好了,它就被添加到队列中,另一个worker从那里开始,获取HTML并发送给API。
这两个部分也可以以多线程的方式完成,例如,你可以同时处理队列中的多个项目,方法是让一组工作人员在队列中查找要处理的项目。
这是生产者/消费者模式的尖叫:一个生产者生产数据的速度不同于消费者消费数据的速度。一旦生产者不再生产任何东西,它就会通知消费者不再需要任何数据。
MSDN有一个很好的例子,其中几个数据流块被链接在一起:一个块的输出是另一个块的输入。
演练:创建数据流管道
思路如下:
- 创建一个类来生成HTML。
- 这个类有一个System.Threading.Tasks.Dataflow.BufferBlock类的对象
<T
> - 异步过程创建所有HTML输出并等待SendAsync数据到bufferBlock
- 缓冲块实现接口ISourceBlock
<T
>。该类将其作为get属性公开:
代码:
class MyProducer<T>
{
private System.Threading.Tasks.Dataflow.BufferBlock<T> bufferBlock = new BufferBlock<T>();
public ISourceBlock<T> Output {get {return this.bufferBlock;}
public async ProcessAsync()
{
while (somethingToProduce)
{
T producedData = ProduceOutput(...)
await this.bufferBlock.SendAsync(producedData);
}
// no date to send anymore. Mark the output complete:
this.bufferBlock.Complete()
}
}
- 第二个类使用这个ISourceBlock。它将在这个源块等待,直到数据到达并处理它。
- 在异步函数中执行此操作 当没有可用数据时
- 停止
代码:
public class MyConsumer<T>
{
ISourceBlock<T> Source {get; set;}
public async Task ProcessAsync()
{
while (await this.Source.OutputAvailableAsync())
{ // there is input of type T, read it:
var input = await this.Source.ReceiveAsync();
// process input
}
// if here, no more input expected. finish.
}
}
现在把它们放在一起:
private async Task ProduceOutput<T>()
{
var producer = new MyProducer<T>();
var consumer = new MyConsumer<T>() {Source = producer.Output};
var producerTask = Task.Run( () => producer.ProcessAsync());
var consumerTask = Task.Run( () => consumer.ProcessAsync());
// while both tasks are working you can do other things.
// wait until both tasks are finished:
await Task.WhenAll(new Task[] {producerTask, consumerTask});
}
为简单起见,我省略了异常处理和取消。StackOverFlow有关于异常处理和取消任务的文章:
- 保持UI响应使用任务,处理AggregateException
- 取消异步任务或任务列表
这就是我最终使用的:(https://stackoverflow.com/a/25877042/275990)
List<ToSend> sendToAPI = new List<ToSend>();
List<label> labels = db.labels.ToList();
foreach (var x in list) {
var myLabels = labels.Where(q => !db.filter.Where(y => x.userid ==y.userid))
.Select(y => y.ID)
.Contains(q.id))
//Render the HTML
//do some fast stuff with objects
sendToAPI.add(the object with HTML);
}
int maxParallelPOSTs=5;
await TaskHelper.ForEachAsync(sendToAPI, maxParallelPOSTs, async i => {
using (NasContext db2 = new NasContext()) {
List<response> res = await api.sendMessage(i.object); //POST
//put all the responses in the db
foreach (var r in res)
{
db2.responses.add(r);
}
db2.SaveChanges();
}
});
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) {
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext()) {
await body(partition.Current).ContinueWith(t => {
if (t.Exception != null) {
string problem = t.Exception.ToString();
}
//observe exceptions
});
}
}));
}
基本上让我生成HTML同步,这很好,因为它只需要几秒钟来生成1000,但让我发布和保存到DB异步,与我预定义的线程一样多。在这种情况下,我将发布到Mandrill API,并行发布没有问题。