如何将任务排入动态队列
本文关键字:动态 队列 任务 | 更新日期: 2023-09-27 18:30:12
我需要为不同的任务创建一个队列。目前,这是通过由http://www.albahari.com/threading/part4.aspx#_Wait_and_Pulse:
using System;
using System.Threading;
using System.Collections.Generic;
public class PCQueue
{
readonly object _locker = new object();
Thread[] _workers;
Queue<Action> _itemQ = new Queue<Action>();
public PCQueue (int workerCount)
{
_workers = new Thread [workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
(_workers [i] = new Thread (Consume)).Start();
}
public void Shutdown (bool waitForWorkers)
{
// Enqueue one null item per worker to make each exit.
foreach (Thread worker in _workers)
EnqueueItem (null);
// Wait for workers to finish
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem (Action item)
{
lock (_locker)
{
_itemQ.Enqueue (item); // We must pulse because we're
Monitor.Pulse (_locker); // changing a blocking condition.
}
}
void Consume()
{
while (true) // Keep consuming until
{ // told otherwise.
Action item;
lock (_locker)
{
while (_itemQ.Count == 0) Monitor.Wait (_locker);
item = _itemQ.Dequeue();
}
if (item == null) return; // This signals our exit.
item(); // Execute item.
}
}
}
主要方法:
static void Main()
{
PCQueue q = new PCQueue (2);
Console.WriteLine ("Enqueuing 10 items...");
for (int i = 0; i < 10; i++)
{
int itemNumber = i; // To avoid the captured variable trap
q.EnqueueItem (() =>
{
Thread.Sleep (1000); // Simulate time-consuming work
Console.Write (" Task" + itemNumber);
});
}
q.Shutdown (true);
Console.WriteLine();
Console.WriteLine ("Workers complete!");
}
然而,在浏览stackoverflow时,我偶然发现了这个修改后的版本:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace Project
{
/// <summary>
/// Description of Multithread.
/// </summary>
public class Multithread<T> : IDisposable where T : class
{
object locker = new object();
Thread[] workers;
Queue<T> taskQ = new Queue<T>();
public void TaskQueue(int workerCount)
{
workers = new Thread[workerCount];
// Create and start a separate thread for each worker
for (int i = 0; i < workerCount; i++)
(workers[i] = new Thread(Consume)).Start();
}
public void Dispose()
{
// Enqueue one null task per worker to make each exit.
foreach (Thread worker in workers) EnqueueTask(null);
foreach (Thread worker in workers) worker.Join();
}
public void EnqueueTask(T task)
{
lock (locker)
{
taskQ.Enqueue(task);
Monitor.PulseAll(locker);
}
}
void Consume()
{
while (true)
{
T task;
lock (locker)
{
while (taskQ.Count == 0) Monitor.Wait(locker);
task = taskQ.Dequeue();
}
if (task == null) return; // This signals our exit
System.Diagnostics.Debug.WriteLine(task);
Thread.Sleep(1000); // Simulate time-consuming task
}
}
}
}
这似乎提供了更好的可用性。但是,我找不到如何正确地将任务添加到此队列中。
classname testclass = new classname();
Multithread<classname> testthread = new Multithread<classname>();
我以为这会是一个类似的东西:
testthread.EnqueueTask(testclass.functioname());
然而,它似乎不起作用。我陷入了这个问题,在其他地方找不到解决这个问题的任何帮助。
我看不出Multithread
是如何提供更好的可用性的,因为它展示了如何以通用的方式实现生产者/消费者模式,而不是真正决定如何实际消费物品。另一方面,PCQueue
的操作允许它实际消费物品。
要修改Multithread
以允许它做一些工作,可以删除泛型类型参数T
,并用Action
替换所有出现的T
。在Consume
方法中,您需要替换代码
System.Diagnostics.Debug.WriteLine(task);
Thread.Sleep(1000); // Simulate time-consuming task
通过
task();
要将任务排入队列,您应该通过提供Action
来完全按照使用PCQueue
时所做的操作进行。您可以为此使用lambda表达式。
使用BlockingCollection
可以大大简化这一过程。该数据结构被实现为一个队列,该队列已经封装了生产者-消费者逻辑。
public class PCQueue
{
private Thread[] workers;
private BlockingCollection<Action> queue = new BlockingCollection<Action>();
private CancellationTokenSource cts = new CancellationTokenSource();
public PCQueue(int workerCount)
{
workers = new Thread[workerCount];
for (int i = 0; i < workerCount; i++)
{
workers[i] = new Thread(Run);
workers[i].Start();
}
}
public void Shutdown(bool waitForWorkers)
{
cts.Cancel();
if (waitForWorkers)
{
foreach (Thread thread in workers)
{
thread.Join();
}
}
}
public void EnqueueItem(Action action)
{
queue.Add(action);
}
private void Consumer()
{
while (true)
{
Action action = queue.Take(cts.Token);
try
{
if (action != null) action();
}
catch (Exception caught)
{
// Notify somebody that something bad happened.
}
}
}
}