具有优先级的并发集合

本文关键字:并发 集合 优先级 | 更新日期: 2023-09-27 18:01:55

我有许多线程从服务器列表中检索数据。服务器列表每5分钟从服务器解析器下载一次。处理数据的线程应该只使用响应时间最短的服务器。每个服务器的响应时间因请求而异。因此,在更新服务器列表之间的时间范围内,我应该验证每个服务器的响应时间。

我最初的方法是创建两个额外的线程:第一个线程更新服务器列表,第二个线程验证来自每个服务器的响应时间,并根据它们的响应时间对服务器列表进行排序。

我尝试使用旨在连接生产者和消费者的BlockingCollection<T>,但在我的任务中,我有两个并发的消费者,而且BlockingCollection<T>没有本地能力插入条目以创建服务器的优先级列表。

ConcurrentStack<T>ConcurrentQueue<T>也不能使用,因为它们像BlockingCollection<T>一样是非阻塞的,并且它们需要额外的阻塞线程机制,这需要从队列中获取项目。

请帮我解决这个问题

具有优先级的并发集合

随着。net 6的出现,出现了一个新的类PriorityQueue<TElement, TPriority>。这不是一个线程安全的集合,但是它可以很容易地用作IProducerConsumerCollection<T>实现的后台存储,而后者又可以成为BlockingCollection<T>类的底层数据存储。下面就是这样一个实现,它包含了完成这项工作所需的最小逻辑:

public class ProducerConsumerPriorityQueue<TElement, TPriority>
    : IProducerConsumerCollection<(TElement, TPriority)>
{
    private readonly PriorityQueue<TElement, (TPriority, long)> _queue;
    private long _index = 0;
    public ProducerConsumerPriorityQueue(IComparer<TPriority> comparer = default)
    {
        comparer ??= Comparer<TPriority>.Default;
        _queue = new(Comparer<(TPriority, long)>.Create((x, y) =>
        {
            int result = comparer.Compare(x.Item1, y.Item1);
            if (result == 0) result = x.Item2.CompareTo(y.Item2);
            return result;
        }));
    }
    public int Count { get { lock (_queue) return _queue.Count; } }
    public bool TryAdd((TElement, TPriority) item)
    {
        lock (_queue) _queue.Enqueue(item.Item1, (item.Item2, ++_index));
        return true;
    }
    public bool TryTake(out (TElement, TPriority) item)
    {
        lock (_queue)
        {
            if (_queue.TryDequeue(out var element, out var priority))
            {
                item = (element, priority.Item1); return true;
            }
            item = default; return false;
        }
    }
    public bool IsSynchronized => false;
    public object SyncRoot => throw new NotSupportedException();
    public (TElement, TPriority)[] ToArray()
        => throw new NotSupportedException();
    public void CopyTo((TElement, TPriority)[] array, int index)
        => throw new NotSupportedException();
    public void CopyTo(Array array, int index)
        => throw new NotSupportedException();
    public IEnumerator<(TElement, TPriority)> GetEnumerator()
        => throw new NotSupportedException();
    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}

只实现了Count, TryAddTryTake成员,但它们已经足够了。它们以线程安全的方式实现,因为BlockingCollection<T>类的文档要求这样做:

IProducerConsumerCollection<T>表示一个允许线程安全添加和删除数据的集合。

使用例子:

var collection = new BlockingCollection<(string Server, int Priority)>(
    new ProducerConsumerPriorityQueue<string, int>());
collection.Add(("Server-A", 20));
collection.Add(("Server-B", 10));
collection.Add(("Server-C", 20));
collection.CompleteAdding();
foreach (var (server, priority) in collection.GetConsumingEnumerable())
{
    Console.WriteLine($"Server: {server}, Priority: {priority}");
}
输出:

Server: Server-B, Priority: 10
Server: Server-A, Priority: 20
Server: Server-C, Priority: 20

在线演示。

保留具有相同优先级的项的插入顺序。

考虑使用两个或多个BlockingCollections,并使用TakeFromAny来监听所有队列。该方法(尽管我在文档中没有看到它)更喜欢从它正在侦听的队列数组中的第一个队列中获取元素。

不幸的是,这不是内置的,您可能需要自己实现。为了帮助你开始,在msdn示例中已经有了一个实现,可以与BlockingCollection一起使用。

http://blogs.msdn.com/pfxteam/archive/2010/04/04/9990342.aspx

对于一个有效的实现,你需要基于堆数据结构。这是一个很好的文章,虽然类不实现IProducerConsumerCollection: http://www.codeproject.com/Articles/126751/Priority-queue-in-C-with-the-help-of-heap-data-str