foreach循环中task/await的最佳实践

本文关键字:最佳 await 循环 task foreach | 更新日期: 2023-09-27 18:05:23

我在一个使用task/await的foreach中有一些耗时的代码。它包括从数据库中提取数据,生成html,将其发送到API,并将回复保存到DB。

模型是这样的

List<label> labels = db.labels.ToList();
foreach (var x in list) 
{
    var myLabels = labels.Where(q => !db.filter.Where(y => x.userid ==y.userid))
                         .Select(y => y.ID)
                         .Contains(q.id))
    //Render the HTML
    //do some fast stuff with objects
    List<response> res = await api.sendMessage(object);  //POST
    //put all the responses in the db
    foreach (var r in res) 
    {
        db.responses.add(r);
    }
    db.SaveChanges();
}

时间方面,生成Html并将其发布到API似乎占用了大部分时间。

理想情况下,如果我能生成下一个项目的HTML,并等待帖子完成,然后再发布下一个项目,那将是伟大的。

也欢迎其他想法。怎么做呢?

我首先想到在foreach上面添加一个Task,并等待它完成,然后再进行下一个POST,但是我如何处理最后一个循环…感觉很乱…

foreach循环中task/await的最佳实践

您可以并行执行,但每个任务需要不同的上下文。

实体框架不是线程安全的,所以如果你不能在并行任务中使用一个上下文。

var tasks = myLabels.Select( async label=>{
    using(var db = new MyDbContext ()){
        // do processing...
        var response = await api.getresponse();
        db.Responses.Add(response);
        await db.SaveChangesAsync();
    } 
});
await Task.WhenAll(tasks);

在这种情况下,所有的任务都是并行运行的,并且每个任务都有自己的上下文。

如果你不为每个任务创建新的上下文,你会得到这个问题上提到的错误实体框架支持并行异步查询吗?

在我看来,这是一个架构问题,而不是代码问题。

你可以把你的工作分成两个独立的部分:

  1. 从数据库中获取数据并生成HTML
  2. 发送API请求并保存响应到数据库

你可以并行运行它们,并使用队列来协调:每当你的HTML准备好了,它就被添加到队列中,另一个worker从那里开始,获取HTML并发送给API。

这两个部分也可以以多线程的方式完成,例如,你可以同时处理队列中的多个项目,方法是让一组工作人员在队列中查找要处理的项目。

这是生产者/消费者模式的尖叫:一个生产者生产数据的速度不同于消费者消费数据的速度。一旦生产者不再生产任何东西,它就会通知消费者不再需要任何数据。

MSDN有一个很好的例子,其中几个数据流块被链接在一起:一个块的输出是另一个块的输入。

演练:创建数据流管道

思路如下:

  • 创建一个类来生成HTML。
  • 这个类有一个System.Threading.Tasks.Dataflow.BufferBlock类的对象<T>
  • 异步过程创建所有HTML输出并等待SendAsync数据到bufferBlock
  • 缓冲块实现接口ISourceBlock <T>。该类将其作为get属性公开:

代码:

class MyProducer<T>
{
    private System.Threading.Tasks.Dataflow.BufferBlock<T> bufferBlock = new BufferBlock<T>();
    public ISourceBlock<T> Output {get {return this.bufferBlock;}
    public async ProcessAsync()
    {
        while (somethingToProduce)
        {
            T producedData = ProduceOutput(...)
            await this.bufferBlock.SendAsync(producedData);
        }
        // no date to send anymore. Mark the output complete:
        this.bufferBlock.Complete()
    }
}
    第二个类使用这个ISourceBlock。它将在这个源块等待,直到数据到达并处理它。
  • 在异步函数中执行此操作
  • 当没有可用数据时
  • 停止

代码:

public class MyConsumer<T>
{
    ISourceBlock<T> Source {get; set;}
    public async Task ProcessAsync()
    {
        while (await this.Source.OutputAvailableAsync())
        {   // there is input of type T, read it:
            var input = await this.Source.ReceiveAsync();
            // process input
        }
        // if here, no more input expected. finish.
    }
}

现在把它们放在一起:

private async Task ProduceOutput<T>()
{
    var producer = new MyProducer<T>();
    var consumer = new MyConsumer<T>() {Source = producer.Output};
    var producerTask = Task.Run( () => producer.ProcessAsync());
    var consumerTask = Task.Run( () => consumer.ProcessAsync());
    // while both tasks are working you can do other things.
    // wait until both tasks are finished:
    await Task.WhenAll(new Task[] {producerTask, consumerTask});
}

为简单起见,我省略了异常处理和取消。StackOverFlow有关于异常处理和取消任务的文章:

  • 保持UI响应使用任务,处理AggregateException
  • 取消异步任务或任务列表

这就是我最终使用的:(https://stackoverflow.com/a/25877042/275990)

List<ToSend> sendToAPI = new List<ToSend>();
List<label> labels = db.labels.ToList();
foreach (var x in list) {
    var myLabels = labels.Where(q => !db.filter.Where(y => x.userid ==y.userid))
                         .Select(y => y.ID)
                         .Contains(q.id))
    //Render the HTML
    //do some fast stuff with objects
    sendToAPI.add(the object with HTML);
}
int maxParallelPOSTs=5;
await TaskHelper.ForEachAsync(sendToAPI, maxParallelPOSTs, async i => {
    using (NasContext db2 = new NasContext()) {
        List<response> res = await api.sendMessage(i.object);  //POST
        //put all the responses in the db
        foreach (var r in res) 
        {
            db2.responses.add(r);
        }
        db2.SaveChanges();
    }
});


    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate {
                using (partition)
                    while (partition.MoveNext()) {
                        await body(partition.Current).ContinueWith(t => {
                            if (t.Exception != null) {
                                string problem = t.Exception.ToString();
                            }
                            //observe exceptions
                        });
                    }
            }));
    }

基本上让我生成HTML同步,这很好,因为它只需要几秒钟来生成1000,但让我发布和保存到DB异步,与我预定义的线程一样多。在这种情况下,我将发布到Mandrill API,并行发布没有问题。