平行的.为,获取下一个空闲线程/工作线程

本文关键字:线程 工作 下一个 获取 | 更新日期: 2023-09-27 18:05:22

我有以下并行代码。我不确定的是如何设置workerIndex变量:

// Initializing Worker takes time & must be done before the actual work
Worker[] w = new Worker[3]; // I would like to limit the parallelism to 3
for (int i = 0; i < 3; i++)
  w[i] = new Worker();
...
element[] elements = GetArrayOfElements(); // elements.Length > 3
ParallelOptions options = new ParallelOption();
options.MaxDegreeOfParallelism = 3;
Parallel.For(0, elements.Length, options, i =>
{
  element e = elements[i];
  w[workerIndex].Work(e); // how to set "workerIndex"?
});

是否有一些机制说下一个工作线程id是空闲的?

平行的.为,获取下一个空闲线程/工作线程

如果你只是在Parallel.For循环中new Worker(),并将它们添加到w(你需要将w更改为并发列表)。

可能(如果不是太复杂的话)将.Work(e)方法的内容移到循环体中,从而消除对Worker类的需要。

编辑:

如果你将Worker数组更改为IEnumerable(即List<Worker>),你可以使用.AsParallel()使其并行。然后,您可以使用. forall (worker -> worker. work())来并行执行工作。这需要您通过构造函数将element传递给worker。

看起来对象池模式最适合您。你可以这样写代码:

const int Limit = 3;
using (var pool = new QueueObjectPool<Worker>(a => new Worker(), Limit)) {
    element[] elements = GetArrayOfElements();
    var options = new ParallelOptions { MaxDegreeOfParallelism = Limit };
    Parallel.For(0, elements.Length, options, i => {
        element e = elements[i];
        Worker worker = null;
        try {
            worker = pool.Acquire();
            worker.Work(e);
        } finally {
            pool.Release(worker);
        }
    });
}

在start期间,每个元素将等待可用的worker,并且只有三个worker将从开始初始化。下面是简化的基于队列的对象池实现:

public sealed class QueueObjectPool<TObject> : IDisposable {
        private readonly Queue<TObject> _poolQueue;
        private readonly Func<QueueObjectPool<TObject>, TObject> _factory;
        private readonly int _capacity;
        private readonly SemaphoreSlim _throttler;
        public QueueObjectPool(Func<QueueObjectPool<TObject>, TObject> factory, int capacity) {
            _factory = factory;
            _capacity = capacity;
            _throttler = new SemaphoreSlim(initialCount: capacity, maxCount: capacity);
            _poolQueue = CreatePoolQueue();
        }
        public TObject Acquire() {
            _throttler.Wait();
            lock (_poolQueue) {
                return _poolQueue.Dequeue();
            }
        }
        public void Release(TObject poolObject) {
            lock (_poolQueue) {
                _poolQueue.Enqueue(poolObject);
            }
            _throttler.Release();
        }
        private Queue<TObject> CreatePoolQueue() {
            var queue = new Queue<TObject>(_capacity);
            int itemsLeft = _capacity;
            while (itemsLeft > 0) {
                TObject queueObject = _factory(this);
                queue.Enqueue(queueObject);
                itemsLeft -= 1;
            }
            return queue;
        }
        public void Dispose() {
            throw new NotImplementedException();
        }
    }

此代码用于演示目的。在实际工作中,最好使用基于async/await的逻辑,这很容易使用SemaphoreSlim.WaitAsync实现,并且可以替换Parallel。