如何实现一个可挂起的BlockingCollection

本文关键字:挂起 BlockingCollection 一个 何实现 实现 | 更新日期: 2023-09-27 17:53:49

我正在编写一个WCF服务,它接收来自几个模块(DB,其他服务…)的通知,并将它们添加到阻塞集合中,在消费者线程上进行处理,该线程向客户端发布相关数据。

客户端可以请求存储在服务器上的全部数据,在此操作期间,我不想接受任何新的通知。基本上我想挂起阻塞收集(或消费者线程),并在完成客户端的请求后恢复通知的接收和处理。

实现这种行为的好方法是什么?

如何实现一个可挂起的BlockingCollection

如果我没有理解错的话,您想要防止消费者线程在进行其他查询操作时从BlockingCollection消费数据,但是在此期间生产者可以继续将数据推入集合。

如果这是正确的,那么我认为最好的方法是有一个ManualResetEvent,它通常是有信号的,消费者线程不会被阻塞,当你想暂停消费者时,你可以重置事件,这将导致每个消费者阻塞等待事件成为信号。

Update:下面是一个快速控制台应用程序,演示了我上面描述的内容。这只是一个快速的演示,但它显示了一个生产者线程和两个消费者线程。点击键盘上的Space Bar,消费者的状态可以在RunningPaused之间切换。

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Threading;
namespace ProducerConsumerDemo
{
  class Program
  {
    static BlockingCollection<int> _queue = new BlockingCollection<int>();
    static ManualResetEvent _pauseConsumers = new ManualResetEvent(true);
    static bool _paused = false;
    static int _itemsEnqueued = 0;
    static int _itemsDequeued = 0;
    static void Main(string[] args)
    {
      Thread producerThread = new Thread(Producer);
      Thread consumerThread1 = new Thread(Consumer);
      Thread consumerThread2 = new Thread(Consumer);
      producerThread.Start();
      consumerThread1.Start();
      consumerThread2.Start();
      while (true)
      {
        WriteAt(0,0,"State: " + (string)(_paused ? "Paused" : "Running"));
        WriteAt(0,1,"Items In Queue: " + _queue.Count);
        WriteAt(0, 2, "Total enqueued: " + _itemsEnqueued);
        WriteAt(0, 3, "Total dequeued: " + _itemsDequeued);
        Thread.Sleep(100);
        if (Console.KeyAvailable)
        {
          if (Console.ReadKey().Key == ConsoleKey.Spacebar)
          {
            if (_paused)
            {
              _paused = false;
              _pauseConsumers.Set();
            }
            else
            {
              _paused = true;
              _pauseConsumers.Reset();
            }
          }
        }
      }
    }
    static void WriteAt(int x, int y, string format, params object[] args)
    {
      Console.SetCursorPosition(x, y);
      Console.Write("                                         ");
      Console.SetCursorPosition(x, y);
      Console.Write(format, args);
    }
    static void Consumer()
    {
      while (true)
      {
        if (_paused)
        {
          // If we are paused, wait for the signal to indicate that
          // we can continue
          _pauseConsumers.WaitOne();
        }
        int value;
        if (_queue.TryTake(out value))
        {
          Interlocked.Increment(ref _itemsDequeued);
          // Do something with the data
        }
        Thread.Sleep(500);
      }
    }
    static void Producer()
    {
      Random rnd = new Random();
      while (true)
      {
        if (_queue.TryAdd(rnd.Next(100)))
        {
          Interlocked.Increment(ref _itemsEnqueued);
        }
        Thread.Sleep(500);
      }
    }
  }
}

你可以包装BlockingCollection,并在内部为生产者添加一个Queue和一些同步机制。

