如何将任务排入动态队列

本文关键字:动态 队列 任务 | 更新日期: 2023-09-27 18:30:12

我需要为不同的任务创建一个队列。目前,这是通过由http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse:

using System;
using System.Threading;
using System.Collections.Generic;
public class PCQueue
{
  readonly object _locker = new object();
  Thread[] _workers;
  Queue<Action> _itemQ = new Queue<Action>();
  public PCQueue (int workerCount)
  {
    _workers = new Thread [workerCount];
    // Create and start a separate thread for each worker
    for (int i = 0; i < workerCount; i++)
      (_workers [i] = new Thread (Consume)).Start();
  }
  public void Shutdown (bool waitForWorkers)
  {
    // Enqueue one null item per worker to make each exit.
    foreach (Thread worker in _workers)
      EnqueueItem (null);
    // Wait for workers to finish
    if (waitForWorkers)
      foreach (Thread worker in _workers)
        worker.Join();
  }
  public void EnqueueItem (Action item)
  {
    lock (_locker)
    {
      _itemQ.Enqueue (item);           // We must pulse because we're
      Monitor.Pulse (_locker);         // changing a blocking condition.
    }
  }
  void Consume()
  {
    while (true)                        // Keep consuming until
    {                                   // told otherwise.
      Action item;
      lock (_locker)
      {
        while (_itemQ.Count == 0) Monitor.Wait (_locker);
        item = _itemQ.Dequeue();
      }
      if (item == null) return;         // This signals our exit.
      item();                           // Execute item.
    }
  }
}

主要方法:

static void Main()
{
  PCQueue q = new PCQueue (2);
  Console.WriteLine ("Enqueuing 10 items...");
  for (int i = 0; i < 10; i++)
  {
    int itemNumber = i;      // To avoid the captured variable trap
    q.EnqueueItem (() =>
    {
      Thread.Sleep (1000);          // Simulate time-consuming work
      Console.Write (" Task" + itemNumber);
    });
  }
  q.Shutdown (true);
  Console.WriteLine();
  Console.WriteLine ("Workers complete!");
}

然而,在浏览stackoverflow时,我偶然发现了这个修改后的版本:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace Project
{
    /// <summary>
    /// Description of Multithread.
    /// </summary>
     public class Multithread<T> : IDisposable where T : class
    {
        object locker = new object();
        Thread[] workers;
        Queue<T> taskQ = new Queue<T>();
        public void TaskQueue(int workerCount)
        {
            workers = new Thread[workerCount];
            // Create and start a separate thread for each worker
            for (int i = 0; i < workerCount; i++)
                (workers[i] = new Thread(Consume)).Start();
        }
        public void Dispose()
        {
            // Enqueue one null task per worker to make each exit.
            foreach (Thread worker in workers) EnqueueTask(null);
            foreach (Thread worker in workers) worker.Join();
        }
        public void EnqueueTask(T task)
        {
            lock (locker)
            {
                taskQ.Enqueue(task);
                Monitor.PulseAll(locker);
            }
        }
        void Consume()
        {
            while (true)
            {
                T task;
                lock (locker)
                {
                    while (taskQ.Count == 0) Monitor.Wait(locker);
                    task = taskQ.Dequeue();
                }
                if (task == null) return;         // This signals our exit
                System.Diagnostics.Debug.WriteLine(task);
                Thread.Sleep(1000);              // Simulate time-consuming task
            }
        }
    }
}

这似乎提供了更好的可用性。但是,我找不到如何正确地将任务添加到此队列中。

classname testclass = new classname();
Multithread<classname> testthread = new Multithread<classname>();

我以为这会是一个类似的东西:

testthread.EnqueueTask(testclass.functioname());

然而,它似乎不起作用。我陷入了这个问题,在其他地方找不到解决这个问题的任何帮助。

如何将任务排入动态队列

我看不出Multithread是如何提供更好的可用性的,因为它展示了如何以通用的方式实现生产者/消费者模式,而不是真正决定如何实际消费物品。另一方面,PCQueue的操作允许它实际消费物品。

要修改Multithread以允许它做一些工作,可以删除泛型类型参数T,并用Action替换所有出现的T。在Consume方法中,您需要替换代码

System.Diagnostics.Debug.WriteLine(task);
Thread.Sleep(1000);              // Simulate time-consuming task

通过

task();

要将任务排入队列,您应该通过提供Action来完全按照使用PCQueue时所做的操作进行。您可以为此使用lambda表达式。

使用BlockingCollection可以大大简化这一过程。该数据结构被实现为一个队列,该队列已经封装了生产者-消费者逻辑。

public class PCQueue
{
  private Thread[] workers;
  private BlockingCollection<Action> queue = new BlockingCollection<Action>();
  private CancellationTokenSource cts = new CancellationTokenSource();
  public PCQueue(int workerCount)
  {
    workers = new Thread[workerCount];
    for (int i = 0; i < workerCount; i++)
    {
      workers[i] = new Thread(Run);
      workers[i].Start();
    }
  }
  public void Shutdown(bool waitForWorkers)
  {
    cts.Cancel();
    if (waitForWorkers)
    {
      foreach (Thread thread in workers)
      {
        thread.Join();
      }
    }
  }
  public void EnqueueItem(Action action)
  {
    queue.Add(action);
  }
  private void Consumer()
  {
    while (true)
    {
      Action action = queue.Take(cts.Token);
      try
      {
        if (action != null) action();
      }
      catch (Exception caught)
      {
        // Notify somebody that something bad happened.
      }
    }
  }
}