Queue for multiple producers and consumers
Keywords: consumer, queue, producer | Updated: 2023-09-27 18:10:45
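I have to write the communication between several threads (at least 3 for now) in .NET 3.5, and each thread is both a producer and a consumer. Rather than sending signals between every pair of threads, my idea is to implement a message queue that stores instances of a class like this: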
enum Signals { ObjectArrivedOnLightBarrier, ObjectLeftLightBarrier, CodeFound };
enum UnitID { GrabThread, ImageProcessingThread, SaveThread };
// Consumer shows who the message is intended for (and only that unit is allowed to remove it from the queue)
public class QueuedSignal
{
Signals Message;
UnitID Producer;
UnitID Consumer;
}
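The idea is that any thread can peek at the first item in the queue and leave it alone if the message is not intended for it. (If several other messages are queued behind it and one of them is meant for this thread, it is not a problem that the thread has to wait.)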
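The "peek, and remove only if it is addressed to me" idea has to be done atomically, otherwise two threads can peek at the same head and race to remove it. The following is a minimal sketch for .NET 3.5 under stated assumptions: `AddressedSignalQueue`, `TryDequeueFor` and the `QueuedSignal` constructor are hypothetical names that do not appear in the question, and the enums are repeated only so the block compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

enum Signals { ObjectArrivedOnLightBarrier, ObjectLeftLightBarrier, CodeFound }
enum UnitID { GrabThread, ImageProcessingThread, SaveThread }

sealed class QueuedSignal
{
    public readonly Signals Message;
    public readonly UnitID Producer;
    public readonly UnitID Consumer;

    // Hypothetical constructor; the question only shows the three fields.
    public QueuedSignal(Signals message, UnitID producer, UnitID consumer)
    {
        Message = message;
        Producer = producer;
        Consumer = consumer;
    }
}

// Hypothetical helper: a plain Queue<T> guarded by one lock.
sealed class AddressedSignalQueue
{
    private readonly Queue<QueuedSignal> _queue = new Queue<QueuedSignal>();
    private readonly object _sync = new object();

    public void Enqueue(QueuedSignal signal)
    {
        if (signal == null) throw new ArgumentNullException("signal");
        lock (_sync)
        {
            _queue.Enqueue(signal);
            Monitor.PulseAll(_sync); // Wake all waiters; each one re-checks the head.
        }
    }

    // Removes and returns the head only if it is addressed to 'me'.
    // Waits up to timeoutMilliseconds and returns null on timeout.
    public QueuedSignal TryDequeueFor(UnitID me, int timeoutMilliseconds)
    {
        DateTime deadline = DateTime.UtcNow.AddMilliseconds(timeoutMilliseconds);
        lock (_sync)
        {
            while (true)
            {
                // Peek and Dequeue happen under the same lock, so no other
                // thread can take the head in between.
                if (_queue.Count > 0 && _queue.Peek().Consumer == me)
                {
                    QueuedSignal head = _queue.Dequeue();
                    Monitor.PulseAll(_sync); // The new head may be for another waiter.
                    return head;
                }
                int remaining = (int)(deadline - DateTime.UtcNow).TotalMilliseconds;
                if (remaining <= 0) return null;
                Monitor.Wait(_sync, remaining); // Releases the lock while waiting.
            }
        }
    }
}

static class AddressedQueueDemo
{
    static void Main()
    {
        var queue = new AddressedSignalQueue();
        queue.Enqueue(new QueuedSignal(Signals.CodeFound, UnitID.ImageProcessingThread, UnitID.SaveThread));

        // The head is addressed to SaveThread, so GrabThread gets null after the timeout.
        Console.WriteLine(queue.TryDequeueFor(UnitID.GrabThread, 50) == null);
        // SaveThread is the intended consumer and receives the message.
        Console.WriteLine(queue.TryDequeueFor(UnitID.SaveThread, 50).Message);
    }
}
```

One consequence of looking only at the head is worth keeping in mind: a message for a slow consumer holds up every message behind it, including those meant for other threads.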
Queue<T> is not thread-safe when there are multiple producers and consumers.
If you were using .NET 4 or later, I would suggest BlockingCollection<T>.
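For readers who are on .NET 4 or later, here is a rough sketch of how BlockingCollection<T> handles several producers and consumers. The thread counts, the capacity of 8 and the item strings are arbitrary choices made for illustration only, and none of this compiles against plain .NET 3.5.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

static class BlockingCollectionDemo
{
    static void Main()
    {
        // Bounded to 8 items: Add() blocks while the collection is full.
        using (var queue = new BlockingCollection<string>(8))
        {
            int consumed = 0;

            var consumers = new Thread[2];
            for (int c = 0; c < consumers.Length; ++c)
            {
                consumers[c] = new Thread(() =>
                {
                    // Blocks while the collection is empty and finishes once
                    // CompleteAdding() has been called and everything is drained.
                    foreach (string item in queue.GetConsumingEnumerable())
                    {
                        Interlocked.Increment(ref consumed);
                    }
                });
                consumers[c].Start();
            }

            var producers = new Thread[3];
            for (int p = 0; p < producers.Length; ++p)
            {
                int id = p; // Copy the loop variable so each closure sees its own value.
                producers[p] = new Thread(() =>
                {
                    for (int i = 0; i < 10; ++i)
                    {
                        queue.Add("producer " + id + " item " + i);
                    }
                });
                producers[p].Start();
            }

            foreach (Thread t in producers) t.Join();
            queue.CompleteAdding(); // No more items will be added.
            foreach (Thread t in consumers) t.Join();

            Console.WriteLine("Consumed {0} items", consumed);
        }
    }
}
```

Note that BlockingCollection<T> hands each item to whichever consumer asks first, so it does not by itself implement the "only the addressed thread may remove the item" rule from the question.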
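Unfortunately you can't use it on .NET 3.5, but there are several concurrent queue implementations in circulation.
Take a look at Marc Gravell's answer. Unfortunately, it doesn't have a Peek() method.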
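Here is a class that I used before .NET 4 came out; maybe it will be of interest to you. It is not the best implementation; we used it more as a placeholder until .NET 4 arrived. Even so, here it is: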
using System;
using System.Collections.Generic;
using System.Threading;
/// <summary>A bounded blocking queue.</summary>
/// <typeparam name="T">The element type of the queue, which must be a reference type.</typeparam>
public sealed class BoundedBlockingQueue<T>: IDisposable where T: class
{
#region Construction and disposal
/// <summary>Constructor.</summary>
/// <param name="maxQueueSize">
/// The maximum size of the queue.
/// Calls to <see cref="Enqueue"/> when the queue is full will block until at least one item has been removed.
/// Calls to <see cref="Dequeue"/> when the queue is empty will block until a new item is enqueued.
/// </param>
public BoundedBlockingQueue(int maxQueueSize)
{
if (maxQueueSize <= 0)
{
throw new ArgumentOutOfRangeException("maxQueueSize");
}
_queue = new Queue<T>(maxQueueSize);
_itemsAvailable = new Semaphore(0, maxQueueSize);
_spaceAvailable = new Semaphore(maxQueueSize, maxQueueSize);
_queueStopped = new ManualResetEvent(false);
_queueStoppedAndEmpty = new ManualResetEvent(false);
_stoppedOrItemAvailable = new WaitHandle[] { _queueStopped, _itemsAvailable };
}
/// <summary>Disposal.</summary>
public void Dispose()
{
if (_itemsAvailable != null)
{
_itemsAvailable.Close();
_spaceAvailable.Close();
_queueStopped.Close();
_queueStoppedAndEmpty.Close();
_itemsAvailable = null; // Use _itemsAvailable as a flag to indicate that Dispose() has been called.
}
}
#endregion Construction and disposal
#region Public properties
/// <summary>The number of items currently in the queue.</summary>
public int Count
{
get
{
throwIfDisposed();
lock (_queue)
{
return _queue.Count;
}
}
}
/// <summary>Has <see cref="Stop"/> been called?</summary>
public bool Stopped
{
get
{
throwIfDisposed();
return _stopped;
}
}
#endregion Public properties
#region Public methods
/// <summary>
/// Signals that new items will no longer be placed into the queue.
/// After this is called, calls to <see cref="Dequeue"/> will return null immediately if the queue is empty.
/// Before this is called, calls to <see cref="Dequeue"/> will block if the queue is empty.
/// Attempting to enqueue items after this has been called will cause an exception to be thrown.
/// </summary>
/// <remarks>
/// If you use a different thread to enqueue items than the thread that calls Stop() you might get a race condition.
///
/// If the queue is full and a thread calls Enqueue(), that thread will block until space becomes available in the queue.
/// If a different thread then calls Stop() while the other thread is blocked in Enqueue(), the item enqueued by the other
/// thread may become lost since it will be enqueued after the special null value used to indicate the end of the
/// stream is enqueued.
///
/// To prevent this from happening, you must enqueue from the same thread that calls Stop(), or provide another
/// synchronisation mechanism to avoid this race condition.
/// </remarks>
public void Stop()
{
throwIfDisposed();
lock (_queue)
{
_queueStopped.Set();
_stopped = true;
}
}
/// <summary>
/// Returns the front item of the queue without removing it, or null if the queue is currently empty.
/// A null return does NOT indicate that <see cref="Stop"/> has been called.
/// This never blocks.
/// </summary>
/// <returns>The front item of the queue, or null if the queue is empty.</returns>
public T Peek()
{
throwIfDisposed();
T result;
lock (_queue)
{
if (_queue.Count > 0)
{
result = _queue.Peek();
}
else
{
result = null;
}
}
return result;
}
/// <summary>
/// Enqueues a new non-null item.
/// If there is no room in the queue, this will block until there is room.
/// An exception will be thrown if <see cref="Stop"/> has been called.
/// </summary>
/// <param name="item">The item to be enqueued. This may not be null.</param>
public void Enqueue(T item)
{
throwIfDisposed();
if (item == null)
{
throw new ArgumentNullException("item");
}
if (_stopped)
{
throw new InvalidOperationException("Attempting to enqueue an item to a stopped queue.");
}
this.enqueue(item);
}
/// <summary>
/// Dequeues the next available item.
/// If <see cref="Stop"/> has been called, this returns null if the queue is empty.
/// Otherwise it blocks until an item becomes available (or <see cref="Stop"/> is called).
/// </summary>
/// <returns>The next available item, or null if the queue is empty and stopped.</returns>
public T Dequeue()
{
throwIfDisposed();
if (_isQueueStoppedAndEmpty)
{
return null;
}
WaitHandle.WaitAny(_stoppedOrItemAvailable);
lock (_queue)
{
if (_stopped && (_queue.Count == 0))
{
_isQueueStoppedAndEmpty = true;
_queueStoppedAndEmpty.Set();
return null;
}
else
{
T item = _queue.Dequeue();
_spaceAvailable.Release();
return item;
}
}
}
/// <summary>Waits forever for the queue to become empty AND stopped.</summary>
public void WaitUntilStoppedAndEmpty()
{
throwIfDisposed();
WaitUntilStoppedAndEmpty(Timeout.Infinite);
}
/// <summary>Waits up to the specified time for the queue to become empty AND stopped.</summary>
/// <param name="timeoutMilliseconds">The maximum wait time, in milliseconds.</param>
/// <returns>True if the wait succeeded, false if it timed-out.</returns>
public bool WaitUntilStoppedAndEmpty(int timeoutMilliseconds)
{
throwIfDisposed();
return _queueStoppedAndEmpty.WaitOne(timeoutMilliseconds);
}
#endregion Public methods
#region Private methods
/// <summary>Enqueues a new item (which may be null to indicate the last item to go into the queue).</summary>
/// <param name="item">An item to enqueue.</param>
private void enqueue(T item)
{
_spaceAvailable.WaitOne();
lock (_queue)
{
_queue.Enqueue(item);
}
_itemsAvailable.Release();
}
/// <summary>Throws if this object has been disposed.</summary>
private void throwIfDisposed()
{
if (_itemsAvailable == null)
{
throw new ObjectDisposedException(this.GetType().FullName);
}
}
#endregion Private methods
#region Fields
/// <summary>
/// Contains wait handles for "stopped" and "item available".
/// Therefore using this for WaitAny() will wait until the queue is stopped
/// or an item becomes available, whichever is the sooner.
/// </summary>
private readonly WaitHandle[] _stoppedOrItemAvailable;
private Semaphore _itemsAvailable;
private volatile bool _stopped;
private volatile bool _isQueueStoppedAndEmpty;
private readonly Queue<T> _queue;
private readonly Semaphore _spaceAvailable;
private readonly ManualResetEvent _queueStopped;
private readonly ManualResetEvent _queueStoppedAndEmpty;
#endregion Fields
}
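Here is an old unit test for it. It is not a very good unit test; it tests too many things at once and has some other problems, but it does demonstrate how to use the queue: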
[TestMethod]
public void TestBoundedBlockingQueue()
{
int maxQueueSize = 8;
using (var queue = new BoundedBlockingQueue<string>(maxQueueSize))
{
// Fill the queue, but don't block.
for (int i = 0; i < maxQueueSize; ++i)
{
int start1 = DateTimeFunctions.TickCount;
queue.Enqueue(i.ToString());
int elapsed1 = DateTimeFunctions.TickCount - start1;
Assert.IsTrue(elapsed1 < 100, "Took too long to enqueue an item."); // Shouldn't have taken more than 100 ms to enqueue the item.
}
// Now if we try to enqueue something we should block (since the queue should be full).
// We can detect this by starting a thread that will dequeue something in a few seconds
// and then seeing how long the main thread took to enqueue something.
// It should have taken around the thread sleep time (+/- half a second or so).
int sleepTime = 2500;
int tolerance = 500;
Worker.Run(()=>{Thread.Sleep(sleepTime); queue.Dequeue();}, "TestBoundedBlockingQueue Dequeue()");
int start2 = DateTimeFunctions.TickCount;
queue.Enqueue(maxQueueSize.ToString());
int elapsed2 = DateTimeFunctions.TickCount - start2;
Assert.IsTrue(Math.Abs(elapsed2 - sleepTime) <= tolerance, "Didn't take the right time to enqueue an item.");
// Now verify that the remaining items in the queue are the expected ones,
// i.e. from "1".."maxQueueSize" (since the first item, "0", has been dequeued).
for (int i = 1; i <= maxQueueSize; ++i)
{
Assert.AreEqual(i.ToString(), queue.Dequeue(), "Incorrect item dequeued.");
}
Assert.AreEqual(0, queue.Count);
// Now if we try to dequeue something we should block (since the queue is empty).
// We can detect this by starting a thread that will enqueue something after sleepTime milliseconds
// and then seeing how long the main thread took to dequeue something.
// It should have taken around sleepTime milliseconds (+/- half a second or so).
string testValue = "TEST";
Worker.Run(()=>{Thread.Sleep(sleepTime); queue.Enqueue(testValue);}, "TestBoundedBlockingQueue Enqueue()");
start2 = DateTimeFunctions.TickCount;
Assert.AreEqual(testValue, queue.Dequeue(), "Incorrect item dequeued");
elapsed2 = DateTimeFunctions.TickCount - start2;
Assert.IsTrue(Math.Abs(elapsed2 - sleepTime) <= tolerance, "Didn't take the right time to dequeue an item.");
}
}
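The June 2008 CTP of the Parallel Extensions for .NET included a BlockingCollection<T> class that does what you want, although it probably doesn't have a Peek method. That library is compatible with .NET 3.5. I've used it a lot.
I haven't been able to find a place to download it, but you can do a little searching.
It may be available in the Reactive Extensions. Newer versions of Rx target .NET 4.5, but an older version appears to be available at http://www.microsoft.com/en-us/download/details.aspx?id=28568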