下面是一个示例:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Windows.Forms;
using System.Threading;
using System.Collections;
using System.Collections.Concurrent;
namespace WindowsFormsApplication2
{
    static class Program
    {
        private static void Producer()
        {
            int i = 0;
            while (i < 500)
            {
                Thread.Sleep(50); // can remove the sleep just for debugging
                bc.ProducerAdd("Notification " + i);
                i++;
            }
        }
        private static void Consumer()
        {
            while (true)
            {
                foreach (var it in bc)
                {
                    Console.WriteLine(String.Format("{0} : CONSUMES {1}", Thread.CurrentThread.Name, it));
                    // this is just a check test
                    lock (consumed)
                    {
                        consumed.Add(it, Convert.ToInt32(it.Split(' ')[1])); // this will fail if we consume the same twice
                    }
                }
            }
        }
        private static void TogglePause()
        {
            while (true)
            {
                Thread.Sleep(3000); //every 3 seconds
                bc.Paused = !bc.Paused;
                Console.WriteLine("PAUSE is now: " + bc.Paused);
            }
        }
        private static QueuedBlockingCollection<string> bc = new QueuedBlockingCollection<string>();
        private static Dictionary<string, int> consumed = new Dictionary<string, int>();
        /// <summary>
        /// The main entry point for the application.
        /// </summary>
        [STAThread]
        static void Main()
        {
            Thread producer = new Thread(Producer);
            producer.Start();
            Thread consumer1 = new Thread(Consumer);
            consumer1.Name = "Consumer 1";
            consumer1.Start();
            Thread consumer2 = new Thread(Consumer);
            consumer2.Name = "Consumer 2";
            consumer2.Start();
            Thread pauser = new Thread(TogglePause);
            pauser.Start();
            while (true)
            {
                // wait and observe console writelines
                Application.DoEvents();
            }
        }
    }
    class QueuedBlockingCollection<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
    {
        private Queue<T> queue = new Queue<T>();
        private BlockingCollection<T> collection = new BlockingCollection<T>();
        private Thread syncThread;
        public bool Paused { get; set; }
        public bool Exiting { get; set; }
        private void SyncLoop()
        {
            // this while will wait for the class to be destroyed
            while (!Exiting)
            {
                try
                {
                    // this while will finish when queue is synched to collection, or paused or exiting
                    while (queue.Count > 0 && !Exiting && !Paused)
                    {
                        lock (collection)
                        {
                            T item = queue.Dequeue();
                            collection.Add(item);
                            Console.WriteLine(String.Format("SYNCHED {0} TO COLLECTION", item));
                        }
                    }
                }
                catch (ObjectDisposedException)
                {
                    // collection has been disposed, exit this thread
                    break;
                }
            }
        }
        public void ProducerAdd(T item)
        {
            // producer always adds to the queue if the collection is paused so it wont block the collection
            // the sync thread will block the collection only when adding from the queue
            // consumers automatically block this collection when enumerating because it is a wrapper to the internal blocked coll...
            if (!Paused)
            {
                lock (collection)
                {
                    collection.Add(item);
                    Console.WriteLine(String.Format("Producer ADDED '" + item + "', status collection {0} queue {1}", this.Count, queue.Count));
                }
            }
            else
            {
                queue.Enqueue(item);
                Console.WriteLine(String.Format("Producer ENQUEUED '" + item + "', Status -> Collection: {0}, Queue: {1}", this.Count, queue.Count));
            }
        }
        public QueuedBlockingCollection()
        {
            //collection.CompleteAdding();
            syncThread = new Thread(SyncLoop);
            syncThread.Start();
        }
        ~QueuedBlockingCollection()
        {
            Exiting = true;
            syncThread.Join(200);
        }
        public IEnumerator<T> GetEnumerator()
        {
            return collection.GetConsumingEnumerable().GetEnumerator();
        }
        IEnumerator IEnumerable.GetEnumerator()
        {
            return collection.GetConsumingEnumerable().GetEnumerator();
        }
        public void CopyTo(Array array, int index)
        {
            var items = collection.GetConsumingEnumerable();
            int offset = 0;
            if (array == null) throw new NullReferenceException("Array must be initialized");
            if (array.Rank > 1) throw new InvalidOperationException("Array must have 1 dimension");
            if (array.GetLength(0) - index < items.Count()) throw new IndexOutOfRangeException("Array is too small");
            foreach (var item in items)
            {
                array.SetValue(item, index + offset);
                offset++;
            }
        }
        public int Count
        {
            get
            {
                return collection.Count;
            }
        }
        public bool IsSynchronized
        {
            get { return queue.Count == 0; }
        }
        public object SyncRoot
        {
            get { return collection; }
        }
        public void Dispose()
        {
            collection.Dispose();
        }
    }
}

