多线程阻塞集合 获取消耗枚举() 生产者-消费者的替代方案

本文关键字:消费者 生产者 方案 集合 获取 取消 枚举 多线程 | 更新日期: 2023-09-27 17:56:11

我遇到多个生产者和多个消费者的情况。生产者将作业输入队列。我选择了BlockingCollection,它工作得很好,因为我需要消费者等待找到工作。但是,如果我使用 GetConsumingEnumerable() 功能,集合中项目的顺序会发生变化......这不是我需要的。

它甚至在MSDN http://msdn.microsoft.com/en-us/library/dd287186.aspx 中说它不保留项目的顺序。

有谁知道这种情况的替代方案?

我看到 Take 方法可用,但它是否也为使用者线程提供了"等待"条件?

它说 http://msdn.microsoft.com/en-us/library/dd287085.aspx

"呼叫 Take 可能会阻止,直到可以删除项目。"使用TryTake更好吗?我真的需要线程等待并继续检查工作。

多线程阻塞集合 获取消耗枚举() 生产者-消费者的替代方案

Take 阻止线程,直到有可用的内容。

顾名思义,TryTake 会尝试这样做,但如果失败或成功,则返回布尔值。允许使用它进行更灵活的操作:

while(goingOn){
   if( q.TryTake(out var){
      Process(var)
   }
   else{
      DoSomething_Usefull_OrNotUseFull_OrEvenSleep();
   }
}

而不是

while(goingOn){
   if( var x = q.Take(){
      //w'll wait till this ever will happen and then we:
      Process(var)
   }
}

我的投票是给TryTake :-)

例:

    public class ProducerConsumer<T> {
        public struct Message {
            public T Data;
        }
        private readonly ThreadRunner _producer;
        private readonly ThreadRunner _consumer;
        public ProducerConsumer(Func<T> produce, Action<T> consume) {
            var q = new BlockingCollection<Message>();
            _producer = new Producer(produce,q);
            _consumer = new Consumer(consume,q);
        }
        public void Start() {
            _producer.Run();
            _consumer.Run();
        }
        public void Stop() {
            _producer.Stop();
            _consumer.Stop();
        }
        private class Producer : ThreadRunner {
            public Producer(Func<T> produce, BlockingCollection<Message> q) : base(q) {
                _produce = produce;
            }
            private readonly Func<T> _produce;
            public override void Worker() {
                try {
                    while (KeepRunning) {
                        var item = _produce();
                        MessageQ.TryAdd(new Message{Data = item});
                    }
                }
                catch (ThreadInterruptedException) {
                    WasInterrupted = true;
                }
            }
        }
        public abstract class ThreadRunner {
            protected readonly BlockingCollection<Message> MessageQ;
            protected ThreadRunner(BlockingCollection<Message> q) {
                MessageQ = q;
            }
            protected Thread Runner;
            protected bool KeepRunning = true;
            public bool WasInterrupted;
            public abstract void Worker();
            public void Run() {
                Runner = new Thread(Worker);
                Runner.Start();
            }
            public void Stop() {
                KeepRunning = false;
                Runner.Interrupt();
                Runner.Join();
            }
        }
        class Consumer : ThreadRunner {
            private readonly Action<T> _consume;
            public Consumer(Action<T> consume,BlockingCollection<Message> q) : base(q) {
                _consume = consume;
            }
            public override void Worker() {
                try {
                    while (KeepRunning) {
                        Message message;
                        if (MessageQ.TryTake(out message, TimeSpan.FromMilliseconds(100))) {
                            _consume(message.Data);
                        }
                        else {
                            //There's nothing in the Q so I have some spare time...
                            //Excellent moment to update my statisics or update some history to logfiles
                            //for now we sleep:
                            Thread.Sleep(TimeSpan.FromMilliseconds(100));
                        }
                    }
                }
                catch (ThreadInterruptedException) {
                    WasInterrupted = true;
                }
            }
        }
    }
}

用法:

[Fact]
public void ConsumerShouldConsume() {
    var produced = 0;
    var consumed = 0;
    Func<int> produce = () => {
        Thread.Sleep(TimeSpan.FromMilliseconds(100));
        produced++;
        return new Random(2).Next(1000);
    };
    Action<int> consume = c => { consumed++; };
    var t = new ProducerConsumer<int>(produce, consume);
    t.Start();
    Thread.Sleep(TimeSpan.FromSeconds(5));
    t.Stop();
    Assert.InRange(produced,40,60);
    Assert.InRange(consumed, 40, 60);
}