具有优先级的并发集合
本文关键字:并发 集合 优先级 | 更新日期: 2023-09-27 18:01:55
我有许多线程从服务器列表中检索数据。服务器列表每5分钟从服务器解析器下载一次。处理数据的线程应该只使用响应时间最短的服务器。每个服务器的响应时间因请求而异。因此,在更新服务器列表之间的时间范围内,我应该验证每个服务器的响应时间。
我最初的方法是创建两个额外的线程:第一个线程更新服务器列表,第二个线程验证来自每个服务器的响应时间,并根据它们的响应时间对服务器列表进行排序。
我尝试使用旨在连接生产者和消费者的BlockingCollection<T>
,但在我的任务中,我有两个并发的消费者,而且BlockingCollection<T>
没有本地能力插入条目以创建服务器的优先级列表。
ConcurrentStack<T>
或ConcurrentQueue<T>
也不能使用,因为它们像BlockingCollection<T>
一样是非阻塞的,并且它们需要额外的阻塞线程机制,这需要从队列中获取项目。
请帮我解决这个问题
随着。net 6的出现,出现了一个新的类PriorityQueue<TElement, TPriority>
。这不是一个线程安全的集合,但是它可以很容易地用作IProducerConsumerCollection<T>
实现的后台存储,而后者又可以成为BlockingCollection<T>
类的底层数据存储。下面就是这样一个实现,它包含了完成这项工作所需的最小逻辑:
public class ProducerConsumerPriorityQueue<TElement, TPriority>
: IProducerConsumerCollection<(TElement, TPriority)>
{
private readonly PriorityQueue<TElement, (TPriority, long)> _queue;
private long _index = 0;
public ProducerConsumerPriorityQueue(IComparer<TPriority> comparer = default)
{
comparer ??= Comparer<TPriority>.Default;
_queue = new(Comparer<(TPriority, long)>.Create((x, y) =>
{
int result = comparer.Compare(x.Item1, y.Item1);
if (result == 0) result = x.Item2.CompareTo(y.Item2);
return result;
}));
}
public int Count { get { lock (_queue) return _queue.Count; } }
public bool TryAdd((TElement, TPriority) item)
{
lock (_queue) _queue.Enqueue(item.Item1, (item.Item2, ++_index));
return true;
}
public bool TryTake(out (TElement, TPriority) item)
{
lock (_queue)
{
if (_queue.TryDequeue(out var element, out var priority))
{
item = (element, priority.Item1); return true;
}
item = default; return false;
}
}
public bool IsSynchronized => false;
public object SyncRoot => throw new NotSupportedException();
public (TElement, TPriority)[] ToArray()
=> throw new NotSupportedException();
public void CopyTo((TElement, TPriority)[] array, int index)
=> throw new NotSupportedException();
public void CopyTo(Array array, int index)
=> throw new NotSupportedException();
public IEnumerator<(TElement, TPriority)> GetEnumerator()
=> throw new NotSupportedException();
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
只实现了Count
, TryAdd
和TryTake
成员,但它们已经足够了。它们以线程安全的方式实现,因为BlockingCollection<T>
类的文档要求这样做:
IProducerConsumerCollection<T>
表示一个允许线程安全添加和删除数据的集合。
使用例子:
var collection = new BlockingCollection<(string Server, int Priority)>(
new ProducerConsumerPriorityQueue<string, int>());
collection.Add(("Server-A", 20));
collection.Add(("Server-B", 10));
collection.Add(("Server-C", 20));
collection.CompleteAdding();
foreach (var (server, priority) in collection.GetConsumingEnumerable())
{
Console.WriteLine($"Server: {server}, Priority: {priority}");
}
输出:Server: Server-B, Priority: 10
Server: Server-A, Priority: 20
Server: Server-C, Priority: 20
在线演示。
保留具有相同优先级的项的插入顺序。
考虑使用两个或多个BlockingCollections,并使用TakeFromAny来监听所有队列。该方法(尽管我在文档中没有看到它)更喜欢从它正在侦听的队列数组中的第一个队列中获取元素。
不幸的是,这不是内置的,您可能需要自己实现。为了帮助你开始,在msdn示例中已经有了一个实现,可以与BlockingCollection一起使用。
http://blogs.msdn.com/pfxteam/archive/2010/04/04/9990342.aspx 对于一个有效的实现,你需要基于堆数据结构。这是一个很好的文章,虽然类不实现IProducerConsumerCollection: http://www.codeproject.com/Articles/126751/Priority-queue-in-C-with-the-help-of-heap-data-str