更好地理解任务和调度
本文关键字:调度 任务 更好 | 更新日期: 2023-09-27 17:55:06
我是一个新手在任务,并行和调度等。这里我创建了两个blockingcollection。一个用来收集输入,另一个用来收集输出。我添加了100项。所以我认为它们的总数应该是100或0。或者它们的和等于100。
然而我发现它们都是0。请用简单的语言帮助我理解这些概念。
static void Main(string[] args)
{
new Program().run();
}
void run()
{
int threadCount = 4;
Task[] workers = new Task[threadCount];
Task.Factory.StartNew(consumer);
// We can do other work in parallel
for (int i = 0; i < threadCount; ++i)
{
int workerId = i;
Task task = new Task(() => worker(workerId));
workers[i] = task;
task.Start();
}
for (int i = 0; i < 100; ++i)
{
Console.WriteLine("Queueing work item {0}", i);
inputQueue.Add(i);
Thread.Sleep(50);
}
Console.WriteLine("Stopping adding.");
inputQueue.CompleteAdding();
Console.WriteLine("The count in InputQueue= {0}", inputQueue.Count);// 0
Task.WaitAll(workers);
outputQueue.CompleteAdding();
Console.WriteLine("The count in OutputQueue= {0}", outputQueue.Count); // 0
Console.WriteLine("Done.");
Console.ReadLine();
}
void worker(int workerId)
{
Console.WriteLine("Worker {0} is starting.", workerId);
foreach (var workItem in inputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
Thread.Sleep(100); // Simulate work.
outputQueue.Add(workItem); // Output completed item.
}
Console.WriteLine("Worker {0} is stopping.", workerId);
}
void consumer()
{
Console.WriteLine("Consumer is starting.");
foreach (var workItem in outputQueue.GetConsumingEnumerable())
{
Console.WriteLine("Consumer is using item {0}", workItem);
Thread.Sleep(25);
}
Console.WriteLine("Consumer is finished.");
}
BlockingCollection<int> inputQueue = new BlockingCollection<int>();
BlockingCollection<int> outputQueue = new BlockingCollection<int>();
}
你有一个3阶段的管道
run——(inputQueue)——worker——(outputQueue)——consumer
所有三个阶段同时运行,因此一旦有一个项目在inputQueue
中,它可以立即取出并移动到outputQueue
,一旦有一个项目放入outputQueue
,它可以立即取出并处理。
要获得100的计数你需要执行
<>之前(100 -"run"已经放入"inputQueue"的条目数)+"inputQueue。数" +0到"threadCount"之间的某个数字,表示已从"inputQueue"中取出但尚未放入"outputQueue"+中的项目"outputQueue。数" +"consumer"从"outputQueue"中取出的条目数之前因为线程和等待的数量对于负载是完美平衡的,所以上面的公式很可能是0 + 0 + 4 + 0 + 96
,最后4个元素等待在worker
中处理,其他所有元素已经由consumer
处理。如果你做了一半的工作线程,让消费者线程的处理时间增加4倍,你会得到像0 + 48 + 2 + 25 + 25
这样的数字,48个等待被工作线程处理,2个正在被工作线程处理,25个等待被消费者线程处理,25个已经被消费者线程处理。