当ConcurrentStack不为空时处理
本文关键字:处理 ConcurrentStack | 更新日期: 2023-09-27 17:51:04
我有一个ConcurrentStack,我倾销项目。当堆栈不是空的时候,一次一个地处理这些项的好方法是什么?我希望在堆栈未被处理时,以一种不会占用CPU周期的方式来完成此操作。
我目前得到的基本上是这个,它似乎不是一个理想的解决方案。
private void AddToStack(MyObj obj)
{
stack.Push(obj);
HandleStack();
}
private void HandleStack()
{
if (handling)
return;
Task.Run( () =>
{
lock (lockObj)
{
handling = true;
if (stack.Any())
{
//handle whatever is on top of the stack
}
handling = false;
}
}
}
所以bool在那里,所以多个线程不会在等待锁时被备份。但我不希望多个东西同时处理堆栈,因此有了锁。因此,如果两个独立的线程最终同时调用HandleStack并通过bool,那么锁就在那里,因此它们不会同时遍历堆栈。但是一旦第二个线程通过了锁,堆栈就空了,什么也不做。所以这最终给了我想要的行为。
所以我真的只是写一个伪并发包装围绕ConcurrentStack,它似乎有一个不同的方式来实现这一点。想法吗?
ConcurrentStack<T>
是实现IProducerConsumerCollection<T>
的集合之一,因此可以由BlockingCollection<T>
封装。BlockingCollection<T>
有几个方便的成员用于常见的操作,如"在堆栈不空的时候消费"。例如,你可以在循环中调用TryTake
。或者,您可以直接使用GetConsumingEnumerable
:
private BlockingCollection<MyObj> stack;
private Task consumer;
Constructor()
{
stack = new BlockingCollection<MyObj>(new ConcurrentStack<MyObj>());
consumer = Task.Run(() =>
{
foreach (var myObj in stack.GetConsumingEnumerable())
{
...
}
});
}
private void AddToStack(MyObj obj)
{
stack.Add(obj);
}
你可以考虑使用微软TPL数据流来做这类事情。
下面是一个简单的示例,展示了如何创建队列。试试MaxDegreeOfParallelism
和BoundedCapacity
的设置,看看会发生什么。
对于你的例子,我认为你会想要设置MaxDegreeOfParallelism
为1,如果你不希望多个线程同时处理一个数据项。
(注意:您需要使用。net 4.5x并使用Nuget为项目安装TPL Dataflow)
还可以阅读Stephen Cleary关于TPL的博客。
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace SimpleTPL
{
class MyObj
{
public MyObj(string data)
{
Data = data;
}
public readonly string Data;
}
class Program
{
static void Main()
{
var queue = new ActionBlock<MyObj>(data => process(data), actionBlockOptions());
var task = queueData(queue);
Console.WriteLine("Waiting for task to complete.");
task.Wait();
Console.WriteLine("Completed.");
}
private static void process(MyObj data)
{
Console.WriteLine("Processing data " + data.Data);
Thread.Sleep(200); // Simulate load.
}
private static async Task queueData(ActionBlock<MyObj> executor)
{
for (int i = 0; i < 20; ++i)
{
Console.WriteLine("Queuing data " + i);
MyObj data = new MyObj(i.ToString());
await executor.SendAsync(data);
}
Console.WriteLine("Indicating that no more data will be queued.");
executor.Complete(); // Indicate that no more items will be queued.
Console.WriteLine("Waiting for queue to empty.");
await executor.Completion; // Wait for executor queue to empty.
}
private static ExecutionDataflowBlockOptions actionBlockOptions()
{
return new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4,
BoundedCapacity = 8
};
}
}
}
看起来你需要一个典型的生产者消费者。
我建议使用autoresetevent让你的消费者在堆栈为空时等待。调用生产者方法时的调用集。
阅读这个线程
快速和最佳的生产者/消费者队列技术BlockingCollection与并发队列