多线程不处理所有任务

本文关键字:任务 处理 多线程 | 更新日期: 2023-09-27 17:56:47

我使用以下代码来使用线程执行任务,在这里我尝试执行"dtTable"数据表中的所有记录。 线程数有限为 2(即当时只有两个线程/仅允许执行)。问题是它没有执行Datatable中所有可用的记录,它以不规则的方式执行数据。可能有什么问题..?提前谢谢。

 public class Generator : IDisposable
 {
    public static int maxThreadCount = 2;
    public static int runningThreadCount = 0;
    public RunTask()
    {
       for (int ro = 0; ro <= dtTable.Rows.Count - 1; ro++)
       {
          if (maxThreadCount > runningThreadCount)
          {
            Thread atpthread = new Thread(delegate()
            {
            DoOperationMethod(dtTable.Rows[ro], Task, startDate, EndDate, dtTemplate);
            });
            atpthread.Start();
            runningThreadCount = runningThreadCount + 1;
            Mainthreads.Add(atpthread);
          }
          else
          {
            ro--;
          }
       }
  }
  public void DoOperationMethod(DataRow drAttachpoint, System.StrTaskItem Task, DateTime startDate, DateTime EndDate, DataTable dtTemplate)
    {
     //doing my Operation
     runningThreadCount = runningThreadCount-1; //Once Task done count will get reduce
    }

}

我正在使用.net 3.5(仅供参考)。

多线程不处理所有任务

问题是您在等待线程空闲时不断迭代数据表中的行。无论如何,这种多线程,即使它实际上是正确的(事实并非如此),也是非常低效的 - 大部分时间可能都花在启动新线程上。

请尝试以下操作:

Parallel
 .ForEach(dtTable.Rows.OfType<DataRow>(), row => DoOperationMethod(...))
 .WithDegreeOfParallelism(2);

编辑:

要弄清楚问题是如何产生的,您必须了解变量是如何在匿名方法中捕获的。您的DoOperationMethod调用未传递所需的数据行,因为ro"变量"不是复制的,而是引用的。因此,当循环中的ro更改时,您创建的线程也会更改。

除此之外,您的代码线程非常不安全且效率低下:

  • 你实际上是在浪费三个线程在做工作 - 没有阻塞,你的循环只是不断加减ro,这几乎完全是纯粹的 CPU 工作。这比在等待结果时简单地阻止要浪费得多。
  • 您不能只从多个线程读取和写入静态字段,并期望事情正常工作。实际上,您的代码很容易并行启动比您希望的更多的线程 - 甚至使整个事情陷入僵局,最终runningThread 2,而没有线程正在运行。
  • 您不断启动新线程来执行看似微不足道的操作 - 我猜您的大部分工作要么是 I/O 绑定的,要么是一遍又一遍地创建新线程的成本。
  • 我假设Mainthreads是某种列表,我假设您也从DoOperationMethod方法修改它 - 同样,这可能会导致随机异常和意外结果。
  • 从理论上讲,由于各种优化和缓存,甚至可能永远不会评估检查maxThreadCount > runningThreadCount。实际上,在 x86 CPU 上的当前 .NET 上,对于像这样复杂的方法来说,这并不完全可能,但是当您更新到 .NET 7.0 或任何:)时,它会咬你

多线程很难。你真的不想猜测你的方式。至少,首先尝试了解基础知识 - http://www.albahari.com/threading/。

在我看来

,您的代码的最大问题是,即使您修复了线程不安全使用runningThreadCount,您的代码在等待某个线程完成时也会"旋转"。这完全占用了CPU内核,而您正在尝试完成实际工作。

Luann 提出的解决方案很好,尽管我会使用 Cast<DataRow>() 而不是 OfType<DataRow>()(因为枚举中的所有元素实际上都应该是 DataRow 类型)。除了简洁之外,一大优点是它使用线程池,这将显着减少线程管理的开销(因为它重用线程而不是一遍又一遍地创建和销毁它们)。

如果您更喜欢更明确的方法,可以修改您发布的代码以使用信号量:

SemaphoreSlim semaphore = new SemaphoreSlim(2);
for (int ro = 0; ro <= dtTable.Rows.Count - 1; ro++)
{
    semaphore.Wait();
    DataRow row = dtTable.Rows[ro];
    Thread atpthread = new Thread(delegate()
    {
        DoOperationMethod(row, Task, startDate, EndDate, dtTemplate);
        semaphore.Release();
    });
    atpthread.Start();
    Mainthreads.Add(atpthread);
}

这将导致主线程在 semamphore 计数达到 0 时阻塞Wait()调用,并在计数再次为正时继续(即在线程调用 Release() 之后)。

我注意到评论者Enigmativity关于dtTable是否安全使用的观点。我在这里假设对象在此处理过程中未被修改。有了这个假设,不同步使用它应该没问题。但是,如果这个假设是错误的,那么我同意他们的观点,这是代码中的另一个错误。

最后,我要指出的是,您看到跳过行的原因是您在匿名方法中使用了变量ro。在匿名方法为给定的循环迭代执行之前,该变量可以很容易地递增,从而导致该线程处理错误的行。某些行可能会被多次处理,而其他行会被跳过。我已经通过在循环块内的变量中检索DataRow对象来解决上面的代码示例中的问题,以便每个线程都获得自己的变量私有副本,其中包含它应该处理的DataRow对象。