有效地使用带有超时的数千个任务
本文关键字:任务 千个 超时 有效地 | 更新日期: 2023-09-27 18:10:01
我正在实现一个库L,它通过套接字与另一个应用程序a通信。
基本流程如下:
- L接a
- L向A发送约50,000条信息I,并且为每个发出的I创建一个任务T。
- L监听来自A的传入结果,一旦有了结果,就使用ATaskCompletionSource设置任务T 的结果
- L创建一个任务T2并设置超时时间(Task. whenany (T,Task. delay (xx))
- L使用Task.WhenAll(T2)等待所有发送信息的超时或结果。
管理底层数据结构根本不是问题。主要的问题是,在我的计算机上,用大约50,000个条目(创建50,000 *2+1任务)组装"主要"Task.WhenAll(T2)大约需要5-6秒。
然而,我想不出一种更轻量级的方法来完成同样的任务。它应该使用所有可用的内核,并且是非阻塞的,并且支持超时。是否有一种方法来完成相同的使用并行或线程池类提高性能?
编辑:显示基本设置的代码是:https://dotnetfiddle.net/gIq2DP
启动总数n LongRunningTasks
,其中n是机器上的内核数。每个任务应该在一个核心上运行。为每个想要发送的I创建50K个新任务是一种浪费。相反,将任务设计为接受I和套接字信息——这些信息将发送到。
创建BlockingCollection<Tuple<I, SocketInfo>>
。启动一个任务来填充这个阻塞集合。您之前创建的其他n个长时间运行的任务可以继续获取信息元组和发送信息的地址,然后在一个循环中为您执行任务,该循环将在阻塞收集完成时中断。
超时可以在长时间运行的任务本身中设置。
整个设置将使您的CPU繁忙到最大的有用的工作,而不是让它不必要地忙于50K任务的创建。
由于在主存之外发生的操作(如网络操作)对于CPU来说非常非常慢,因此可以随意设置n,不仅等于机器中的核心数,甚至是该值的三倍。在我的代码演示中,我将其设置为仅等于核心数。
对于提供的链接中的代码,这是一种方法…
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Net.NetworkInformation;
using System.Threading.Tasks;
namespace TestConsoleApplication
{
public static class Test
{
public static void Main()
{
TaskRunningTest();
}
private static void TaskRunningTest()
{
var s = new Stopwatch();
const int totalInformationChunks = 50000;
var baseProcessorTaskArray = new Task[Environment.ProcessorCount];
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
var tcs = new TaskCompletionSource<int>();
var itemsToProcess = new BlockingCollection<Tuple<Information, Address>>(totalInformationChunks);
s.Start();
//Start a new task to populate the "itemsToProcess"
taskFactory.StartNew(() =>
{
// Add Tuples of Information and Address to which this information is to be sent to.
Console.WriteLine("Done intializing all the jobs...");
// Finally signal that you are done by saying..
itemsToProcess.CompleteAdding();
});
//Initializing the base tasks
for (var index = 0; index < baseProcessorTaskArray.Length; index++)
{
var thisIndex = index;
baseProcessorTaskArray[index] = taskFactory.StartNew(() =>
{
while (!itemsToProcess.IsAddingCompleted && itemsToProcess.Count != 0)
{
Tuple<Information, Address> item;
itemsToProcess.TryTake(out item);
//Process the item
tcs.TrySetResult(thisIndex);
}
});
}
// Need to provide new timeout logic now
// Depending upon what you are trying to achieve with timeout, you can devise out the way
// Wait for the base tasks to completely empty OR
// timeout and then stop the stopwatch.
Task.WaitAll(baseProcessorTaskArray);
s.Stop();
Console.WriteLine(s.ElapsedMilliseconds);
}
private class Address
{
//This class should have the socket information
}
private class Information
{
//This class will have the Information to send
}
}
}
分析显示,大部分时间(90%?)都花在定时器的设置、过期和处置上。这对我来说似乎是合理的。
也许你可以创建自己的超廉价超时机制。将超时排队到按过期时间排序的优先级队列中。然后,每100毫秒运行一个计时器,并使该计时器过期优先级队列中到期的所有内容。
这样做的成本将是每个超时一个TaskCompletionSource
和一些小的进一步处理。
您甚至可以通过从队列中删除超时并仅删除TaskCompletionSource
来取消超时。