当ConcurrentStack不为空时处理

本文关键字:处理 ConcurrentStack | 更新日期: 2023-09-27 17:51:04

我有一个ConcurrentStack,我倾销项目。当堆栈不是空的时候,一次一个地处理这些项的好方法是什么?我希望在堆栈未被处理时,以一种不会占用CPU周期的方式来完成此操作。

我目前得到的基本上是这个,它似乎不是一个理想的解决方案。

private void AddToStack(MyObj obj)
{
    stack.Push(obj);
    HandleStack();
}
private void HandleStack()
{
    if (handling)
        return;
    Task.Run( () =>
    {
        lock (lockObj)
        {
            handling = true;
            if (stack.Any())
            {
                //handle whatever is on top of the stack
            }
            handling = false;
        }
    }
}

所以bool在那里,所以多个线程不会在等待锁时被备份。但我不希望多个东西同时处理堆栈,因此有了锁。因此,如果两个独立的线程最终同时调用HandleStack并通过bool,那么锁就在那里,因此它们不会同时遍历堆栈。但是一旦第二个线程通过了锁,堆栈就空了,什么也不做。所以这最终给了我想要的行为。

所以我真的只是写一个伪并发包装围绕ConcurrentStack,它似乎有一个不同的方式来实现这一点。想法吗?

当ConcurrentStack不为空时处理

ConcurrentStack<T>是实现IProducerConsumerCollection<T>的集合之一,因此可以由BlockingCollection<T>封装。BlockingCollection<T>有几个方便的成员用于常见的操作,如"在堆栈不空的时候消费"。例如,你可以在循环中调用TryTake。或者,您可以直接使用GetConsumingEnumerable:

private BlockingCollection<MyObj> stack;
private Task consumer;
Constructor()
{
  stack = new BlockingCollection<MyObj>(new ConcurrentStack<MyObj>());
  consumer = Task.Run(() =>
  {
    foreach (var myObj in stack.GetConsumingEnumerable())
    {
      ...
    }
  });
}
private void AddToStack(MyObj obj)
{
  stack.Add(obj);
}

你可以考虑使用微软TPL数据流来做这类事情。

下面是一个简单的示例,展示了如何创建队列。试试MaxDegreeOfParallelismBoundedCapacity的设置,看看会发生什么。

对于你的例子,我认为你会想要设置MaxDegreeOfParallelism为1,如果你不希望多个线程同时处理一个数据项。

(注意:您需要使用。net 4.5x并使用Nuget为项目安装TPL Dataflow)

还可以阅读Stephen Cleary关于TPL的博客。

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace SimpleTPL
{
    class MyObj
    {
        public MyObj(string data)
        {
            Data = data;
        }
        public readonly string Data;
    }
    class Program
    {
        static void Main()
        {
            var queue = new ActionBlock<MyObj>(data => process(data), actionBlockOptions());
            var task = queueData(queue);
            Console.WriteLine("Waiting for task to complete.");
            task.Wait();
            Console.WriteLine("Completed.");
        }
        private static void process(MyObj data)
        {
            Console.WriteLine("Processing data " + data.Data);
            Thread.Sleep(200); // Simulate load.
        }
        private static async Task queueData(ActionBlock<MyObj> executor)
        {
            for (int i = 0; i < 20; ++i)
            {
                Console.WriteLine("Queuing data " + i);
                MyObj data = new MyObj(i.ToString());
                await executor.SendAsync(data);
            }
            Console.WriteLine("Indicating that no more data will be queued.");
            executor.Complete(); // Indicate that no more items will be queued.
            Console.WriteLine("Waiting for queue to empty.");
            await executor.Completion; // Wait for executor queue to empty.
        }
        private static ExecutionDataflowBlockOptions actionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity        = 8
            };
        }
    }
}

看起来你需要一个典型的生产者消费者。

我建议使用autoresetevent

让你的消费者在堆栈为空时等待。调用生产者方法时的调用集。

阅读这个线程

快速和最佳的生产者/消费者队列技术BlockingCollection与并发队列