Linq调用任务.运行访问错误的成员,延迟执行问题

本文关键字:成员 延迟 执行 问题 错误 调用 任务 运行 访问 Linq | 更新日期: 2023-09-27 18:18:05

我有一个奇怪的问题与linq查询使用错误的值。我的代码看起来像这样

await Task.WhenAll((from item in itemsToProcess
    let taskCount = count++
    select Task.Run(() => { process(item).Result; }))
    .AsParallel().ToArray());

基本上,我有一个50k项的列表,这些项目被调用到一个方法中,该方法进行web调用。它们是完全不相关的,可以以任何顺序运行,并且不访问任何共享的内容。但是,偶尔,非常随机地,它似乎将错误的项传递给进程方法,就像您在foreach循环中遇到的那样,如果您没有将其复制到局部变量中。

如果我把代码改成

await Task.WhenAll((from item in itemsToProcess
    let taskCount = count++
    let itemCopy = item
    select Task.Run(() => { process(itemCopy).Result; }))
    .AsParallel().ToArray());

那么我似乎没有这个问题。所以我的问题是,是我遗漏了什么,还是这是预期行为?我以为linq的from子句应该复制到本地副本,但事实并非如此吗?我很难找到任何能直接解决这个问题的方法。但是我看到很多在linq表达式中调用异步方法而不做额外的let的例子。

我也试过让lambda异步并等待方法,但后来我遇到了没有线程的情况。也许有更好的方法?我很乐意知道这件事。简而言之,我所做的就是迭代一个列表并并行调用一个方法,因为它是I/O限制而不是cpu限制。另一种可能性是,已经有很多关于这个的帖子了,我只是在寻找错误的术语。如果是的话,我也很乐意知道。

Linq调用任务.运行访问错误的成员,延迟执行问题

并行代码和异步代码很少一起使用。理想情况下,并行只适用于cpu受限的代码。

你为什么不能这样做呢?

await Task.WhenAll(itemsToProcess.Select(item => process(item)));

基于注释编辑:

使用SemaphoreSlim:

可以很容易地实现异步节流。
static SemaphoreSlim throttle = new SemaphoreSlim(50);
static async Task ProcessAsync(Item item)
{
  await throttle.WaitAsync();
  try
  {
    ... // Original process(item) code
  }
  finally
  {
    throttle.Release();
  }
}

这将限制项处理到50。这只是我从空气中取出的一个数字;您应该稍微试验一下以找到一个合适的值。

注意,一旦工作变为异步,并行处理节流就停止工作。异步工作不"占用"线程,所以它不计入并行处理节流(或线程池注入速率节流)。