列表的并行处理
本文关键字:并行处理 列表 | 更新日期: 2023-09-27 18:26:04
我的场景:我需要处理一个元素列表。每个元素处理都非常耗时(1-10秒)而不是
List retval = new List();
foreach (item in myList)
retval.Add(ProcessItem(item));
return retval;
我想并行处理每个项目。
我知道.NET有很多并行处理方法:什么是最好的?(注意,我坚持使用3.5框架版本,不能使用.Net 4附带的Task、异步和所有nancy功能…)
这里我尝试使用代理人:
private void DoTest(int processingTaskDuration)
{
List<int> itemsToProcess = new List<int>();
for (int i = 1; i <= 20; i++)
itemsToProcess.Add(i);
TestClass tc = new TestClass(processingTaskDuration);
DateTime start = DateTime.Now;
List<int> result = tc.ProcessList(itemsToProcess);
TimeSpan elapsed = DateTime.Now - start;
System.Diagnostics.Debug.WriteLine(string.Format("elapsed (msec)= {0}", (int)elapsed.TotalMilliseconds));
}
public class TestClass
{
static int s_Counter = 0;
static object s_lockObject = new Object();
int m_TaskMsecDuration = 0;
public TestClass() :
this(5000)
{
}
public TestClass(int taskMsecDuration)
{
m_TaskMsecDuration = taskMsecDuration;
}
public int LongOperation(int itemToProcess)
{
int currentCounter = 0;
lock (s_lockObject)
{
s_Counter++;
currentCounter = s_Counter;
}
System.Diagnostics.Debug.WriteLine(string.Format("LongOperation'tStart't{0}'t{1}'t{2}", currentCounter, System.Threading.Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.ffffff")));
// time consuming task, e.g 5 seconds
Thread.Sleep(m_TaskMsecDuration);
int retval = itemToProcess * 2;
System.Diagnostics.Debug.WriteLine(string.Format("LongOperation'tEnd 't{0}'t{1}'t{2}", currentCounter, System.Threading.Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.ffffff")));
return retval;
}
delegate int LongOperationDelegate(int itemToProcess);
public List<int> ProcessList(List<int> itemsToProcess)
{
List<IAsyncResult> asyncResults = new List<IAsyncResult>();
LongOperationDelegate del = LongOperation;
foreach (int item in itemsToProcess)
{
IAsyncResult res = del.BeginInvoke(item, null, null);
asyncResults.Add(res);
}
// list of waitHandles to wait for
List<WaitHandle> waitHandles = new List<WaitHandle>();
asyncResults.ForEach(el => waitHandles.Add(el.AsyncWaitHandle));
// wait for processing every item
WaitHandle.WaitAll(waitHandles.ToArray());
// retrieve result of processing
List<int> retval = new List<int>();
asyncResults.ForEach(res =>
{
int singleProcessingResult = del.EndInvoke(res);
retval.Add(singleProcessingResult);
}
);
return retval;
}
}
这是一些输出(第3列是一个渐进计数器,使用它来匹配调用的开始和结束,第4列是线程ID,最后一列是时间戳)
LongOperation Start 1 6 15:11:18.331619
LongOperation Start 2 12 15:11:18.331619
LongOperation Start 3 13 15:11:19.363722
LongOperation Start 4 14 15:11:19.895775
LongOperation Start 5 15 15:11:20.406826
LongOperation Start 6 16 15:11:21.407926
LongOperation Start 7 17 15:11:22.410026
LongOperation End 1 6 15:11:23.360121
LongOperation End 2 12 15:11:23.361122
LongOperation Start 8 12 15:11:23.363122
LongOperation Start 9 6 15:11:23.365122
LongOperation Start 10 18 15:11:23.907176
LongOperation End 3 13 15:11:24.365222
LongOperation Start 11 13 15:11:24.366222
LongOperation End 4 14 15:11:24.897275
LongOperation Start 12 14 15:11:24.898275
LongOperation Start 13 19 15:11:25.407326
LongOperation End 5 15 15:11:25.408326
LongOperation Start 14 15 15:11:25.412327
LongOperation Start 15 20 15:11:26.407426
LongOperation End 6 16 15:11:26.410426
LongOperation Start 16 16 15:11:26.410426
LongOperation Start 17 21 15:11:27.408526
LongOperation End 7 17 15:11:27.411527
LongOperation Start 18 17 15:11:27.413527
LongOperation End 8 12 15:11:28.365622
LongOperation Start 19 12 15:11:28.366622
LongOperation End 9 6 15:11:28.366622
LongOperation Start 20 6 15:11:28.389624
LongOperation End 10 18 15:11:28.908676
LongOperation End 11 13 15:11:29.367722
LongOperation End 12 14 15:11:29.899775
LongOperation End 13 19 15:11:30.411827
LongOperation End 14 15 15:11:30.413827
LongOperation End 15 20 15:11:31.407926
LongOperation End 16 16 15:11:31.411927
LongOperation End 17 21 15:11:32.413027
LongOperation End 18 17 15:11:32.416027
LongOperation End 19 12 15:11:33.389124
LongOperation End 20 6 15:11:33.391124
elapsed (msec)= 15075
因此:
代表方法正确吗?
我实施得对吗?
如果是,为什么第三个操作在前两个操作后一秒开始(依此类推)?
我的意思是,我希望整个处理或多或少在一个处理的时间内完成,但系统似乎以一种奇怪的方式使用线程池。毕竟,我要问20个线程,它在前两个调用之后等待跨越第三个线程。
我认为Reactive Extensions的3.5后台端口附带了Parallel.ForEach()
的实现,您应该能够使用它。端口应该只包含让Rx在3.5上工作所需的,但这应该足够了。
其他人也尝试过实现它,基本上只是在ThreadPool
上排队工作项。
void Main()
{
var list = new List<int>{ 1,2,3 };
var processes = list.Count();
foreach (var item in list)
{
ThreadPool.QueueUserWorkItem(s => {
ProcessItem(item);
processes--;
});
}
while (processes > 0) { Thread.Sleep(10); }
}
static void ProcessItem(int item)
{
Thread.Sleep(100); // do work
}
我解决了第三个问题:
如果是,为什么第三次操作在前两次操作后一秒开始(等等)?
问题似乎出在ThreadPool管理线程生成的默认方式上:请参阅http://msdn.microsoft.com/en-us/library/0ka9477y%28v=VS.90%29.aspx.报价:
线程池有一个内置的延迟(在.NET中为半秒Framework版本2.0),然后再启动新的空闲线程。如果您应用程序会在短时间内周期性地启动许多任务空闲线程数量的增加会产生显著的吞吐量的增加。将空闲线程数设置得过高不必要地消耗系统资源。
如果调用ThreadPool.SetMinThreads的值合适,似乎会有很大帮助。在ProcessList的开头,我插入了对这个方法的调用:
private void SetUpThreadPool(int numThreadDesired)
{
int currentWorkerThreads;
int currentCompletionPortThreads;
ThreadPool.GetMinThreads(out currentWorkerThreads, out currentCompletionPortThreads);
//System.Diagnostics.Debug.WriteLine(string.Format("ThreadPool.GetMinThreads: workerThreads = {0}, completionPortThreads = {1}", workerThreads, completionPortThreads));
const int MAXIMUM_VALUE_FOR_SET_MIN_THREAD_PARAM = 20;
int numMinThreadToSet = Math.Min(numThreadDesired, MAXIMUM_VALUE_FOR_SET_MIN_THREAD_PARAM);
if (currentWorkerThreads < numMinThreadToSet)
ThreadPool.SetMinThreads(numThreadDesired, currentCompletionPortThreads);
}
public List<int> ProcessList(List<int> itemsToProcess)
{
SetUpThreadPool(documentNumberList.Count);
...
}
现在,所有线程(最多20个)都在同一时刻开始,没有延迟。我认为20是MAXIMUM_VALUE_for_SET_MIN_THRAD_PARAM的一个很好的折衷方案:不太高,符合我的特定要求
仍然想知道主要问题
- 代表方法正确吗
- 我实施得对吗
感谢大家的帮助。