多生产者和一个消费者的互斥

本文关键字:消费者 一个 生产者 | 更新日期: 2023-09-27 18:01:32

我有一个有趣的问题,我需要解决一些生产代码。我们目前正在开发一个web服务,它将被许多不同的应用程序调用,主要用于发送电子邮件。每当发送新邮件时,我们最终需要将该邮件的收据添加到数据库中,但理想情况下,我们不希望立即这样做,因此我们将随着时间的推移建立一个缓冲区。一旦缓冲区达到一定长度,或者在足够的时间过去后,缓冲区的内容将被刷新到数据库中。

这样想,当一个线程发送电子邮件时,它将锁定缓冲区,以便在不受干扰的情况下添加日志并维护线程安全。如果它看到缓冲区有一定的大小(在这个例子中我们说1000),那么线程就有责任把它全部写进数据库(我认为这是低效的,但我使用服务堆栈作为我们的web框架,所以如果有一种方法来委托这个任务,我宁愿用这种方法)。

现在,由于写入数据库可能很耗时,我们想添加一个备用缓冲区。因此,一旦一个缓冲区填满,所有新请求将在刷新第一个缓冲区的同时将其工作记录到第二个缓冲区。同样,一旦第二个缓冲区被填满,所有的线程都将移回第一个缓冲区,而第二个缓冲区将被刷新。

我们需要解决的主要问题:

  • 当一个线程决定需要刷新其中一个缓冲区时,它需要指示所有新线程开始记录到第二个缓冲区(这应该像更改一些变量或指针指向空缓冲区一样微不足道)
  • 如果当前临界区用户决定刷新日志时,当前有线程阻塞,则需要重新激活所有阻塞的线程并将它们指向第二个缓冲区

我更关心第二个要点。重新唤醒所有阻塞线程的最佳方法是什么,而不是允许它们进入第一个缓冲区的临界区,而是让它们尝试获得空缓冲区的锁?

编辑

根据下面的评论,我想出了一些我认为可能会起作用的东西。我不知道有线程安全的数据结构存在。

    private readonly ConcurrentQueue<EmailResponse> _logBuffer = new ConcurrentQueue<EmailResponse>();
    private readonly object _lockobject = new object();
    private const int BufferThreshold = 1000;
    public void AddToBuffer(EmailResponse email)
    {
        _logBuffer.Enqueue(email);
        Monitor.Enter(_lockobject);
        if (_logBuffer.Count >= BufferThreshold)
            Task.Run(async () =>
            {
                EmailResponse response;
                for (var i = 0; i < BufferThreshold; i++)
                    if (_logBuffer.TryDequeue(out response))
                        await AddMail(response);
                Monitor.Exit(_lockobject);
            });
        else Monitor.Exit(_lockobject);
    }

多生产者和一个消费者的互斥

我不确定你是否需要第二个缓冲区;我认为ConcurrentQueue是解决你问题的好办法。每个线程都可以在没有冲突的情况下排队,如果任何线程注意到队列的计数超过了魔法阈值,即使有其他线程排队,您也可以安全地退出该数量的对象。

一个(非常快速和肮脏的)工作示例:

static class Buffer
{
    private const int c_MagicThreshold = 10;
    private static ConcurrentQueue<string> s_Messages = new ConcurrentQueue<string>();
    private static object s_LockObj = new object();
    public static void Enqueue(string message)
    {
        s_Messages.Enqueue(message);
        // try to flush every time; spawn on a non-blocking thread and immediately return
        new Task(Flush).Start();
    }
    public static void Flush()
    {
        // do we flush at all?
        if (s_Messages.Count >= c_MagicThreshold)
        {
            lock (s_LockObj)
            {
                // make sure another thread didn't flush while we were waiting
                if (s_Messages.Count >= c_MagicThreshold)
                {
                    List<string> messages = new List<string>();
                    Console.WriteLine("Flushing " + c_MagicThreshold + " messages...");
                    for (int i = 0; i < c_MagicThreshold; i++)
                    {
                        string message;
                        if (!s_Messages.TryDequeue(out message))
                        {
                            throw new InvalidOperationException("How the hell did you manage that?");
                            // or just break from the loop if you don't care much, you spaz
                        }
                        messages.Add(message);
                    }
                    Console.WriteLine("[ " + String.Join(", ", messages) + " ]");
                    // number of new messages enqueued between threshold pass and now
                    Console.WriteLine(s_Messages.Count + " messages remaining in queue");
                }
            }
        }
    }
}

测试调用:

Parallel.For(0, 30, (i) =>
{
    Thread.Sleep(100);  // do other things
    Buffer.Enqueue(i.ToString());
});

测试运行的控制台输出:

刷新10条消息…

[28, 21, 14, 0, 7, 29, 8, 15, 1, 22]

队列中剩余5条消息

刷新10条消息…

[16,3,9,2,23,17,10,4,24,5]

队列中剩余1条消息

刷新10条消息…

[11, 18, 25, 19, 26, 12, 6, 20, 13, 27]

队列中剩余0条消息

你能给每个线程一个对象,持有两个缓冲区,并有线程日志到这个对象吗?当每个线程请求该对象记录某些内容时,该对象将决定写入哪个缓冲区。该对象还可能负责清空数据库的完整缓冲区,而不是阻止线程写入。