Parallel.For循环-为每个线程分配一个唯一的数据实体

本文关键字:一个 唯一 实体 数据 分配 循环 For 线程 Parallel | 更新日期: 2023-09-27 18:29:11

我有100条记录用于并行化,从1到100,现在我可以方便地使用并行

 Parallel.For(0, limit, i =>
    {
        DoWork(i);
    });

但也有一些限制,每个线程都需要使用相同的Data实体,而且Data实体的数量有限,比如10个,它们是通过相互克隆并保存在Dictionary或List等结构中预先创建的。现在我可以使用以下代码限制并行化的数量:

 Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
    {
        DoWork(i);
    });

但问题是如何为每个传入线程分配一个唯一的数据实体,这样在执行中的任何其他当前线程都不会使用数据实体,因为线程的数量和数据实体是相同的,所以饥饿不是问题。我可以想出一种方法,为每个数据实体创建一个布尔值,指定它是否在使用中,因此我们遍历字典或列表,找到下一个可用的数据实体,并锁定整个分配过程,这样在给定的时间给一个线程分配一个数据实体,但在我看来,这个问题将有更优雅的解决方案,我的版本只是一个变通方法,不是真正的解决办法。我的逻辑是:

Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
        {
            lock(All_Threads_Common_Object)
            {
              Check for available data entity using boolean
              Assign the Data entity
            }
            DoWork(i);
            Reset the Boolean value for another thread to use it
        });

如果问题需要进一步澄清,请告诉我。

Parallel.For循环-为每个线程分配一个唯一的数据实体

使用接受线程本地初始化函数的Parallel.For的重载。

Parallel.For<DataEntity>(0, limit, 
    //will run once for each thread
    () => GetThreadLocalDataEntity(),
    //main loop body, will run once per iteration
    (i, loop, threadDataEntity) =>
    {
        DoWork(i, threadDataEntity);
        return threadDataEntity; //we must return it here to adhere to the Func signature.
    },
    //will run once for each thread after the loop
    (threadDataEntity) => threadDataEntity.Dispose() //if necessary
);

与您在问题中发布的方法相比,该方法的主要优点是,DataEntity的分配在每个线程中发生一次,而不是在每个循环迭代中发生一个。

您可以使用并发集合来存储您的10个对象。每个Worker将提取一个数据实体,使用它,然后将其返回。并发集合的使用很重要,因为在您的场景中,正常的集合不是线程安全的。

像这样:

var queue = new ConcurrentQueue<DataEntity>();
// fill the queue with 10 items
Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
    {
        DataEntity x;
        if(!queue.TryDequeue(out x))
            throw new InvalidOperationException();
        DoWork(i, x);
        queue.Enqueue(x);
    });

或者,如果需要提供阻塞,则将其封装在BlockingCollection中。

编辑:不要将其包裹在循环中以保持等待。相反,像这样使用BlockingCollection:

var entities = new BlockingCollection(new ConcurrentQueue<DataEntity>());
// fill the collection with 10 items
Parallel.For(0, limit, new ParallelOptions { MaxDegreeOfParallelism = 10 }, i =>
    {
        DataEntity x = entities.Take();
        DoWork(i, x);
        entities.Add(x);
    });