Parallel processing of a list

Keywords: parallel processing, list | Updated: 2023-09-27 18:26:04

My scenario: I need to process a list of elements. Processing each element is very time-consuming (1-10 seconds). Instead of doing

// element and result types assumed to be int, as in the test code
List<int> retval = new List<int>();
foreach (int item in myList)
    retval.Add(ProcessItem(item));
return retval;

I want to process the items in parallel.

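I know .NET offers many ways to do parallel processing. Which one is best? (Note: I'm stuck with framework version 3.5 and can't use Task, async, or any of the other fancy features that came with .NET 4 and later...)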

Here is my attempt using delegates:

using System;
using System.Collections.Generic;
using System.Threading;

private void DoTest(int processingTaskDuration)
{
    List<int> itemsToProcess = new List<int>();
    for (int i = 1; i <= 20; i++)
        itemsToProcess.Add(i);
    TestClass tc = new TestClass(processingTaskDuration);
    DateTime start = DateTime.Now;
    List<int> result = tc.ProcessList(itemsToProcess);
    TimeSpan elapsed = DateTime.Now - start;
    System.Diagnostics.Debug.WriteLine(string.Format("elapsed (msec)= {0}", (int)elapsed.TotalMilliseconds));
}
public class TestClass
{
    static int s_Counter = 0;
    static object s_lockObject = new Object();
    int m_TaskMsecDuration = 0;
    public TestClass() :
        this(5000)
    {
    }
    public TestClass(int taskMsecDuration)
    {
        m_TaskMsecDuration = taskMsecDuration;
    }

    public int LongOperation(int itemToProcess)
    {
        int currentCounter = 0;
        lock (s_lockObject)
        {
            s_Counter++;
            currentCounter = s_Counter;
        }
        System.Diagnostics.Debug.WriteLine(string.Format("LongOperation\tStart\t{0}\t{1}\t{2}", currentCounter, System.Threading.Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.ffffff")));
        // time consuming task, e.g 5 seconds
        Thread.Sleep(m_TaskMsecDuration);
        int retval = itemToProcess * 2;
        System.Diagnostics.Debug.WriteLine(string.Format("LongOperation\tEnd  \t{0}\t{1}\t{2}", currentCounter, System.Threading.Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.ffffff")));
        return retval;
    }
    delegate int LongOperationDelegate(int itemToProcess);
    public List<int> ProcessList(List<int> itemsToProcess)
    {
        List<IAsyncResult> asyncResults = new List<IAsyncResult>();
        LongOperationDelegate del = LongOperation;
        foreach (int item in itemsToProcess)
        {
            IAsyncResult res = del.BeginInvoke(item, null, null);
            asyncResults.Add(res);
        }
        // list of waitHandles to wait for
        List<WaitHandle> waitHandles = new List<WaitHandle>();
        asyncResults.ForEach(el => waitHandles.Add(el.AsyncWaitHandle));

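        // wait for processing every item
        // (WaitAll accepts at most 64 handles and throws NotSupportedException
        //  when called on an STA thread with more than one handle)
        WaitHandle.WaitAll(waitHandles.ToArray());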

        // retrieve result of processing
        List<int> retval = new List<int>();
        asyncResults.ForEach(res =>
        {
            int singleProcessingResult = del.EndInvoke(res);
            retval.Add(singleProcessingResult);
        }
        );
        return retval;
    }
}

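Here is some output. The 3rd column is a progressive counter used to match the start and end of each call, the 4th column is the thread ID, and the last column is a timestamp: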

LongOperation   Start   1   6   15:11:18.331619
LongOperation   Start   2   12  15:11:18.331619
LongOperation   Start   3   13  15:11:19.363722
LongOperation   Start   4   14  15:11:19.895775
LongOperation   Start   5   15  15:11:20.406826
LongOperation   Start   6   16  15:11:21.407926
LongOperation   Start   7   17  15:11:22.410026
LongOperation   End     1   6   15:11:23.360121
LongOperation   End     2   12  15:11:23.361122
LongOperation   Start   8   12  15:11:23.363122
LongOperation   Start   9   6   15:11:23.365122
LongOperation   Start   10  18  15:11:23.907176
LongOperation   End     3   13  15:11:24.365222
LongOperation   Start   11  13  15:11:24.366222
LongOperation   End     4   14  15:11:24.897275
LongOperation   Start   12  14  15:11:24.898275
LongOperation   Start   13  19  15:11:25.407326
LongOperation   End     5   15  15:11:25.408326
LongOperation   Start   14  15  15:11:25.412327
LongOperation   Start   15  20  15:11:26.407426
LongOperation   End     6   16  15:11:26.410426
LongOperation   Start   16  16  15:11:26.410426
LongOperation   Start   17  21  15:11:27.408526
LongOperation   End     7   17  15:11:27.411527
LongOperation   Start   18  17  15:11:27.413527
LongOperation   End     8   12  15:11:28.365622
LongOperation   Start   19  12  15:11:28.366622
LongOperation   End     9   6   15:11:28.366622
LongOperation   Start   20  6   15:11:28.389624
LongOperation   End     10  18  15:11:28.908676
LongOperation   End     11  13  15:11:29.367722
LongOperation   End     12  14  15:11:29.899775
LongOperation   End     13  19  15:11:30.411827
LongOperation   End     14  15  15:11:30.413827
LongOperation   End     15  20  15:11:31.407926
LongOperation   End     16  16  15:11:31.411927
LongOperation   End     17  21  15:11:32.413027
LongOperation   End     18  17  15:11:32.416027
LongOperation   End     19  12  15:11:33.389124
LongOperation   End     20  6   15:11:33.391124
elapsed (msec)= 15075

So:

Is the delegate approach correct?

Have I implemented it correctly?

If so, why does the third operation start one second after the first two (and so on)?

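I expected the whole job to finish in roughly the time of a single item. Instead, the system seems to use the thread pool in a strange way. I am asking for 20 threads, but after the first two calls it waits before spawning a third thread.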


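I think the .NET 3.5 backport of Reactive Extensions ships with an implementation of Parallel.ForEach() that you should be able to use. The backport is only supposed to contain what Rx needs to work on 3.5, but that should be enough.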
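Here is a minimal sketch of what that could look like. It assumes the Rx 3.5 backport exposes the same `System.Threading.Tasks.Parallel.For(int, int, Action<int>)` overload as .NET 4. The helper name `ParallelListProcessor.ProcessList` and its `Func<T, TResult>` worker parameter are hypothetical. The code writes each result into a pre-sized array by index, which keeps the output in input order without locking, because each slot is written by exactly one iteration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks; // assumption: same namespace in the Rx 3.5 backport as in .NET 4

public static class ParallelListProcessor
{
    // Hypothetical helper: applies 'process' to every item in parallel and
    // returns the results in the same order as the input list.
    public static List<TResult> ProcessList<T, TResult>(IList<T> items, Func<T, TResult> process)
    {
        TResult[] results = new TResult[items.Count];

        // Each iteration writes only its own slot, so no lock is needed.
        Parallel.For(0, items.Count, i =>
        {
            results[i] = process(items[i]);
        });

        // Parallel.For returns only after every iteration has completed.
        return new List<TResult>(results);
    }
}
```

Parallel.For also schedules its work on the thread pool, so long blocking items may still be subject to the gradual thread creation discussed further down.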

Others have implemented it themselves. It basically comes down to queuing work items on the ThreadPool:

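using System.Collections.Generic;
using System.Threading;

static void Main()
{
    var list = new List<int> { 1, 2, 3 };
    int processes = list.Count;
    foreach (var item in list)
    {
        int current = item; // copy: before C# 5, all closures share one foreach variable
        ThreadPool.QueueUserWorkItem(s =>
        {
            ProcessItem(current);
            Interlocked.Decrement(ref processes); // a plain processes-- is not atomic
        });
    }
    // poll every 10 ms until all work items have finished
    while (Thread.VolatileRead(ref processes) > 0) { Thread.Sleep(10); }
}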
static void ProcessItem(int item)
{
    Thread.Sleep(100); // do work
}
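The same ThreadPool idea can be written without polling while still staying within .NET 3.5 APIs. The sketch below has these properties:

- It counts completions with `Interlocked`.
- It blocks on a single `ManualResetEvent`, which also avoids the 64-handle limit of `WaitHandle.WaitAll`.
- It returns results in input order.

The class and method names are hypothetical. The choice to rethrow the first failure wrapped in an `InvalidOperationException` is one of several reasonable options.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ThreadPoolListProcessor
{
    // Sketch for .NET 3.5: queue one work item per element, count completions
    // with Interlocked, and wait on one ManualResetEvent instead of polling.
    public static List<TResult> ProcessList<T, TResult>(IList<T> items, Func<T, TResult> process)
    {
        TResult[] results = new TResult[items.Count];
        if (items.Count == 0)
            return new List<TResult>(results);

        int pending = items.Count;
        Exception firstError = null;

        using (ManualResetEvent allDone = new ManualResetEvent(false))
        {
            for (int i = 0; i < items.Count; i++)
            {
                int index = i; // copy so each closure captures its own index
                ThreadPool.QueueUserWorkItem(state =>
                {
                    try
                    {
                        results[index] = process(items[index]);
                    }
                    catch (Exception ex)
                    {
                        // Keep the first failure; an unhandled exception on a
                        // pool thread would otherwise terminate the process.
                        Interlocked.CompareExchange(ref firstError, ex, null);
                    }
                    finally
                    {
                        // The last work item to finish releases the caller.
                        if (Interlocked.Decrement(ref pending) == 0)
                            allDone.Set();
                    }
                });
            }
            allDone.WaitOne();
        }

        if (firstError != null)
            throw new InvalidOperationException("At least one item failed.", firstError);
        return new List<TResult>(results);
    }
}
```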

I solved the third question:

If so, why does the third operation start one second after the first two (and so on)?

The problem seems to lie in the default way the ThreadPool manages thread creation. See http://msdn.microsoft.com/en-us/library/0ka9477y%28v=VS.90%29.aspx. Quoting:

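The thread pool has a built-in delay (half a second in the .NET Framework version 2.0) before starting new idle threads. If your application periodically starts many tasks in a short time, an increase in the number of idle threads can produce a significant increase in throughput. Setting the number of idle threads too high consumes system resources needlessly.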

Calling ThreadPool.SetMinThreads with a suitable value seems to help a lot. At the beginning of ProcessList I inserted a call to this method:

private void SetUpThreadPool(int numThreadDesired)
{
    int currentWorkerThreads;
    int currentCompletionPortThreads;
    ThreadPool.GetMinThreads(out currentWorkerThreads, out currentCompletionPortThreads);
    //System.Diagnostics.Debug.WriteLine(string.Format("ThreadPool.GetMinThreads: workerThreads = {0}, completionPortThreads = {1}", currentWorkerThreads, currentCompletionPortThreads));
    const int MAXIMUM_VALUE_FOR_SET_MIN_THREAD_PARAM = 20;
    int numMinThreadToSet = Math.Min(numThreadDesired, MAXIMUM_VALUE_FOR_SET_MIN_THREAD_PARAM);
    if (currentWorkerThreads < numMinThreadToSet)
        ThreadPool.SetMinThreads(numMinThreadToSet, currentCompletionPortThreads); // use the capped value
}
public List<int> ProcessList(List<int> itemsToProcess)
{
    SetUpThreadPool(itemsToProcess.Count);
    ...
}

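Now all threads (up to 20) start at the same moment, with no delay. I think 20 is a good compromise for MAXIMUM_VALUE_FOR_SET_MIN_THREAD_PARAM: it is not too high, and it suits my particular requirements.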
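ThreadPool.SetMinThreads changes a process-wide setting. One option, sketched below, is to raise the minimum only for the duration of a batch and restore the previous values afterwards. This is not part of the original code, and the `ThreadPoolSettings.WithMinWorkerThreads` helper is hypothetical.

```csharp
using System;
using System.Threading;

public static class ThreadPoolSettings
{
    // Hypothetical helper: temporarily raises the pool's minimum worker threads
    // so up to 'desired' items can start without injection delay, runs 'work',
    // then restores the previous minimums (the setting affects the whole process).
    public static void WithMinWorkerThreads(int desired, Action work)
    {
        int oldWorker, oldIo;
        ThreadPool.GetMinThreads(out oldWorker, out oldIo);

        bool changed = false;
        if (desired > oldWorker)
        {
            // SetMinThreads returns false and changes nothing for invalid
            // values, e.g. a minimum above the current maximum.
            changed = ThreadPool.SetMinThreads(desired, oldIo);
        }

        try
        {
            work();
        }
        finally
        {
            if (changed)
                ThreadPool.SetMinThreads(oldWorker, oldIo);
        }
    }
}
```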

I'm still wondering about the main questions:

  1. Is the delegate approach correct?
  2. Have I implemented it correctly?

Thanks, everyone, for your help.