是否有一个.Net类来执行ManualResetEvent.PulseAll()将执行的操作(如果存在的话)

本文关键字:执行 操作 存在 如果 PulseAll Net 有一个 ManualResetEvent 是否 | 更新日期: 2023-09-27 18:01:02

是否有一个.Net类来做ManualResetEvent.PulseAll()要做的事情(如果存在的话)?

我需要原子地释放一组等待同一信号的线程。(对于我的预期用途,我并不担心"线程踩踏"。)

您不能使用ManualResetEvent来执行此操作。例如,如果你这样做:

ManualResetEventSlim signal = new ManualResetEventSlim();
// ...
signal.Set();
signal.Reset();

然后根本不会释放等待信号的线程。

如果在Set()Reset()调用之间放置一个Thread.Sleep(5),则会释放一些等待线程,但不是所有等待线程。将睡眠时间增加到10ms可以释放所有线程。(这是用20个线程测试的。)

显然,添加Thread.Sleep()来实现这一点是不可接受的。

然而,使用Monitor.PulseAll()可以很容易地做到这一点,我已经为此编写了一个小类。(我之所以编写一个类来实现这一点,是因为我们发现使用Monitor的逻辑虽然相当简单,但并不明显,不值得拥有这样一个类以简化使用。)

我的问题很简单:.Net中是否已经有一个类可以做到这一点?

作为参考,这里是我的"ManualResetEvent.PulseAll()"等价物的基本版本:

public sealed class Signaller
{
    public void PulseAll()
    {
        lock (_lock)
        {
            Monitor.PulseAll(_lock);
        }
    }
    public void Wait()
    {
        Wait(Timeout.Infinite);
    }
    public bool Wait(int timeoutMilliseconds)
    {
        lock (_lock)
        {
            return Monitor.Wait(_lock, timeoutMilliseconds);
        }
    }
    private readonly object _lock = new object();
}

下面是一个示例程序,它演示了如果您不在Set()和Reset()之间休眠,则不会释放等待线程:

using System;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
    public static class Program
    {
        private static void Main(string[] args)
        {
            _startCounter = new CountdownEvent(NUM_THREADS);
            for (int i = 0; i < NUM_THREADS; ++i)
            {
                int id = i;
                Task.Factory.StartNew(() => test(id));
            }
            Console.WriteLine("Waiting for " + NUM_THREADS + " threads to start");
            _startCounter.Wait(); // Wait for all threads to have started.
            Thread.Sleep(100);
            Console.WriteLine("Threads all started. Setting signal now.");
            _signal.Set();
            // Thread.Sleep(5); // With no sleep at all, NO threads receive the signal. Try commenting this line out.
            _signal.Reset();
            Thread.Sleep(1000);
            Console.WriteLine("'n{0}/{1} threads received the signal.'n'n", _signalledCount, NUM_THREADS);
            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }
        private static void test(int id)
        {
            _startCounter.Signal(); // Used so main thread knows when all threads have started.
            _signal.Wait();
            Interlocked.Increment(ref _signalledCount);
            Console.WriteLine("Task " + id + " received the signal.");
        }
        private const int NUM_THREADS = 20;
        private static readonly ManualResetEventSlim _signal = new ManualResetEventSlim();
        private static CountdownEvent _startCounter;
        private static int _signalledCount;
    }
}

是否有一个.Net类来执行ManualResetEvent.PulseAll()将执行的操作(如果存在的话)

您可以使用Barrier对象。它允许运行未指定数量的任务,然后等待所有其他任务达到该点。

如果你不知道哪些任务中的代码块将作为一个特定的工作单元开始工作,你可以以类似于Go中的WaitGroup的方式使用它。

版本1
最大清晰度:在每个PulseAll循环开始时,都会急切地安装一个新的ManualResetEvent

public class PulseEvent
{
    public PulseEvent()
    {
        mre = new ManualResetEvent(false);
    }
    ManualResetEvent mre;
    public void PulseAll() => Interlocked.Exchange(ref mre, new ManualResetEvent(false)).Set();
    public bool Wait(int ms) => Volatile.Read(ref mre).WaitOne(ms);
    public void Wait() => Wait(Timeout.Infinite);
};

版本2
此版本避免为任何碰巧在没有等待程序的情况下完成的PulseAll周期创建内部事件。每个周期的第一个服务程序进入乐观无锁竞赛,以创建和原子安装单个共享事件。

