多线程阻塞集合 获取消耗枚举() 生产者-消费者的替代方案
本文关键字:消费者 生产者 方案 集合 获取 取消 枚举 多线程 | 更新日期: 2023-09-27 17:56:11
我遇到多个生产者和多个消费者的情况。生产者将作业输入队列。我选择了BlockingCollection,它工作得很好,因为我需要消费者等待找到工作。但是,如果我使用 GetConsumingEnumerable() 功能,集合中项目的顺序会发生变化......这不是我需要的。
它甚至在MSDN http://msdn.microsoft.com/en-us/library/dd287186.aspx 中说它不保留项目的顺序。
有谁知道这种情况的替代方案?
我看到 Take 方法可用,但它是否也为使用者线程提供了"等待"条件?
它说 http://msdn.microsoft.com/en-us/library/dd287085.aspx
"呼叫 Take 可能会阻止,直到可以删除项目。"使用TryTake更好吗?我真的需要线程等待并继续检查工作。
Take 阻止线程,直到有可用的内容。
顾名思义,TryTake 会尝试这样做,但如果失败或成功,则返回布尔值。允许使用它进行更灵活的操作:
while(goingOn){
if( q.TryTake(out var){
Process(var)
}
else{
DoSomething_Usefull_OrNotUseFull_OrEvenSleep();
}
}
而不是
while(goingOn){
if( var x = q.Take(){
//w'll wait till this ever will happen and then we:
Process(var)
}
}
我的投票是给TryTake :-)
例:
public class ProducerConsumer<T> {
public struct Message {
public T Data;
}
private readonly ThreadRunner _producer;
private readonly ThreadRunner _consumer;
public ProducerConsumer(Func<T> produce, Action<T> consume) {
var q = new BlockingCollection<Message>();
_producer = new Producer(produce,q);
_consumer = new Consumer(consume,q);
}
public void Start() {
_producer.Run();
_consumer.Run();
}
public void Stop() {
_producer.Stop();
_consumer.Stop();
}
private class Producer : ThreadRunner {
public Producer(Func<T> produce, BlockingCollection<Message> q) : base(q) {
_produce = produce;
}
private readonly Func<T> _produce;
public override void Worker() {
try {
while (KeepRunning) {
var item = _produce();
MessageQ.TryAdd(new Message{Data = item});
}
}
catch (ThreadInterruptedException) {
WasInterrupted = true;
}
}
}
public abstract class ThreadRunner {
protected readonly BlockingCollection<Message> MessageQ;
protected ThreadRunner(BlockingCollection<Message> q) {
MessageQ = q;
}
protected Thread Runner;
protected bool KeepRunning = true;
public bool WasInterrupted;
public abstract void Worker();
public void Run() {
Runner = new Thread(Worker);
Runner.Start();
}
public void Stop() {
KeepRunning = false;
Runner.Interrupt();
Runner.Join();
}
}
class Consumer : ThreadRunner {
private readonly Action<T> _consume;
public Consumer(Action<T> consume,BlockingCollection<Message> q) : base(q) {
_consume = consume;
}
public override void Worker() {
try {
while (KeepRunning) {
Message message;
if (MessageQ.TryTake(out message, TimeSpan.FromMilliseconds(100))) {
_consume(message.Data);
}
else {
//There's nothing in the Q so I have some spare time...
//Excellent moment to update my statisics or update some history to logfiles
//for now we sleep:
Thread.Sleep(TimeSpan.FromMilliseconds(100));
}
}
}
catch (ThreadInterruptedException) {
WasInterrupted = true;
}
}
}
}
}
用法:
[Fact]
public void ConsumerShouldConsume() {
var produced = 0;
var consumed = 0;
Func<int> produce = () => {
Thread.Sleep(TimeSpan.FromMilliseconds(100));
produced++;
return new Random(2).Next(1000);
};
Action<int> consume = c => { consumed++; };
var t = new ProducerConsumer<int>(produce, consume);
t.Start();
Thread.Sleep(TimeSpan.FromSeconds(5));
t.Stop();
Assert.InRange(produced,40,60);
Assert.InRange(consumed, 40, 60);
}