有效地使用带有超时的数千个任务

本文关键字:任务 千个 超时 有效地 | 更新日期: 2023-09-27 18:10:01

我正在实现一个库L,它通过套接字与另一个应用程序a通信。

基本流程如下:

  1. L接a
  2. L向A发送约50,000条信息I,并且为每个发出的I创建一个任务T。
  3. L监听来自A的传入结果,一旦有了结果,就使用ATaskCompletionSource设置任务T
  4. 的结果
  5. L创建一个任务T2并设置超时时间(Task. whenany (T,Task. delay (xx))
  6. L使用Task.WhenAll(T2)等待所有发送信息的超时或结果。

管理底层数据结构根本不是问题。主要的问题是,在我的计算机上,用大约50,000个条目(创建50,000 *2+1任务)组装"主要"Task.WhenAll(T2)大约需要5-6秒。

然而,我想不出一种更轻量级的方法来完成同样的任务。它应该使用所有可用的内核,并且是非阻塞的,并且支持超时。

是否有一种方法来完成相同的使用并行或线程池类提高性能?

编辑:显示基本设置的代码是:https://dotnetfiddle.net/gIq2DP

有效地使用带有超时的数千个任务

启动总数n LongRunningTasks,其中n是机器上的内核数。每个任务应该在一个核心上运行。为每个想要发送的I创建50K个新任务是一种浪费。相反,将任务设计为接受I和套接字信息——这些信息将发送到

创建BlockingCollection<Tuple<I, SocketInfo>>。启动一个任务来填充这个阻塞集合。您之前创建的其他n个长时间运行的任务可以继续获取信息元组和发送信息的地址,然后在一个循环中为您执行任务,该循环将在阻塞收集完成时中断。

超时可以在长时间运行的任务本身中设置。

整个设置将使您的CPU繁忙到最大的有用的工作,而不是让它不必要地忙于50K任务的创建

由于在主存之外发生的操作(如网络操作)对于CPU来说非常非常慢,因此可以随意设置n,不仅等于机器中的核心数,甚至是该值的三倍。在我的代码演示中,我将其设置为仅等于核心数。

对于提供的链接中的代码,这是一种方法…

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Net.NetworkInformation;
using System.Threading.Tasks;
namespace TestConsoleApplication
{
    public static class Test
    {
        public static void Main()
        {
            TaskRunningTest();
        }
        private static void TaskRunningTest()
        {
            var s = new Stopwatch();
            const int totalInformationChunks = 50000;
            var baseProcessorTaskArray = new Task[Environment.ProcessorCount];
            var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
            var tcs = new TaskCompletionSource<int>();
            var itemsToProcess = new BlockingCollection<Tuple<Information, Address>>(totalInformationChunks);
            s.Start();
            //Start a new task to populate the "itemsToProcess"
            taskFactory.StartNew(() =>
            {
                // Add Tuples of Information and Address to which this information is to be sent to.
                Console.WriteLine("Done intializing all the jobs...");
                // Finally signal that you are done by saying..
                itemsToProcess.CompleteAdding();
            });
            //Initializing the base tasks
            for (var index = 0; index < baseProcessorTaskArray.Length; index++)
            {
                var thisIndex = index;
                baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
                {
                    while (!itemsToProcess.IsAddingCompleted && itemsToProcess.Count != 0)
                    {
                        Tuple<Information, Address> item;
                        itemsToProcess.TryTake(out item);
                        //Process the item
                        tcs.TrySetResult(thisIndex);
                    }
                });
            }
            // Need to provide new timeout logic now
            // Depending upon what you are trying to achieve with timeout, you can devise out the way
            // Wait for the base tasks to completely empty OR
            // timeout and then stop the stopwatch.
            Task.WaitAll(baseProcessorTaskArray); 
            s.Stop();
            Console.WriteLine(s.ElapsedMilliseconds);
        }
        private class Address
        {
            //This class should have the socket information
        }
        private class Information
        {
            //This class will have the Information to send
        }
    }
}

分析显示,大部分时间(90%?)都花在定时器的设置、过期和处置上。这对我来说似乎是合理的。

也许你可以创建自己的超廉价超时机制。将超时排队到按过期时间排序的优先级队列中。然后,每100毫秒运行一个计时器,并使该计时器过期优先级队列中到期的所有内容。

这样做的成本将是每个超时一个TaskCompletionSource和一些小的进一步处理。

您甚至可以通过从队列中删除超时并仅删除TaskCompletionSource来取消超时。