为什么一个额外的异步操作使我的代码比根本不发生操作时更快

本文关键字:操作 代码 我的 一个 异步操作 为什么 | 更新日期: 2023-09-27 18:08:13

我正在制作一款基于短信的游戏(增值服务),其中必须每天向每个订阅者发送一个问题。有超过50万的用户,因此性能是一个关键因素。由于每个订阅者可以是具有不同竞争状态的不同变量,因此在发送文本消息之前必须为每个订阅者分别查询数据库。为了达到最佳性能,我使用。net任务并行库(TPL)来生成并行线程池线程,并在每个线程中尽可能多地执行异步操作,以最终尽快发送文本。

在描述实际问题之前,有必要给出一些关于代码的更多信息。

最初在代码中没有异步操作。我刚刚用默认的任务调度器将大约500,000个任务安排到线程池中,每个任务将通过例程工作,阻塞所有EF(实体框架)查询并依次完成其工作。它很好,但不够快。然后我把所有EF查询都改为异步查询,结果速度非常快,但是SQL server中有很多死锁和超时,大约三分之一的订阅者从未收到过文本!在尝试了不同的解决方案后,我决定不做太多的异步数据库操作,而我有超过500,000个任务在24核服务器上运行(至少有24个并发线程池线程)!我回滚了所有的更改(异步的),除了每个任务中的一个web服务调用,它仍然是异步的。

现在是奇怪的情况:

在我的代码中,我有一个布尔变量名为"isCrossSellActive"。当设置变量时,将发生更多的DB操作,并且将发生线程等待的异步web服务调用。当这个变量为false时,这些操作都不会发生,包括异步web服务调用。令人尴尬的是,当设置变量时,代码的运行速度要比没有设置变量时快得多!似乎由于某种原因,等待的异步代码(协作线程)使代码更快。

代码如下:

public async Task AutoSendMessages(...)
    {
        //Get list of subscriptions plus some initialization

        LimitedConcurrencyLevelTaskScheduler lcts = new LimitedConcurrencyLevelTaskScheduler(numberOfThreads);
        TaskFactory taskFactory = new TaskFactory(lcts);
        List<Task> tasks = new List<Task>();
        //....
        foreach (var sub in subscriptions)
        {
            AutoSendData data = new AutoSendData
            {
                ServiceId = serviceId,
                MSISDN = sub.subscriber,
                IsCrossSellActive = bolCrossSellHeader
            };
            tasks.Add(await taskFactory.StartNew(async (x) =>
            {
                await SendQuestion(x);
            }, data));
        }
        GC.Collect();
        try
        {
            Task.WaitAll(tasks.ToArray());
        }
        catch (AggregateException ae)
        {
            ae.Handle((ex) =>
            {
                _logRepo.LogException(1, "", ex);
                return true;
            });
        }
        await _autoSendRepo.SetAutoSendingStatusEnd(statusId);
    }
public async Task SendQuestion(object data)
    {
        //extract variables from input parameter
        try
        {
            if (isCrossSellActive)
            {
                int pieceCount = subscriptionRepo.GetSubscriberCarPieces(curSubscription.service, curSubscription.subscriber).Count(c => c.isConfirmed);
                foreach (var rule in csRules)
                {
                    if (rule.Applies) 
                    {
                        if (await HttpClientHelper.GetJsonAsync<bool>(url, rule.TargetServiceBaseAddress))
                        {
                            int noOfAddedPieces = SomeCalculations();
                            if (noOfAddedPieces > 0)
                            {
                            crossSellRepo.SetPromissedPieces(curSubscription.subscriber, curSubscription.service,
                                    rule.TargetShortCode, noOfAddedPieces, 0, rule.ExpirationLimitDays);
                            }
                        }
                    }
                }
            }
// The rest of the code. (Some db CRUD)
await SmsClient.SendSoapMessage(subscriber, smsBody);
        }
catch (Exception ex){//...}
}

为什么一个额外的异步操作使我的代码比根本不发生操作时更快

好的,感谢@usr和他给我的线索,问题终于解决了!他的评论引起了我对等待中的taskFactory.StartNew(…)行的注意,该行依次将新任务添加到"tasks"列表中,然后由Task.WaitAll(tasks);

首先,我在taskFactory.StartNew()之前删除了await关键字,它导致代码出现可怕的故障状态!然后,我将await关键字返回到taskFactory.StartNew()之前,并使用断点调试代码,令人惊讶的是,在第一个线程到达"SendQuestion"例程中的第一个await之前,线程一个接一个地依次运行。当设置"isCrossSellActive"标志时,尽管线程应该执行的任务更多,但第一个await关键字会更早到达,从而使下一个计划任务能够运行。但是当它没有设置时,唯一的await关键字是例程的最后一行,所以它最有可能顺序运行到最后。

usr在for循环中删除await关键字的建议似乎是正确的,但问题是Task.WaitAll()行将在错误的Task<Task<void>>列表上等待,而不是Task<void>。最后我使用了Task。运行而不是TaskFactory。开始新的,一切都改变了。现在服务运行良好。for循环中的最后一段代码是:

tasks.Add(Task.Run(async () =>
            {
                await SendQuestion(data);
            }));

,问题就解决了。谢谢大家。

注:阅读这篇关于任务的文章。运行和为什么运行TaskFactory。StartNew是危险的:http://blog.stephencleary.com/2013/08/startnew-is-dangerous.html

除非您添加一些分析来告诉您哪些代码现在花费的时间更长,否则很难判断。

如果没有看到更多的数字,我最好的猜测是短信服务不喜欢你在短时间内发送太多请求而阻塞。当您添加额外的DB呼叫时,额外的延迟会使短信服务更好地工作。

其他一些小细节:

等待任务。WhenAll通常比Task.WaitAll好一点。WaitAll意味着线程将等待。使死锁更容易发生。

代替:

        tasks.Add(await taskFactory.StartNew(async (x) =>
        {
            await SendQuestion(x);
        }, data));

你应该能做

        tasks.Add(SendQuestion(data));