异步队列管理器

本文关键字:管理器 队列 异步 | 更新日期: 2023-09-27 18:27:29

我在c#中编写异步多服务器网络应用程序时遇到了一个问题。我有许多工作由线程池负责,其中包括对网络套接字的写入。这最终允许多个线程同时向套接字写入并扰乱我的传出消息。我解决这个问题的想法是实现一个队列系统,每当数据被添加到队列中时,套接字就会写入它

我的问题是,我无法完全理解这种性质的建筑。我想象有一个队列对象,每当数据被添加到队列中时,它就会触发一个事件。然后,事件会写入队列中保存的数据,但这不会起作用,因为如果两个线程同时出现并添加到队列中,即使队列是线程安全的,也会为这两个线程触发事件,我会遇到同样的问题。因此,如果另一个事件正在进行,那么也许可以用某种方法来推迟一个事件,但一旦第一个事件结束,我如何在不简单地阻塞某个互斥对象或其他对象上的线程的情况下继续该事件。如果我不想严格遵守我的"无阻塞"体系结构,这就不会那么难了,但这个特定的应用程序要求我允许线程池线程继续做他们的事情。

有什么想法吗?

异步队列管理器

虽然与Porges答案相似,但在实现方面有所不同。

首先,我通常不会对要发送的字节进行排队,而是在发送线程中对对象进行序列化,但我想这是一个品味问题。但更大的区别在于ConcurrentQueues的使用(除了BlockingCollection)。所以我最终会得到类似的代码

        BlockingCollection<Packet> sendQueue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
        while (true)
        {
            var packet = sendQueue.Take(); //this blocks if there are no items in the queue.
            SendPacket(packet); //Send your packet here.
        }

这里的关键是,您有一个线程来循环此代码,所有其他线程都可以以线程安全的方式添加到队列中(BlockingCollection和ConcurrentQueue都是线程安全的)

看看在C#中异步处理项目队列,我在其中回答了一个类似的问题。

听起来您需要一个线程同步写入套接字,而需要一堆线程写入队列以供该线程处理。

您可以使用阻塞集合(BlockingCollection<T>)来完成艰苦的工作:

// somewhere there is a queue:
BlockingCollection<byte[]> queue = new BlockingCollection<byte[]>();
// in socket-writing thread, read from the queue and send the messages:
foreach (byte[] message in queue.GetConsumingEnumerable())
{
    // just an example... obviously you'd need error handling and stuff here
    socket.Send(message);
}
// in the other threads, just enqueue messages to be sent:
queue.Add(someMessage);

BlockingCollection将处理所有同步。您还可以强制执行最大队列长度和其他有趣的事情。

我不知道C#,但我要做的是让事件触发套接字管理器开始从队列中提取,并一次写一个。如果它已经在运行,触发器将不会执行任何操作,并且一旦队列中没有任何内容,它就会停止。

这解决了两个线程同时写入队列的问题,因为第二个事件将是no-op。

您可以有一个线程安全的队列,所有工作线程都将结果写入该队列。然后有另一个线程轮询该队列,并在看到它们等待时发送结果。