更好地理解任务和调度

本文关键字:调度 任务 更好 | 更新日期: 2023-09-27 17:55:06

我是一个新手在任务,并行和调度等。这里我创建了两个blockingcollection。一个用来收集输入,另一个用来收集输出。我添加了100项。所以我认为它们的总数应该是100或0。或者它们的和等于100。

然而我发现它们都是0。请用简单的语言帮助我理解这些概念。

    static void Main(string[] args)
    {
        new Program().run();
    }
    void run()
    {
        int threadCount = 4;
        Task[] workers = new Task[threadCount];
        Task.Factory.StartNew(consumer);
        // We can do other work in parallel
        for (int i = 0; i < threadCount; ++i)
        {
            int workerId = i;
            Task task = new Task(() => worker(workerId));
            workers[i] = task;
            task.Start();
        }
        for (int i = 0; i < 100; ++i)
        {
            Console.WriteLine("Queueing work item {0}", i);
            inputQueue.Add(i);
            Thread.Sleep(50);
        }
        Console.WriteLine("Stopping adding.");
        inputQueue.CompleteAdding();
        Console.WriteLine("The count in InputQueue= {0}", inputQueue.Count);// 0
        Task.WaitAll(workers);
        outputQueue.CompleteAdding();
        Console.WriteLine("The count in OutputQueue= {0}", outputQueue.Count); // 0
        Console.WriteLine("Done.");
        Console.ReadLine();
    }
    void worker(int workerId)
    {
        Console.WriteLine("Worker {0} is starting.", workerId);
        foreach (var workItem in inputQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
            Thread.Sleep(100);          // Simulate work.
            outputQueue.Add(workItem);  // Output completed item.
        }
        Console.WriteLine("Worker {0} is stopping.", workerId);
    }
    void consumer()
    {
        Console.WriteLine("Consumer is starting.");
        foreach (var workItem in outputQueue.GetConsumingEnumerable())
        {
            Console.WriteLine("Consumer is using item {0}", workItem);
            Thread.Sleep(25);
        }
        Console.WriteLine("Consumer is finished.");
    }
    BlockingCollection<int> inputQueue = new BlockingCollection<int>();
    BlockingCollection<int> outputQueue = new BlockingCollection<int>();
}

更好地理解任务和调度

你有一个3阶段的管道

run——(inputQueue)——worker——(outputQueue)——consumer

所有三个阶段同时运行,因此一旦有一个项目在inputQueue中,它可以立即取出并移动到outputQueue,一旦有一个项目放入outputQueue,它可以立即取出并处理。

要获得100的计数你需要执行

<>之前(100 -"run"已经放入"inputQueue"的条目数)+"inputQueue。数" +0到"threadCount"之间的某个数字,表示已从"inputQueue"中取出但尚未放入"outputQueue"+中的项目"outputQueue。数" +"consumer"从"outputQueue"中取出的条目数之前

因为线程和等待的数量对于负载是完美平衡的,所以上面的公式很可能是0 + 0 + 4 + 0 + 96,最后4个元素等待在worker中处理,其他所有元素已经由consumer处理。如果你做了一半的工作线程,让消费者线程的处理时间增加4倍,你会得到像0 + 48 + 2 + 25 + 25这样的数字,48个等待被工作线程处理,2个正在被工作线程处理,25个等待被消费者线程处理,25个已经被消费者线程处理。