具有资源的生产者-消费者

本文关键字:消费者 生产者 资源 | 更新日期: 2023-09-27 17:56:31

我正在尝试使用一组资源实现生产者/消费者模式,因此每个线程都有一个与之关联的资源。例如,我可能有一个任务队列,其中每个任务都需要一个StreamWriter来编写其结果。每个任务还必须向其传递参数。

我从Joseph Albahari的实现开始(我的修改版本见下文)。

我将Action队列替换为Action<T>队列,其中T是资源,并将与线程关联的资源传递给Action。但是,这给我留下了如何将参数传递给Action的问题。显然,Action必须替换为委托,但这留下了在任务排队时如何传递参数的问题(从ProducerConsumerQueue类外部)。关于如何做到这一点的任何想法?

class ProducerConsumerQueue<T>
    {
        readonly object _locker = new object();            
        Thread[] _workers;
        Queue<Action<T>> _itemQ = new Queue<Action<T>>();
        public ProducerConsumerQueue(T[] resources)
        {
            _workers = new Thread[resources.Length];
            // Create and start a separate thread for each worker
            for (int i = 0; i < resources.Length; i++)
            {
                Thread thread = new Thread(() => Consume(resources[i]));
                thread.SetApartmentState(ApartmentState.STA);
                _workers[i] = thread;
                _workers[i].Start();
            }
        }        
        public void Shutdown(bool waitForWorkers)
        {
            // Enqueue one null item per worker to make each exit.
            foreach (Thread worker in _workers)
                EnqueueItem(null);
            // Wait for workers to finish
            if (waitForWorkers)
                foreach (Thread worker in _workers)
                    worker.Join();
        }
        public void EnqueueItem(Action<T> item)
        {
            lock (_locker)
            {
                _itemQ.Enqueue(item);           // We must pulse because we're
                Monitor.Pulse(_locker);         // changing a blocking condition.
            }
        }
        void Consume(T parameter)
        {
            while (true)                        // Keep consuming until
            {                                   // told otherwise.
                Action<T> item;
                lock (_locker)
                {
                    while (_itemQ.Count == 0) Monitor.Wait(_locker);
                    item = _itemQ.Dequeue();
                }
                if (item == null) return;         // This signals our exit.
                item(parameter);                           // Execute item.
            }
        }
    }

具有资源的生产者-消费者

ProducerConsumerQueue<T> 中的T类型不必是资源,它可以是包含资源的复合类型。 跟。NET4 最简单的方法是使用 Tuple<StreamWriter, YourParameterType> . 生产者/使用者队列只是吃掉并吐出T因此在您的Action<T>中,您只需使用属性来获取资源和参数。 如果使用Tuple则将使用Item1来获取资源,并使用Item2来获取参数。

如果不使用 .NET4,过程类似,但您只需创建自己的类:

public class WorkItem<T>
{
    private StreamWriter resource;
    private T parameter;
    public WorkItem(StreamWriter resource, T parameter)
    {
        this.resource = resource;
        this.parameter = parameter;
    }
    public StreamWriter Resource { get { return resource; } }
    public T Parameter { get { return parameter; } }
}

事实上,使其通用化可能会针对您的情况进行过度设计。 您可以只将 T 定义为您想要的类型。

此外,作为参考,中还包含了一些执行多线程处理的新方法。可能适用于您的用例(例如并发队列和并行任务库)的 NET4。 它们还可以与传统方法(如信号量)结合使用。

编辑:

继续使用此方法,下面是一个小的示例类,用于演示如何使用:

  • 用于控制对有限资源的访问的信号量
  • 用于在线程之间安全管理该资源的并发队列
  • 使用任务并行库进行任务管理

这是Processor类:

public class Processor
{
    private const int count = 3;
    private ConcurrentQueue<StreamWriter> queue = new ConcurrentQueue<StreamWriter>();
    private Semaphore semaphore = new Semaphore(count, count);
    public Processor()
    {
        // Populate the resource queue.
        for (int i = 0; i < count; i++) queue.Enqueue(new StreamWriter("sample" + i));
    }
    public void Process(int parameter)
    {
        // Wait for one of our resources to become free.
        semaphore.WaitOne();
        StreamWriter resource;
        queue.TryDequeue(out resource);
        // Dispatch the work to a task.
        Task.Factory.StartNew(() => Process(resource, parameter));
    }
    private Random random = new Random();
    private void Process(StreamWriter resource, int parameter)
    {
        // Do work in background with resource.
        Thread.Sleep(random.Next(10) * 100);
        resource.WriteLine("Parameter = {0}", parameter);
        queue.Enqueue(resource);
        semaphore.Release();
    }
}

现在我们可以像这样使用类:

var processor = new Processor();
for (int i = 0; i < 10; i++)
    processor.Process(i);

并且不会同时安排超过三个任务,每个任务都有自己的回收StreamWriter资源。