消费者超时并处于特定条件下
本文关键字:于特定 条件下 超时 消费者 | 更新日期: 2023-09-27 18:26:22
BlockingCollection<T>
类提供了一种实现生产者/消费者模式的简单方法,但遗憾的是,它没有我需要的功能。它允许我在等待使用元素时设置超时,但不提供限制从集合中删除哪个项的方法。
如何实现一个类似于BlockingCollection<T>
的类,但它允许我指定应在哪些条件下获取项?
例如:我只需要在Amount
等于特定值的情况下取Bar
项目:
public class Bar
{
public Int32 Amount { get; set; }
}
public class Program
{
public static void Main()
{
ToDoCollection<Bar> ToDoCollection = new ToDoCollection<Bar>();
int timeout = 10000;
// this doesn't work, that's why I'm asking for your help
Bar value = ToDoCollection.TryTake().Where(p => p.Amount != 5);
// Here, I need to wait for 10s trying to take item from blockingCollection
// item, that will follow specific condition: Bar.Amount has to be greater then zero
}
}
如果我理解正确,您想要一个具有以下行为的集合:
- 允许线程尝试检索与特定条件匹配的项目,并将阻塞线程,直到该项目出现为止
- 允许线程为第1点中描述的操作指定超时
现有的BlockingCollection
类显然与这个问题毫无关系。
您可以实现自己的集合类型,添加所需的任何特定功能。例如:
class BlockingPredicateCollection<T>
{
private readonly object _lock = new object();
private readonly List<T> _list = new List<T>();
public void Add(T t)
{
lock (_lock)
{
_list.Add(t);
// Wake any waiting threads, so they can check if the element they
// are waiting for is now present.
Monitor.PulseAll(_lock);
}
}
public bool TryTake(out T t, Predicate<T> predicate, TimeSpan timeout)
{
Stopwatch sw = Stopwatch.StartNew();
lock (_lock)
{
int index;
while ((index = _list.FindIndex(predicate)) < 0)
{
TimeSpan elapsed = sw.Elapsed;
if (elapsed > timeout ||
!Monitor.Wait(_lock, timeout - elapsed))
{
t = default(T);
return false;
}
}
t = _list[index];
_list.RemoveAt(index);
return true;
}
}
}
然后,例如:
BlockingPredicateCollection<Bar> toDoCollection = new BlockingPredicateCollection<Bar>();
int timeout = 10000;
Bar value;
if (toDoCollection.TryTake(out value,
p => p.Amount != 5, TimeSpan.FromMilliseconds(timeout)))
{
// do something with "value"
}