How to use a queue from different threads at the same time


How do I work with a queue in C#? I want one thread to enqueue data and another thread to dequeue data from the same queue. The two threads should run at the same time.

Is that possible?


If you need thread safety, use ConcurrentQueue&lt;T&gt;.
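
For illustration, here is a minimal producer/consumer sketch using ConcurrentQueue&lt;T&gt; (the item count and the "producing" done-flag are just assumptions for the demo):

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ConcurrentQueueExample
{
    static void Main()
    {
        var queue = new ConcurrentQueue<int>();
        bool producing = true;

        // One thread enqueues items...
        var producer = Task.Run(() =>
        {
            for (int i = 0; i <= 1000; i++)
                queue.Enqueue(i);
            Volatile.Write(ref producing, false);   // signal "no more items"
        });

        // ...while another thread dequeues them at the same time.
        var consumer = Task.Run(() =>
        {
            int value;
            while (Volatile.Read(ref producing) || !queue.IsEmpty)
            {
                if (queue.TryDequeue(out value))
                    Console.WriteLine("Dequeued " + value);
                else
                    Thread.Yield();                 // queue momentarily empty
            }
        });

        Task.WaitAll(producer, consumer);
    }
}

TryDequeue simply returns false when the queue is empty, so the consumer never blocks or throws; it just tries again.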

If you use System.Collections.Queue, thread safety is handled for you like this:

// Queue.Synchronized returns a thread-safe wrapper around the underlying queue;
// keep a single reference to the wrapper and share it between threads.
var queue = Queue.Synchronized(new Queue());
queue.Enqueue(new WorkItem());
queue.Enqueue(new WorkItem());
queue.Clear();

If you want to use System.Collections.Generic.Queue&lt;T&gt;, write your own wrapper class. I have one ready-made that wraps System.Collections.Generic.Stack&lt;T&gt;:

using System;
using System.Collections.Generic;

[Serializable]
public class SomeStack
{
    private readonly object stackLock = new object();
    private readonly Stack<WorkItem> stack;    // WorkItem is whatever payload type you are queuing

    public SomeStack()
    {
        this.stack = new Stack<WorkItem>();
    }

    public WorkItem Push(WorkItem context)
    {
        lock (this.stackLock)
        {
            this.stack.Push(context);
        }
        return context;
    }

    public WorkItem Pop()
    {
        lock (this.stackLock)
        {
            return this.stack.Pop();
        }
    }
}

One possible implementation is a ring buffer with independent read and write pointers. On every read/write operation you copy the opposite pointer (which must be read in a thread-safe way) into your local context, then perform a batched read or write.

Each time you read or write, you update your own pointer and signal an event.

If the reading or writing thread reaches a point where there is no more work to do, it waits on the other thread's event before re-reading the corresponding pointer.
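
Along those lines, here is a minimal single-producer/single-consumer sketch (the class name, fixed capacity and use of AutoResetEvent are assumptions; a production version would batch operations and be more careful about memory ordering):

using System;
using System.Threading;

// Single-producer / single-consumer ring buffer sketch.
// _writePos is only advanced by the writer and _readPos only by the reader,
// so each side only needs a volatile read of the *other* side's pointer.
public class RingBufferQueue<T>
{
    private readonly T[] _buffer;
    private int _writePos;                 // next slot the writer will fill
    private int _readPos;                  // next slot the reader will take
    private readonly AutoResetEvent _dataAvailable = new AutoResetEvent(false);
    private readonly AutoResetEvent _spaceAvailable = new AutoResetEvent(false);

    public RingBufferQueue(int capacity)
    {
        _buffer = new T[capacity + 1];     // one slot stays empty to tell "full" from "empty"
    }

    public void Enqueue(T item)
    {
        int next = (_writePos + 1) % _buffer.Length;
        // Buffer full: wait until the reader signals that it consumed something.
        while (next == Volatile.Read(ref _readPos))
            _spaceAvailable.WaitOne();
        _buffer[_writePos] = item;
        Volatile.Write(ref _writePos, next);
        _dataAvailable.Set();              // wake the reader if it is waiting
    }

    public T Dequeue()
    {
        // Buffer empty: wait until the writer signals that it produced something.
        while (_readPos == Volatile.Read(ref _writePos))
            _dataAvailable.WaitOne();
        T item = _buffer[_readPos];
        Volatile.Write(ref _readPos, (_readPos + 1) % _buffer.Length);
        _spaceAvailable.Set();             // wake the writer if it is waiting
        return item;
    }
}

Because AutoResetEvent stays signalled if Set is called before the other side waits, the "check condition, then wait" pattern above does not lose wake-ups.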

You can implement a thread-safe queue using atomic operations. I once wrote the class below for a multiplayer game. It allows multiple threads to write to the queue safely, while a single other thread safely reads from it:

using System.Collections.Generic;

/// <summary>
/// The WaitFreeQueue class implements the Queue abstract data type through a linked list. The WaitFreeQueue
/// allows thread-safe addition and removal of elements using atomic operations. Multiple threads can add
/// elements simultaneously, and another thread can remove elements from the queue at the same time. Only one
/// thread can remove elements from the queue at any given time.
/// </summary>
/// <typeparam name="T">The type parameter</typeparam>
public class WaitFreeQueue<T>
{
    // Private fields
    // ==============
    #region Private fields
    private Node<T> _tail;  // The tail of the queue.
    private Node<T> _head;  // The head of the queue.
    #endregion

    // Public methods
    // ==============
    #region Public methods
    /// <summary>
    /// Removes the first item from the queue. This method returns a value to indicate if an item was
    /// available, and passes the item back through an argument.
    /// This method is not thread-safe in itself (only one thread can safely access this method at any
    /// given time) but it is safe to call this method while other threads are enqueueing items.
    /// 
    /// If no item was available at the time of calling this method, the returned value is initialised
    /// to the default value that matches this instance's type parameter. For reference types, this is
    /// a Null reference.
    /// </summary>
    /// <param name="value">The value.</param>
    /// <returns>A boolean value indicating if an element was available (true) or not.</returns>
    public bool Dequeue(ref T value)
    {
        bool succeeded = false;
        value = default(T);
        // If there is an element on the queue then we get it.
        if (null != _head)
        {
            // Set the head to the next element in the list, and retrieve the old head.
            Node<T> head = System.Threading.Interlocked.Exchange<Node<T>>(ref _head, _head.Next);
            // Sever the element we just pulled off the queue.
            head.Next = null;
            // We have succeeded.
            value = head.Value;
            succeeded = true;
        }
        return succeeded;
    }
    /// <summary>
    /// Adds another item to the end of the queue. This operation is thread-safe, and multiple threads
    /// can enqueue items while a single other thread dequeues items.
    /// </summary>
    /// <param name="value">The value to add.</param>
    public void Enqueue(T value)
    {
        // We create a new node for the specified value, and point it to itself.
        Node<T> newNode = new Node<T>(value);
        // In one atomic operation, set the tail of the list to the new node, and remember the old tail.
        Node<T> previousTail = System.Threading.Interlocked.Exchange<Node<T>>(ref _tail, newNode);
        // Link the previous tail to the new tail.
        if (null != previousTail)
            previousTail.Next = newNode;
        // If this is the first node in the list, we save it as the head of the queue.
        System.Threading.Interlocked.CompareExchange<Node<T>>(ref _head, newNode, null);
    } // Enqueue()
    #endregion

    // Public constructor
    // ==================
    #region Public constructor
    /// <summary>
    /// Constructs a new WaitFreeQueue instance.
    /// </summary>
    public WaitFreeQueue() { }
    /// <summary>
    /// Constructs a new WaitFreeQueue instance based on the specified list of items.
    /// The items will be enqueued. The list can be a Null reference.
    /// </summary>
    /// <param name="items">The items</param>
    public WaitFreeQueue(IEnumerable<T> items)
    {
        if(null!=items)
            foreach(T item in items)
                this.Enqueue(item);
    }
    #endregion

    // Private types
    // =============
    #region Private types
    /// <summary>
    /// The Node class represents a single node in the linked list of a WaitFreeQueue.
    /// It contains the queued-up value and a reference to the next node in the list.
    /// </summary>
    /// <typeparam name="U">The type parameter.</typeparam>
    private class Node<U>
    {
        // Public fields
        // =============
        #region Public fields
        public Node<U> Next;
        public U Value;
        #endregion

        // Public constructors
        // ===================
        #region Public constructors
        /// <summary>
        /// Constructs a new node with the specified value.
        /// </summary>
        /// <param name="value">The value</param>
        public Node(U value)
        {
            this.Value = value;
        }
        #endregion
    } // Node generic class
    #endregion
} // WaitFreeQueue class

If the restriction that only one thread may dequeue while any number of threads may enqueue is acceptable to you, you can use this class. It works well for games because it needs no locks.
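
For illustration only, here is a hypothetical way to drive the WaitFreeQueue above, with several producer threads and exactly one consumer thread (the item counts and the polite spin are assumptions for the demo):

using System;
using System.Threading;

class WaitFreeQueueDemo
{
    static void Main()
    {
        var queue = new WaitFreeQueue<int>();
        const int producers = 4;
        const int itemsPerProducer = 1000;
        int running = producers;

        // Several producer threads enqueue concurrently; this is the supported pattern.
        for (int p = 0; p < producers; p++)
        {
            int id = p;
            new Thread(() =>
            {
                for (int i = 0; i < itemsPerProducer; i++)
                    queue.Enqueue(id * itemsPerProducer + i);
                Interlocked.Decrement(ref running);
            }).Start();
        }

        // Exactly one consumer thread dequeues; more than one reader is not supported.
        int received = 0;
        int value = 0;
        while (Volatile.Read(ref running) > 0)
        {
            if (queue.Dequeue(ref value))
                received++;
            else
                Thread.Yield();          // queue momentarily empty, spin politely
        }
        while (queue.Dequeue(ref value)) // drain whatever is left after the producers stop
            received++;

        Console.WriteLine("Dequeued " + received + " items");
    }
}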

Here is a simple usage example that shares a plain Queue&lt;int&gt; between two thread-pool threads, protected by a lock:

using System.Collections.Generic;
using System.Threading;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            ExampleQueue eq = new ExampleQueue();
            eq.Run();
            // Wait...
            System.Threading.Thread.Sleep(100000);
        }

    }
    class ExampleQueue
    {
        private Queue<int> _myQueue = new Queue<int>();
        public void Run()
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(PushToQueue), null);
            ThreadPool.QueueUserWorkItem(new WaitCallback(PopFromQueue), null);
        }
        private void PushToQueue(object Dummy)
        {
            for (int i = 0; i <= 1000; i++)
            {
                lock (_myQueue)
                {
                    _myQueue.Enqueue(i);
                }
            }
            System.Console.WriteLine("END PushToQueue");
        }
        private void PopFromQueue(object Dummy)
        {
            int dataElementFromQueue = -1;
            while (dataElementFromQueue < 1000)
            {
                lock (_myQueue)
                {
                    if (_myQueue.Count > 0)
                    {
                        dataElementFromQueue = _myQueue.Dequeue();
                        // Do something with dataElementFromQueue...
                        System.Console.WriteLine("Dequeued " + dataElementFromQueue);
                    }
                }
            }
            System.Console.WriteLine("END PopFromQueue");
        }
    }
}

You may want to use a blocking queue, where the thread popping from the queue waits until some data becomes available.

See: Creating a blocking Queue&lt;T&gt; in .NET?
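
In current .NET, BlockingCollection&lt;T&gt; (which wraps a ConcurrentQueue&lt;T&gt; by default) already provides that blocking behavior; a minimal sketch:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BlockingQueueDemo
{
    static void Main()
    {
        using (var queue = new BlockingCollection<int>())
        {
            var producer = Task.Run(() =>
            {
                for (int i = 0; i < 100; i++)
                    queue.Add(i);
                queue.CompleteAdding();   // tells the consumer no more items will arrive
            });

            var consumer = Task.Run(() =>
            {
                // GetConsumingEnumerable blocks until an item is available and ends
                // once CompleteAdding has been called and the queue is drained.
                foreach (int item in queue.GetConsumingEnumerable())
                    Console.WriteLine("Dequeued " + item);
            });

            Task.WaitAll(producer, consumer);
        }
    }
}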