消费者超时并处于特定条件下

本文关键字:于特定 条件下 超时 消费者 | 更新日期: 2023-09-27 18:26:22

BlockingCollection<T>类提供了一种实现生产者/消费者模式的简单方法,但遗憾的是,它没有我需要的功能。它允许我在等待使用元素时设置超时,但不提供限制从集合中删除哪个项的方法。

如何实现一个类似于BlockingCollection<T>的类,但它允许我指定应在哪些条件下获取项?

例如:我只需要在Amount等于特定值的情况下取Bar项目:

public class Bar 
{
    public Int32 Amount { get; set; }
}
public class Program 
{
    public  static void Main()
    {
        ToDoCollection<Bar> ToDoCollection = new ToDoCollection<Bar>();
        int timeout = 10000;
        // this doesn't work, that's why I'm asking for your help
        Bar value = ToDoCollection.TryTake().Where(p => p.Amount != 5);
        // Here, I need to wait for 10s trying to take item from blockingCollection
        // item, that will follow specific condition: Bar.Amount has to be greater then zero
    }
}

消费者超时并处于特定条件下

如果我理解正确,您想要一个具有以下行为的集合:

  1. 允许线程尝试检索与特定条件匹配的项目,并将阻塞线程,直到该项目出现为止
  2. 允许线程为第1点中描述的操作指定超时

现有的BlockingCollection类显然与这个问题毫无关系。

您可以实现自己的集合类型,添加所需的任何特定功能。例如:

class BlockingPredicateCollection<T>
{
    private readonly object _lock = new object();
    private readonly List<T> _list = new List<T>();
    public void Add(T t)
    {
        lock (_lock)
        {
            _list.Add(t);
            // Wake any waiting threads, so they can check if the element they
            // are waiting for is now present.
            Monitor.PulseAll(_lock);
        }
    }
    public bool TryTake(out T t, Predicate<T> predicate, TimeSpan timeout)
    {
        Stopwatch sw = Stopwatch.StartNew();
        lock (_lock)
        {
            int index;
            while ((index = _list.FindIndex(predicate)) < 0)
            {
                TimeSpan elapsed = sw.Elapsed;
                if (elapsed > timeout ||
                    !Monitor.Wait(_lock, timeout - elapsed))
                {
                    t = default(T);
                    return false;
                }
            }
            t = _list[index];
            _list.RemoveAt(index);
            return true;
        }
    }
}

然后,例如:

BlockingPredicateCollection<Bar> toDoCollection = new BlockingPredicateCollection<Bar>();
int timeout = 10000;
Bar value;
if (toDoCollection.TryTake(out value,
    p => p.Amount != 5, TimeSpan.FromMilliseconds(timeout)))
{
    // do something with "value"
}