public class PulseEvent
{
    ManualResetEvent mre;
    public void PulseAll() => Interlocked.Exchange(ref mre, null)?.Set();
    public bool Wait(int ms)
    {
        ManualResetEvent tmp =
           mre ??
           Interlocked.CompareExchange(ref mre, tmp = new ManualResetEvent(false), null) ??
           tmp;
        return tmp.WaitOne(ms);
    }
    public void Wait() => Wait(Timeout.Infinite);
};

版本3
此版本通过分配两个持久ManualResetEvent对象并在它们之间切换来消除每个周期的分配。与上面的例子相比,这稍微改变了语义,如下所示:

  • 首先,回收相同的两个锁意味着PulseAll周期必须足够长,以允许所有等待程序清除前一个锁。否则,当您连续两次快速调用PulseAll时,任何被认为是由上一次PulseAll调用释放的等待线程——但操作系统还没有机会调度——也可能在新周期中被重新阻塞。我提到这主要是作为一种理论考虑,因为除非在亚微秒脉冲周期上阻塞极端数量的线程,否则这是一个没有实际意义的问题。您可以决定这种情况是否与您的情况相关。如果是这样,或者如果您不确定或谨慎,您可以始终使用版本1以上版本2,它们没有此限制。

  • 同样"可以说"不同的是(但请参阅下面的段落,了解为什么第二点可能被证明是不相关的),在这个版本中,对PulseAll的调用被认为是基本上同时的,这意味着除了一个"同时"调用方之外,所有这些调用方都成为了NOP。这种行为并非没有先例(请参阅此处的"备注"),根据应用情况可能是可取的。

请注意,后一点必须被视为合法的设计选择,而不是bug、理论缺陷或并发错误。这是因为Pulse锁在多个同时PulseAll的情况下本质上是模糊的:特别是,没有办法证明任何没有被指定的单个脉冲发生器释放的服务员也必然会被其他合并/消除的脉冲之一释放。

换句话说,这种类型的锁并不是为了原子化地序列化PulseAll调用方而设计的,事实上它确实不可能,因为跳过的"同时"脉冲总是有可能独立地来来去去,即使在合并脉冲的时间之后完全,但在服务员(他不会被脉冲)到达之前仍然"脉冲"。

public class PulseEvent
{
    public PulseEvent()
    {
        cur = new ManualResetEvent(false);
        alt = new ManualResetEvent(true);
    }
    ManualResetEvent cur, alt;
    public void PulseAll()
    {
        ManualResetEvent tmp;
        if ((tmp = Interlocked.Exchange(ref alt, null)) != null) // try claiming 'pulser'
        {
            tmp.Reset();                     // prepare for re-use, ending previous cycle
            (tmp = Interlocked.Exchange(ref cur, tmp)).Set();    // atomic swap & pulse
            Volatile.Write(ref alt, tmp);    // release claim; re-allow 'pulser' claims
        }
    }
    public bool Wait(int ms) => cur.WaitOne(ms);  // 'cur' is never null (unlike 'alt')
    public void Wait() => Wait(Timeout.Infinite);
};

 


最后,介绍几点一般性意见。在这里和这种类型的代码中,一个重要的反复出现的主题通常是,当ManualResetEvent仍然是公共可见的时,不能将其更改为信号状态(即通过调用Set。在上面的代码中,我们使用Interlocked.Exchange原子性地更改"cur"中活动锁的标识(在这种情况下,通过在替换中即时交换),并且在之前执行操作,Set对于确保除了在交换时已经被阻止的等待程序之外,不能再向ManualResetEvent添加新的等待程序至关重要。

只有在这个交换之后,才可以通过在我们(现在是)的私有副本上调用Set来安全地释放那些等待的线程。如果我们在ManualResetEvent上呼叫Set,而它仍在发布,那么一个迟到的服务员可能会错过即时脉冲,但却看到了打开的锁,并在不等待下一个锁的情况下顺利通过,这是定义所要求的。

有趣的是,这意味着,尽管直觉上感觉"脉冲"发生的确切时刻应该与Set的调用一致,但事实上,更准确地说,它就在Interlocked.Exchange的时刻,因为这是一个严格确定截止时间之前/之后的动作,并确定要释放的服务员(如果有的话)。

因此,错过截止时间并立即到达的服务员必须只能看到——并将阻止——现在为下一个周期指定的事件,即使当前周期尚未发出信号,也没有释放任何等待线程,这也是真的,所有这些都是"瞬时"脉冲正确性所必需的。