你可以添加你需要的BlockingCollection方法,只需将它们作为集合方法的包装器。

下面是一些输出:

Producer ADDED 'Notification 0', status collection 1 queue 0
Producer ADDED 'Notification 1', status collection 2 queue 0
Producer ADDED 'Notification 2', status collection 3 queue 0
Producer ADDED 'Notification 3', status collection 4 queue 0
Consumer 2 : CONSUMES Notification 0
Consumer 2 : CONSUMES Notification 1
Consumer 2 : CONSUMES Notification 2
Consumer 2 : CONSUMES Notification 3
Consumer 2 : CONSUMES Notification 4
Producer ADDED 'Notification 4', status collection 0 queue 0
Producer ADDED 'Notification 5', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 5
Producer ADDED 'Notification 6', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 6
Producer ADDED 'Notification 7', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 7
Producer ADDED 'Notification 8', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 8
Producer ADDED 'Notification 9', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 9
Producer ADDED 'Notification 10', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 10
Producer ADDED 'Notification 11', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 11
Consumer 2 : CONSUMES Notification 12
Producer ADDED 'Notification 12', status collection 0 queue 0
Producer ADDED 'Notification 13', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 13
Producer ADDED 'Notification 14', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 14
Producer ADDED 'Notification 15', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 15
Producer ADDED 'Notification 16', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 16
Producer ADDED 'Notification 17', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 17
Producer ADDED 'Notification 18', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 18
Producer ADDED 'Notification 19', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 19
Producer ADDED 'Notification 20', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 20
Producer ADDED 'Notification 21', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 21
Producer ADDED 'Notification 22', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 22
Producer ADDED 'Notification 23', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 23
Producer ADDED 'Notification 24', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 24
Producer ADDED 'Notification 25', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 25
Producer ADDED 'Notification 26', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 26
Producer ADDED 'Notification 27', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 27
Producer ADDED 'Notification 28', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 28
Producer ADDED 'Notification 29', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 29
Producer ADDED 'Notification 30', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 30
Producer ADDED 'Notification 31', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 31
Producer ADDED 'Notification 32', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 32
Producer ADDED 'Notification 33', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 33
Producer ADDED 'Notification 34', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 34
Producer ADDED 'Notification 35', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 35
Producer ADDED 'Notification 36', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 36
Producer ADDED 'Notification 37', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 37
Producer ADDED 'Notification 38', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 38
Producer ADDED 'Notification 39', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 39
Producer ADDED 'Notification 40', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 40
Producer ADDED 'Notification 41', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 41
Producer ADDED 'Notification 42', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 42
Producer ADDED 'Notification 43', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 43
Producer ADDED 'Notification 44', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 44
Producer ADDED 'Notification 45', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 45
Producer ADDED 'Notification 46', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 46
Producer ADDED 'Notification 47', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 47
Producer ADDED 'Notification 48', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 48
Producer ADDED 'Notification 49', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 49
Producer ADDED 'Notification 50', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 50
Producer ADDED 'Notification 51', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 51
Producer ADDED 'Notification 52', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 52
Producer ADDED 'Notification 53', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 53
Producer ADDED 'Notification 54', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 54
Producer ADDED 'Notification 55', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 55
Producer ADDED 'Notification 56', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 56
Producer ADDED 'Notification 57', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 57
Producer ADDED 'Notification 58', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 58
Producer ADDED 'Notification 59', status collection 1 queue 0
Consumer 1 : CONSUMES Notification 59
Producer ADDED 'Notification 60', status collection 1 queue 0
Consumer 2 : CONSUMES Notification 60
PAUSE is now: True
Producer ENQUEUED 'Notification 61', Status -> Collection: 0, Queue: 1
Producer ENQUEUED 'Notification 62', Status -> Collection: 0, Queue: 2
Producer ENQUEUED 'Notification 63', Status -> Collection: 0, Queue: 3
Producer ENQUEUED 'Notification 64', Status -> Collection: 0, Queue: 4
Producer ENQUEUED 'Notification 65', Status -> Collection: 0, Queue: 5
Producer ENQUEUED 'Notification 66', Status -> Collection: 0, Queue: 6
Producer ENQUEUED 'Notification 67', Status -> Collection: 0, Queue: 7
Producer ENQUEUED 'Notification 68', Status -> Collection: 0, Queue: 8
Producer ENQUEUED 'Notification 69', Status -> Collection: 0, Queue: 9
Producer ENQUEUED 'Notification 70', Status -> Collection: 0, Queue: 10
Producer ENQUEUED 'Notification 71', Status -> Collection: 0, Queue: 11
Producer ENQUEUED 'Notification 72', Status -> Collection: 0, Queue: 12
Producer ENQUEUED 'Notification 73', Status -> Collection: 0, Queue: 13
Producer ENQUEUED 'Notification 74', Status -> Collection: 0, Queue: 14
Producer ENQUEUED 'Notification 75', Status -> Collection: 0, Queue: 15
Producer ENQUEUED 'Notification 76', Status -> Collection: 0, Queue: 16
Producer ENQUEUED 'Notification 77', Status -> Collection: 0, Queue: 17
Producer ENQUEUED 'Notification 78', Status -> Collection: 0, Queue: 18
Producer ENQUEUED 'Notification 79', Status -> Collection: 0, Queue: 19
Producer ENQUEUED 'Notification 80', Status -> Collection: 0, Queue: 20
Producer ENQUEUED 'Notification 81', Status -> Collection: 0, Queue: 21
Producer ENQUEUED 'Notification 82', Status -> Collection: 0, Queue: 22
Producer ENQUEUED 'Notification 83', Status -> Collection: 0, Queue: 23
Producer ENQUEUED 'Notification 84', Status -> Collection: 0, Queue: 24
Producer ENQUEUED 'Notification 85', Status -> Collection: 0, Queue: 25
Producer ENQUEUED 'Notification 86', Status -> Collection: 0, Queue: 26
Producer ENQUEUED 'Notification 87', Status -> Collection: 0, Queue: 27
Producer ENQUEUED 'Notification 88', Status -> Collection: 0, Queue: 28
Producer ENQUEUED 'Notification 89', Status -> Collection: 0, Queue: 29
Producer ENQUEUED 'Notification 90', Status -> Collection: 0, Queue: 30
Producer ENQUEUED 'Notification 91', Status -> Collection: 0, Queue: 31
Producer ENQUEUED 'Notification 92', Status -> Collection: 0, Queue: 32
Producer ENQUEUED 'Notification 93', Status -> Collection: 0, Queue: 33
Producer ENQUEUED 'Notification 94', Status -> Collection: 0, Queue: 34
Producer ENQUEUED 'Notification 95', Status -> Collection: 0, Queue: 35
Producer ENQUEUED 'Notification 96', Status -> Collection: 0, Queue: 36
Producer ENQUEUED 'Notification 97', Status -> Collection: 0, Queue: 37
The thread '<No Name>' (0x1100) has exited with code 0 (0x0).
Producer ENQUEUED 'Notification 98', Status -> Collection: 0, Queue: 38
Producer ENQUEUED 'Notification 99', Status -> Collection: 0, Queue: 39
PAUSE is now: False
Consumer 1 : CONSUMES Notification 61
SYNCHED Notification 61 TO COLLECTION
SYNCHED Notification 62 TO COLLECTION
Consumer 2 : CONSUMES Notification 62
Consumer 1 : CONSUMES Notification 63
SYNCHED Notification 63 TO COLLECTION
SYNCHED Notification 64 TO COLLECTION
Consumer 2 : CONSUMES Notification 64
Consumer 1 : CONSUMES Notification 65
SYNCHED Notification 65 TO COLLECTION
SYNCHED Notification 66 TO COLLECTION
Consumer 2 : CONSUMES Notification 66
Consumer 1 : CONSUMES Notification 67
SYNCHED Notification 67 TO COLLECTION
SYNCHED Notification 68 TO COLLECTION
Consumer 2 : CONSUMES Notification 68
Consumer 1 : CONSUMES Notification 69
SYNCHED Notification 69 TO COLLECTION
SYNCHED Notification 70 TO COLLECTION
Consumer 2 : CONSUMES Notification 70
Consumer 1 : CONSUMES Notification 71
SYNCHED Notification 71 TO COLLECTION
SYNCHED Notification 72 TO COLLECTION
Consumer 2 : CONSUMES Notification 72
Consumer 1 : CONSUMES Notification 73
SYNCHED Notification 73 TO COLLECTION
SYNCHED Notification 74 TO COLLECTION
SYNCHED Notification 75 TO COLLECTION
SYNCHED Notification 76 TO COLLECTION
SYNCHED Notification 77 TO COLLECTION
SYNCHED Notification 78 TO COLLECTION
SYNCHED Notification 79 TO COLLECTION
SYNCHED Notification 80 TO COLLECTION
SYNCHED Notification 81 TO COLLECTION
SYNCHED Notification 82 TO COLLECTION
SYNCHED Notification 83 TO COLLECTION
SYNCHED Notification 84 TO COLLECTION
SYNCHED Notification 85 TO COLLECTION
SYNCHED Notification 86 TO COLLECTION
SYNCHED Notification 87 TO COLLECTION
SYNCHED Notification 88 TO COLLECTION
SYNCHED Notification 89 TO COLLECTION
SYNCHED Notification 90 TO COLLECTION
SYNCHED Notification 91 TO COLLECTION
SYNCHED Notification 92 TO COLLECTION
SYNCHED Notification 93 TO COLLECTION
SYNCHED Notification 94 TO COLLECTION
SYNCHED Notification 95 TO COLLECTION
SYNCHED Notification 96 TO COLLECTION
SYNCHED Notification 97 TO COLLECTION
SYNCHED Notification 98 TO COLLECTION
SYNCHED Notification 99 TO COLLECTION
Consumer 1 : CONSUMES Notification 74
Consumer 2 : CONSUMES Notification 75
Consumer 1 : CONSUMES Notification 76
Consumer 2 : CONSUMES Notification 77
Consumer 2 : CONSUMES Notification 79
Consumer 1 : CONSUMES Notification 78
Consumer 2 : CONSUMES Notification 80
Consumer 1 : CONSUMES Notification 81
Consumer 2 : CONSUMES Notification 82
Consumer 1 : CONSUMES Notification 83
Consumer 2 : CONSUMES Notification 84
Consumer 1 : CONSUMES Notification 85
Consumer 2 : CONSUMES Notification 86
Consumer 1 : CONSUMES Notification 87
Consumer 2 : CONSUMES Notification 88
Consumer 1 : CONSUMES Notification 89
Consumer 2 : CONSUMES Notification 90
Consumer 1 : CONSUMES Notification 91
Consumer 2 : CONSUMES Notification 92
Consumer 1 : CONSUMES Notification 93
Consumer 2 : CONSUMES Notification 94
Consumer 1 : CONSUMES Notification 95
Consumer 2 : CONSUMES Notification 96
Consumer 1 : CONSUMES Notification 97
Consumer 2 : CONSUMES Notification 98
Consumer 1 : CONSUMES Notification 99
PAUSE is now: True
PAUSE is now: False

如果你愿意,你可以使用X个生产者和Y个消费者…

如果不允许在ProducerAdd上阻塞更多的生产者,那么可以使用

Monitor.TryEnter(collection)
不是

lock (collection)

,如果失败则使用

queue.Enqueue()

将在锁释放后立即同步回集合