如何正确地对要在C#中运行的任务进行排队
本文关键字:运行 任务 排队 正确地 | 更新日期: 2023-09-27 18:24:18
我有一个项的枚举(RunData.Demand
),每个项表示通过HTTP调用API的一些工作。如果我只需要foreach
完成所有工作,并在每次迭代期间调用API,效果会很好。然而,每次迭代都需要一两秒钟的时间,所以我想运行2-3个线程,并在它们之间分配工作。我在做什么:
ThreadPool.SetMaxThreads(2, 5); // Trying to limit the amount of threads
var tasks = RunData.Demand
.Select(service => Task.Run(async delegate
{
var availabilityResponse = await client.QueryAvailability(service);
// Do some other stuff, not really important
}));
await Task.WhenAll(tasks);
client.QueryAvailability
调用基本上使用HttpClient
类调用API:
public async Task<QueryAvailabilityResponse> QueryAvailability(QueryAvailabilityMultidayRequest request)
{
var response = await client.PostAsJsonAsync("api/queryavailabilitymultiday", request);
if (response.IsSuccessStatusCode)
{
return await response.Content.ReadAsAsync<QueryAvailabilityResponse>();
}
throw new HttpException((int) response.StatusCode, response.ReasonPhrase);
}
这在一段时间内效果很好,但最终事情开始超时。如果我将HttpClient超时设置为一个小时,那么我就会开始出现奇怪的内部服务器错误。
我开始做的是在QueryAvailability
方法中设置一个Stopwatch来查看发生了什么
正在发生的事情是RunData中的所有1200个项目。同时创建Demand,并调用所有1200个await client.PostAsJsonAsync
方法。然后它似乎使用这两个线程来慢慢检查任务,所以到最后,我的任务已经等待了9或10分钟。
以下是我想要的行为:
我想创建1200个任务,然后在线程可用时一次运行3-4个。我不想立即排队1200个HTTP调用。
有什么好方法可以做到这一点吗?
正如我一直建议的那样。。您需要的是TPL数据流(要安装:Install-Package System.Threading.Tasks.Dataflow
)。
您可以创建一个ActionBlock
,其中包含要对每个项目执行的操作。将MaxDegreeOfParallelism
设置为节流。开始张贴到它并等待它的完成:
var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service =>
{
var availabilityResponse = await client.QueryAvailability(service);
// ...
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
foreach (var service in RunData.Demand)
{
block.Post(service);
}
block.Complete();
await block.Completion;
老问题,但我想提出一个使用SemaphoreSlim类的替代轻量级解决方案。只需参考System.Threading.
SemaphoreSlim sem = new SemaphoreSlim(4,4);
foreach (var service in RunData.Demand)
{
await sem.WaitAsync();
Task t = Task.Run(async () =>
{
var availabilityResponse = await client.QueryAvailability(serviceCopy));
// do your other stuff here with the result of QueryAvailability
}
t.ContinueWith(sem.Release());
}
信号灯起到锁定机制的作用。您只能通过调用从计数中减去一的Wait(WaitAsync)来输入信号量。调用release会使计数增加一。
您使用的是异步HTTP调用,因此限制线程数量没有帮助(正如其中一个答案所示,Parallel.ForEach
中的ParallelOptions.MaxDegreeOfParallelism
也没有帮助)。即使是一个线程也可以启动所有请求,并在请求到达时处理结果。
解决这个问题的一种方法是使用TPL数据流。
另一个不错的解决方案是将源IEnumerable
划分为多个分区,并按顺序处理每个分区中的项目,如本文所述:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate
{
using (partition)
while (partition.MoveNext())
await body(partition.Current);
}));
}
虽然Dataflow库很棒,但我认为如果不使用块组合,它会有点重。我倾向于使用下面的扩展方法。
此外,与Partitioner方法不同,它在调用上下文上运行异步方法-需要注意的是,如果您的代码不是真正的异步,或者采用"快速路径",那么它将有效地同步运行,因为没有显式创建线程。
public static async Task RunParallelAsync<T>(this IEnumerable<T> items, Func<T, Task> asyncAction, int maxParallel)
{
var tasks = new List<Task>();
foreach (var item in items)
{
tasks.Add(asyncAction(item));
if (tasks.Count < maxParallel)
continue;
var notCompleted = tasks.Where(t => !t.IsCompleted).ToList();
if (notCompleted.Count >= maxParallel)
await Task.WhenAny(notCompleted);
}
await Task.WhenAll(tasks);
}