ConcurrentQueue 一个元素由两个线程占用

本文关键字:两个 线程 元素 一个 ConcurrentQueue | 更新日期: 2023-09-27 18:30:17

我希望两个线程使用一个队列。第一个线程应每 2 秒调用一次,第二个线程应每 3 秒调用一次。两个线程应同时启动。我在访问队列的第一个元素时遇到问题。两个线程都采用索引为 0 的元素。有时它发生在队列的其他元素上,而不仅仅是第一个元素。我在控制台上有这样的输出:

  • 项目 0 处理时间 1 时间:3:27:8
  • 项目 0 处理者 2 时间:3:27:8
  • 项目 2 处理者 1 时间:3:27:10
  • 项目 3 处理者 2 时间: 3:27:11
  • 项目 4 处理者 1 时间: 3:27:12

等等..

这是我使用的代码:

    ConcurrentQueue<int> sharedQueue = new ConcurrentQueue<int>();
    for (int i = 0; i < 10; i++)
    {
        sharedQueue.Enqueue(i);
    }

    int itemCount= 0;

    Task[] tasks = new Task[2];
    for (int i = 0; i < tasks.Length; i++)
    {
        // create the new task
        tasks[i] = new Task(() =>
        {
            while (sharedQueue.Count > 0)
            {
                // define a variable for the dequeue requests
                int queueElement;
                // take an item from the queue
                bool gotElement = sharedQueue.TryDequeue(out queueElement);
                // increment the count of items processed
                if (gotElement)
                {
                    DateTime dt = DateTime.Now;
                    Console.WriteLine("Item " + itemCount + "processed by " 
                        + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
                    Interlocked.Increment(ref itemCount);   
               if (Task.CurrentId == 1) 
                    Thread.Sleep(2000);
                else 
                    Thread.Sleep(3000);                       
                }
            }
        });
        // start the new task
        tasks[i].Start();

    }
    // wait for the tasks to complete
    Task.WaitAll(tasks);
    // report on the number of items processed
    Console.WriteLine("Items processed: {0}", itemCount);
    // wait for input before exiting
    Console.WriteLine("Press enter to finish");
    Console.ReadLine();
}

ConcurrentQueue 一个元素由两个线程占用

替换以下行:

Console.WriteLine("Item " + itemCount + "processed by " ...);

有了这一行:

Console.WriteLine("Item " + queueElement + "processed by " ...);

您看到的问题可能是由于任务几乎同时执行Console.WriteLine,并且两者都看到了相同的itemCount值,因为它们以尚未发生Interlocked.Increment调用的方式交错。无论如何,打印出queueElement可能更有意义,因为它更有意义。

请参阅Brian Gideon关于您的itemCount问题的出色回答。

您可以考虑重写代码以使用 BlockingCollection 而不是 ConcurrentQueue<T> 。使用起来要容易得多。 BlockingCollection是并发集合的包装器。在其默认配置中,后备存储是ConcurrentQueue 。因此,您可以获得相同的并发队列功能,但界面要好得多。

BlockingCollection<int> sharedQueue = new BlockingCollection<int>();
for (int i = 0; i < 10; i++)
{
    sharedQueue.Add(i);
}
// CompleteAdding marks the queue as "complete for adding,"
// meaning that no more items will be added.
sharedQueue.CompleteAdding();
int itemCount= 0;
Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
    // create the new task
    tasks[i] = new Task(() =>
    {
        foreach (var queueElement in sharedQueue.GetConsumingEnumerable())
        {
            DateTime dt = DateTime.Now;
            Console.WriteLine("Item " + itemCount + "processed by " 
                + Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
            Interlocked.Increment(ref itemCount);   
            if (Task.CurrentId == 1) 
                Thread.Sleep(2000);
            else 
                Thread.Sleep(3000);                       
        }
    });
    // start the new task
    tasks[i].Start();
}

GetConsumingEnumerable 返回一个枚举器,该枚举器将从队列中获取下一项,直到队列为空。它还可以很好地处理取消,这对于ConcurrentQueue来说有点困难。

一般来说,任何时候你想使用 ConcurrentQueue<T> ,你可能想要BlockingCollection<T> .