线程.监视器-多个锁对象vs PulseAll -这是更有效的资源

本文关键字:资源 PulseAll 有效 vs 监视器 对象 线程 | 更新日期: 2023-09-27 18:16:53

在有多个'请求线程'和一个哑'工作线程'的情况下,请求线程必须排队。

考虑两种可能性:

  1. 每个请求线程调用Monitor。在自己的专用对象上等待,该对象进入FIFO队列。当结果到达时,最老的对象被脉冲。

  2. 所有请求线程取一个号码并调用Monitor。等待共享对象。当结果到达时,Monitor。在共享对象上调用PulseAll,所有请求线程检查它们的线程数是否已满。

可能有其他选项,但为了这个问题的目的,请忽略它们。

Question -当存在大量排队线程时:

  • 是任何一种方法显着提高CPU效率?
  • 是任何一种方法显着更多的内存效率?

锁对象只是'new object()'实例。

我的直觉是场景1更有效,因为当脉冲发生时只有一个线程会行动,并且基本对象实例非常资源轻(对吗?)然而,我不太理解等待的机制。如果更多的对象被"监视",可能需要更多的资源?

提前感谢您的真知灼见。


我写了下面的代码来说明这两种情况。

进一步解释:

在我的情况下,'worker'线程接受工作和异步地产生结果。它不知道结果是哪个请求属于,除了产生的结果总是一样的接收请求的顺序。

虽然我有这个申请,这个问题应该是被视为学术的。请不要浪费时间质疑潜在的假设或建议替代的解决方案。然而,欢迎提问,以澄清问题的意图。

using System;
using System.Collections.Generic;
using System.Threading;
namespace praccmd.threads
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            TestResets();
            Console.WriteLine("press key");
            Console.ReadKey();
        }

        private static void TestResets()
        {
            //lock object per work request
            Console.WriteLine("----lock object per work request----");
            for (int i = 1; i <= 10; i++)
            {
                Thread t = new Thread(ThreadLockObjPerRequest);
                t.Name = "Thread_object_per_request_" + i;
                t.Start();
            }
            //now pretend to be the WorkDone event
            while (_ticketQueue.Count > 0)
            {
                Thread.Sleep(50);
                lock (_receiveLock)
                {
                    var doneTicketNext = _ticketQueue.Dequeue();
                    lock (doneTicketNext)
                    {
                        Monitor.Pulse(doneTicketNext);
                        Monitor.Wait(doneTicketNext);
                    }
                }
            }
            //shared lock object (pulseall), one id per request
            Console.WriteLine("----shared lock object----");
            for (int i = 1; i <= 10; i++)
            {
                Thread t = new Thread(ThreadSharedLock);
                t.Name = "Thread_shared_lock_object_" + i;
                t.Start();
            }
            //now pretend to be the WorkDone event
            while (_ticketNumberQueue.Count > 0)
            {
                Thread.Sleep(50);
                lock (_sharedReceiveLock)
                {
                    lock (_sharedLock)
                    {
                        _sharedLock.TicketNumber = _ticketNumberQueue.Dequeue();
                        Monitor.PulseAll(_sharedLock);
                    }
                    lock (_sharedThanksLock) Monitor.Wait(_sharedThanksLock);
                }
            }
        }

        //infrastructure for lock-object-per-request
        private static readonly object _sendLock = new object();
        private static readonly object _receiveLock = new object();
        private static readonly Queue<object> _ticketQueue = new Queue<object>();
        private static object TakeATicket()
        {
            var ticket = new object();
            _ticketQueue.Enqueue(ticket);
            return ticket;
        }
        //lock-object-per-request thread
        private static void ThreadLockObjPerRequest()
        {
            var name = Thread.CurrentThread.Name;
            object ticket;
            lock (_sendLock)
            {
                ticket = TakeATicket();
                //RequestWorkNonBlocking("some data specific to this request");
                Console.WriteLine(name + " sends its request.");
            }
            var myResult = string.Empty;
            lock (ticket)
            {
                Monitor.Wait(ticket);
                //myResult = GetResultFromAStaticVariable();
                Console.WriteLine(name + " gets its data.");
                Monitor.Pulse(ticket);
            }
            //do something with myResult
        }

        //infrastructure for shared-lock
        private class SharedLock { public int TicketNumber { get; set; } }
        private static readonly SharedLock _sharedLock = new SharedLock { TicketNumber = 0 };
        private static readonly dynamic _sharedReceiveLock = new object();
        private static readonly dynamic _sharedThanksLock = new object();
        private static readonly object _ticketIncrementLock = new object();
        private static int _ticketNumber = 0;
        private static readonly Queue<int> _ticketNumberQueue = new Queue<int>();
        private static int TakeATicketNumber()
        {
            lock (_ticketIncrementLock)
            {
                _ticketNumberQueue.Enqueue(++_ticketNumber);
                return _ticketNumber;
            }
        }
        //thread for shared-lock
        private static void ThreadSharedLock()
        {
            var name = Thread.CurrentThread.Name;
            int ticketNumber;
            lock (_sendLock)
            {
                ticketNumber = TakeATicketNumber();
                //RequestWorkNonBlocking("some data specific to this request");
                Console.WriteLine(name + " sends its request.");
            }
            var myResult = string.Empty;
            do
            {
                lock (_sharedLock)
                {
                    Monitor.Wait(_sharedLock);
                    if (_sharedLock.TicketNumber == ticketNumber)
                    {
                        myResult = "response"; //GetResultFromAStaticVariable();
                        Console.WriteLine(name + " gets its data.");
                    }
                }
            } while (myResult.Length == 0);
            lock (_sharedThanksLock) Monitor.Pulse(_sharedThanksLock);
            //do something with myResult
        }
    }
}

线程.监视器-多个锁对象vs PulseAll -这是更有效的资源

性能总是很棘手的,并且很大程度上取决于您的特定上下文;您可能需要度量它才能得到一个好的答案,注意它可能取决于预期未完成任务的数量,等等。

我工作这个多路复用器场景的方式是使用Task API;一个新的传入请求创建一个TaskCompletionSource<T>,创建一个排队(同步)到队列中,即假设每个结果(当它稍后到达时)都是一个int:

private readonly Queue<TaskCompletionSource<int>> queue
            = new Queue<TaskCompletionSource<int>>();
public Task<int> MakeRequest(...) {
    var source = new TaskCompletionSource<int>();
    lock(queue) {
        queue.Enqueue(source);
    }
    return source.Task;
}

,然后当结果进来时,worker可以做如下操作:

private void SetNextResult(int value) {
    TaskCompletionSource<int> source;
    lock(queue) {
        source = queue.Dequeue();
    }
    source.SetResult(value);
}

这样做的好处是它允许每个单独的调用者决定他们想要如何响应延迟的工作:

  • 他们可以使用.Wait/.Result来阻止
  • 他们可以使用.ContinueWith添加回调
  • 他们可以使用await来使用基于状态机的延续