平行的.为,获取下一个空闲线程/工作线程
本文关键字:线程 工作 下一个 获取 | 更新日期: 2023-09-27 18:05:22
我有以下并行代码。我不确定的是如何设置workerIndex变量:
// Initializing Worker takes time & must be done before the actual work
Worker[] w = new Worker[3]; // I would like to limit the parallelism to 3
for (int i = 0; i < 3; i++)
w[i] = new Worker();
...
element[] elements = GetArrayOfElements(); // elements.Length > 3
ParallelOptions options = new ParallelOption();
options.MaxDegreeOfParallelism = 3;
Parallel.For(0, elements.Length, options, i =>
{
element e = elements[i];
w[workerIndex].Work(e); // how to set "workerIndex"?
});
是否有一些机制说下一个工作线程id是空闲的?
如果你只是在Parallel.For
循环中new Worker()
,并将它们添加到w
(你需要将w更改为并发列表)。
可能(如果不是太复杂的话)将.Work(e)
方法的内容移到循环体中,从而消除对Worker
类的需要。
如果你将Worker
数组更改为IEnumerable
(即List<Worker>
),你可以使用.AsParallel()
使其并行。然后,您可以使用. forall (worker -> worker. work())来并行执行工作。这需要您通过构造函数将element
传递给worker。
看起来对象池模式最适合您。你可以这样写代码:
const int Limit = 3;
using (var pool = new QueueObjectPool<Worker>(a => new Worker(), Limit)) {
element[] elements = GetArrayOfElements();
var options = new ParallelOptions { MaxDegreeOfParallelism = Limit };
Parallel.For(0, elements.Length, options, i => {
element e = elements[i];
Worker worker = null;
try {
worker = pool.Acquire();
worker.Work(e);
} finally {
pool.Release(worker);
}
});
}
在start期间,每个元素将等待可用的worker,并且只有三个worker将从开始初始化。下面是简化的基于队列的对象池实现:
public sealed class QueueObjectPool<TObject> : IDisposable {
private readonly Queue<TObject> _poolQueue;
private readonly Func<QueueObjectPool<TObject>, TObject> _factory;
private readonly int _capacity;
private readonly SemaphoreSlim _throttler;
public QueueObjectPool(Func<QueueObjectPool<TObject>, TObject> factory, int capacity) {
_factory = factory;
_capacity = capacity;
_throttler = new SemaphoreSlim(initialCount: capacity, maxCount: capacity);
_poolQueue = CreatePoolQueue();
}
public TObject Acquire() {
_throttler.Wait();
lock (_poolQueue) {
return _poolQueue.Dequeue();
}
}
public void Release(TObject poolObject) {
lock (_poolQueue) {
_poolQueue.Enqueue(poolObject);
}
_throttler.Release();
}
private Queue<TObject> CreatePoolQueue() {
var queue = new Queue<TObject>(_capacity);
int itemsLeft = _capacity;
while (itemsLeft > 0) {
TObject queueObject = _factory(this);
queue.Enqueue(queueObject);
itemsLeft -= 1;
}
return queue;
}
public void Dispose() {
throw new NotImplementedException();
}
}
此代码用于演示目的。在实际工作中,最好使用基于async/await的逻辑,这很容易使用SemaphoreSlim.WaitAsync
实现,并且可以替换Parallel。