多线程不处理所有任务
本文关键字:任务 处理 多线程 | 更新日期: 2023-09-27 17:56:47
我使用以下代码来使用线程执行任务,在这里我尝试执行"dtTable"数据表中的所有记录。 线程数有限为 2(即当时只有两个线程/仅允许执行)。问题是它没有执行Datatable中所有可用的记录,它以不规则的方式执行数据。可能有什么问题..?提前谢谢。
public class Generator : IDisposable
{
public static int maxThreadCount = 2;
public static int runningThreadCount = 0;
public RunTask()
{
for (int ro = 0; ro <= dtTable.Rows.Count - 1; ro++)
{
if (maxThreadCount > runningThreadCount)
{
Thread atpthread = new Thread(delegate()
{
DoOperationMethod(dtTable.Rows[ro], Task, startDate, EndDate, dtTemplate);
});
atpthread.Start();
runningThreadCount = runningThreadCount + 1;
Mainthreads.Add(atpthread);
}
else
{
ro--;
}
}
}
public void DoOperationMethod(DataRow drAttachpoint, System.StrTaskItem Task, DateTime startDate, DateTime EndDate, DataTable dtTemplate)
{
//doing my Operation
runningThreadCount = runningThreadCount-1; //Once Task done count will get reduce
}
}
我正在使用.net 3.5(仅供参考)。
问题是您在等待线程空闲时不断迭代数据表中的行。无论如何,这种多线程,即使它实际上是正确的(事实并非如此),也是非常低效的 - 大部分时间可能都花在启动新线程上。
请尝试以下操作:
Parallel
.ForEach(dtTable.Rows.OfType<DataRow>(), row => DoOperationMethod(...))
.WithDegreeOfParallelism(2);
编辑:
要弄清楚问题是如何产生的,您必须了解变量是如何在匿名方法中捕获的。您的DoOperationMethod
调用未传递所需的数据行,因为ro
"变量"不是复制的,而是引用的。因此,当循环中的ro
更改时,您创建的线程也会更改。
除此之外,您的代码线程非常不安全且效率低下:
- 你实际上是在浪费三个线程在做工作 - 没有阻塞,你的循环只是不断加减
ro
,这几乎完全是纯粹的 CPU 工作。这比在等待结果时简单地阻止要浪费得多。 - 您不能只从多个线程读取和写入静态字段,并期望事情正常工作。实际上,您的代码很容易并行启动比您希望的更多的线程 - 甚至使整个事情陷入僵局,最终
runningThread
2
,而没有线程正在运行。 - 您不断启动新线程来执行看似微不足道的操作 - 我猜您的大部分工作要么是 I/O 绑定的,要么是一遍又一遍地创建新线程的成本。
- 我假设
Mainthreads
是某种列表,我假设您也从DoOperationMethod
方法修改它 - 同样,这可能会导致随机异常和意外结果。 - 从理论上讲,由于各种优化和缓存,甚至可能永远不会评估检查
maxThreadCount > runningThreadCount
。实际上,在 x86 CPU 上的当前 .NET 上,对于像这样复杂的方法来说,这并不完全可能,但是当您更新到 .NET 7.0 或任何:)时,它会咬你
多线程很难。你真的不想猜测你的方式。至少,首先尝试了解基础知识 - http://www.albahari.com/threading/。
,您的代码的最大问题是,即使您修复了线程不安全使用runningThreadCount
,您的代码在等待某个线程完成时也会"旋转"。这完全占用了CPU内核,而您正在尝试完成实际工作。
Luann 提出的解决方案很好,尽管我会使用 Cast<DataRow>()
而不是 OfType<DataRow>()
(因为枚举中的所有元素实际上都应该是 DataRow
类型)。除了简洁之外,一大优点是它使用线程池,这将显着减少线程管理的开销(因为它重用线程而不是一遍又一遍地创建和销毁它们)。
如果您更喜欢更明确的方法,可以修改您发布的代码以使用信号量:
SemaphoreSlim semaphore = new SemaphoreSlim(2);
for (int ro = 0; ro <= dtTable.Rows.Count - 1; ro++)
{
semaphore.Wait();
DataRow row = dtTable.Rows[ro];
Thread atpthread = new Thread(delegate()
{
DoOperationMethod(row, Task, startDate, EndDate, dtTemplate);
semaphore.Release();
});
atpthread.Start();
Mainthreads.Add(atpthread);
}
这将导致主线程在 semamphore 计数达到 0 时阻塞Wait()
调用,并在计数再次为正时继续(即在线程调用 Release()
之后)。
我注意到评论者Enigmativity关于dtTable
是否安全使用的观点。我在这里假设对象在此处理过程中未被修改。有了这个假设,不同步使用它应该没问题。但是,如果这个假设是错误的,那么我同意他们的观点,这是代码中的另一个错误。
最后,我要指出的是,您看到跳过行的原因是您在匿名方法中使用了变量ro
。在匿名方法为给定的循环迭代执行之前,该变量可以很容易地递增,从而导致该线程处理错误的行。某些行可能会被多次处理,而其他行会被跳过。我已经通过在循环块内的变量中检索DataRow
对象来解决上面的代码示例中的问题,以便每个线程都获得自己的变量私有副本,其中包含它应该处理的DataRow
对象。