在没有等待时删除信号量

本文关键字:删除 信号量 等待 | 更新日期: 2023-09-27 18:07:20

这里有一个并发性问题:

字符串值用于表示抽象资源,并且只允许一个线程为给定的字符串值工作;但是,如果多个线程的字符串值不同,则可以并发运行。到目前为止,很简单:

private static readonly Dictionary<String,Object> _locks = new Dictionary<String,Object>();
public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
    Object resourceLock;
    lock( _locks ) {
        if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
            _locks.Add( resourceName, resourceLock = new Object() );
        }
    }
    lock( resourceLock ) {
        EnterCriticalSection( resourceName );
    }
}

但这是次优的:resourceName的域是无界的,_locks可能最终包含数千个或更多的字符串。因此,在没有更多的线程使用特定的resourceName值之后,它的锁定对象应该从字典中删除。

在锁对象被使用后简单地删除它将是一个bug,因为下面的场景(相关代码位于后面)。注意,所有三个线程都有resourceName = "foo"

  • 线程1位于labelC,并且刚刚从_locks字典中删除了resourceName = "foo",从而删除了resourceLock (#1)
  • 线程2在labelB,因为没有其他线程被锁定在resourceLock (#1)上,它不会在lock( resourceLock等待,而是继续进入EnterCriticalSection
  • 线程3在labelA,因为resourceName = "foo"不在_locks(因为线程1删除了它),它添加了一个新的实例到_locks,因为这个resourceLock (#2)是一个新的实例,lock( resourceLock )在线程3中不会等待线程2,因此线程2和3都可以在EnterCriticalSection内,具有相同的resourceName值。
代码:

public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
labelA:        
    Object resourceLock;
    lock( _locks ) {
        if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
            _locks.Add( resourceName, resourceLock = new Object() );
        }
    }
    try {
labelB:
        lock( resourceLock ) {
            EnterCriticalSection( resourceName );
        }
    }
    finally {
        lock( _locks ) {
            _locks.Remove( resourceName );
        }
labelC:
    }
}

我最初用自己的小技巧解决了这个问题:

class CountedLock {
    public Int32 Count;
}
private static readonly Dictionary<String,CountedLock> _locks = new Dictionary<String,CountedLock>();
public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
labelA:        
    CountedLock resourceLock;
    lock( _locks ) {
        if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
            _locks.Add( resourceName, resourceLock = new CountedLock() { Count = 1 } );
        }
        else {
            resourceLock.Count++; // no need for Interlocked.Increment as we're already in a mutex code block
        }
    }
    try {
labelB:
        lock( resourceLock ) {
            EnterCriticalSection( resourceName );
        }
    }
    finally {
        lock( _locks ) {
labelD:
            if( --resourceLock.Count == 0 ) {
                _locks.Remove( resourceName );
            }
        }
labelC:
    }
}

这就解决了问题。假设这三个线程位于与之前相同的位置:

  • 线程1没有从_locks中删除它的resourceName resourceLock值,因为线程2也引用了与线程1相同的resourceLock,所以当线程1处于labelD时,相关的计数是2(当线程1达到labelC时,计数变为1)。
  • 线程2继续进入临界区,因为它在线程3之前到达。
  • 如果线程3(在labelA)到达TryGetValue,而线程2仍然在临界区,那么将看到它的resourceName="foo"仍然在_locks字典中,因此获得与线程2相同的实例,因此它将在labelB等待线程2完成。
  • 所以这有效!

但我猜你现在在想:

"所以你有一个锁与一个相关的计数…听起来你好像重新发明了信号量。不要重新发明轮子,使用System.Threading.SempahoreSemaphoreSlim

确实,所以我改变了我的代码使用SemaphoreSlim - SemaphoreSlim实例有一个内部Count值,它是允许进入的线程数(与当前"内部"信号量的线程数相反-这与我的CountedLock示例在上一个示例中的工作方式相反):

private static readonly Dictionary<String,SemaphoreSlim> _locks = new Dictionary<String,SemaphoreSlim>();
public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
labelA:
    SemaphoreSlim resourceLock;
    lock( _locks ) {
        if( !_locks.TryGetValue( resourceName, out resourceLock ) {
            _locks.Add( resourceName, resourceLock = new SemaphoreLock( initialCount: 1, maxCount: 1 ) );
        }
    }
labelB:
    resourceLock.Wait(); // this decrements the semaphore's count
    try {
        EnterCriticalSection( resourceName );
    }
    finally {
        lock( _locks ) {
            Int32 count = resourceLock.Release(); // this increments the sempahore's count
            if( count > 0 ) {
                _locks.Remove( resourceName );
                resourceLock.Dispose();
            }
labelC:
        }
    }
}

…但是发现bug!

考虑这个场景:

    线程1位于labelC,并且刚刚删除并处置了它的resourceLock (#1)实例。线程2在labelB(在调用Wait之前)。它获得了与线程1相同的SemaphoreSlim resourceLock (#1)实例的引用;而是因为Wait方法被调用线程离开后lock( _locks ) labelA下这意味着有一个小,但现存的,机会之窗,线程2将调用resourceLock (#1).Wait()(忽略可能ObjectDisposedException),而线程3(目前labelA)将输入TryGetValue并实例化一个新的实例的SemaphoreLock (#2) resourceName相同,但是因为线程2和3的区别信号情况下,他们都将可能同时进入临界区。

你可以建议:

当你在labelA

下的lock( _locks )块内时,你应该想办法减少信号量

…除了SemaphoreSlim类没有公开任何Decrement方法。可以调用.Wait(0),使其立即返回,因此我的代码看起来像这样:

[...]
labelA:
    SemaphoreSlim resourceLock;
    lock( _locks ) {
        if( !_locks.TryGetValue( resourceName, out resourceLock ) {
            _locks.Add( resourceName, resourceLock = new SemaphoreLock( initialCount: 1, maxCount: 1 ) );
        }
        resourceLock.Wait( 0 );
    }
labelB:
    resourceLock.Wait();
[...]

…但这行不通。Wait(Int32)状态的文档(重点是我的):

如果线程或任务在调用Wait(Int32)时被阻塞,并且由millisecdstimeout指定的超时时间过期,线程或任务不会进入信号量,并且CurrentCount属性不递减

…这个理论到此为止。即使它确实有效,在同一个线程中调用Wait两次可能会减少两次计数,而不是一次。

那么,是否有可能有一个由互斥体保护的临界区,以某种方式"知道"何时不再需要它们?

在没有等待时删除信号量

我实际上会坚持使用更简单的,基于计数的解决方案,而不是SemaphoreSlim,因为您已经实现了它。虽然被称为"苗条",但SemaphoreSlim仍然比您的简单计数器轻。正如您所知,使用信号量实际上会使代码的性能稍微降低,并且稍微复杂一些。如果你需要花更多的时间来说服自己这个版本确实有效,那么也许它不是更好的版本。

所以,也许你是在重新发明轮子,但是SemaphoreSlim是一个通用的信号量,具有你并不完全需要的功能。甚至在Semaphore已经存在的情况下,微软也重新将SemaphoreSlim添加到BCL中。

另一方面,如果您觉得争用可能是您的全局锁的一个问题,您可以尝试使用无锁的方法。最有可能的是,您不会遇到这样的问题,但如果您真的认为这将在您的代码中被调用数千次,您可以选择这样的内容:
private static readonly ConcurrentDictionary<string, CountedLock> _locks 
    = new ConcurrentDictionary<string, CountedLock>();
public static void DoSomethingMutuallyExclusiveByName(string resourceName)
{
    CountedLock resourceLock;
    // we must use a loop to avoid incrementing a stale lock object
    var spinWait = new SpinWait();
    while (true)
    {
        resourceLock = _locks.GetOrAdd(resourceName, i => new CountedLock());
        resourceLock.Increment();
        // check that the instance wasn't removed in the meantime
        if (resourceLock == _locks.GetOrAdd(resourceName, i => new CountedLock()))
            break;
        // otherwise retry
        resourceLock.Decrement();
        spinWait.SpinOnce();
    }
    try
    {
        lock (resourceLock)
        {
            // EnterCriticalSection(resourceName);
        }
    }
    finally
    {
        if (resourceLock.Decrement() <= 0)
            _locks.TryRemove(resourceName, out resourceLock);
    }
}

CountedLock也修改为使用Interlocked类:

class CountedLock
{
    Int32 _count;
    public int Increment() => Interlocked.Increment(ref _count);
    public int Decrement() => Interlocked.Decrement(ref _count);
}

无论哪种方式,我都可能将代码重新组织为通用的,并且(ab)使用IDisposable接口允许您简单地将调用包装在单个using块中。

似乎调用Wait( 0 )实际上是解决方案。实际上,如果信号量已经被阻塞,它不会减少信号量的计数,所以在离开lock( _locks )互斥锁后,只需在无限超时的情况下再次调用它,就像这样:

public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
    Boolean hasLock;
    SemaphoreSlim resourceLock;
    lock( _locks ) {
        if( !_locks.TryGetValue( resourceName, out resourceLock ) {
            _locks.Add( resourceName, resourceLock = new SemaphoreLock( initialCount: 1, maxCount: 1 ) );
        }
        hasLock = resourceLock.Wait( 0 ); // This call will not block. And this call will decrement the semaphore count only if it succeeds (returns true).
    }
    if( !hasLock ) resourceLock.Wait(); // This will block, but will only be called if the previous call to Wait(0) indicated this thread did not enter the semaphore.
    try {
        EnterCriticalSection( resourceName );
    }
    finally {
        lock( _locks ) {
            Int32 count = resourceLock.Release(); // this increments the sempahore's count
            if( count > 0 ) {
                _locks.Remove( resourceName );
                resourceLock.Dispose();
            }
        }
    }
}

我认为这解决了问题-我很痛苦看到一个竞争条件在这个版本。Wait(0)调用在lock( _locks )互斥锁内的事实意味着,给定resourceName的至少一个线程将在第一个lock的闭括号之后立即拥有锁,因此,如果有人正在等待它,或者在临界区内,信号量实例将永远不会从_locks集合中删除。

我不知道,有时候重新发明可能会更好。

我很喜欢基于Monitor的同步构造(事实上,所有Slim类都以这样或那样的方式使用这种构造)。我想到的第一件事是这样简单的:
public class CriticalSectionSlim<TKey>
{
    private readonly HashSet<TKey> lockSet = new HashSet<TKey>();
    public void Enter(TKey key)
    {
        lock (lockSet)
        {
            while (!lockSet.Add(key))
                Monitor.Wait(lockSet);
        }
    }
    public void Exit(TKey key)
    {
        lock (lockSet)
        {
            lockSet.Remove(key);
            Monitor.PulseAll(lockSet);
        }
    }
}

潜在的性能问题是(A)键等待器和(B)非智能脉冲之间存在亲和关系。

解决(B)的稍微复杂一点的版本,从而最小化(A)的影响可能如下:

public class CriticalSectionSlim<TKey>
{
    const int EnteredFlag = int.MinValue;
    private readonly Dictionary<TKey, int> lockInfo = new Dictionary<TKey, int>();
    public void Enter(TKey key)
    {
        lock (lockInfo)
        {
            int info;
            if (lockInfo.TryGetValue(key, out info))
            {
                if ((info & EnteredFlag) != 0)
                {
                    lockInfo[key] = info + 1;
                    do
                    {
                        Monitor.Wait(lockInfo);
                        info = lockInfo[key];
                    }
                    while ((info & EnteredFlag) != 0);
                    info--;
                }
            }
            lockInfo[key] = EnteredFlag | info;
        }
    }
    public void Exit(TKey key)
    {
        lock (lockInfo)
        {
            int waitCount = lockInfo[key] & ~EnteredFlag;
            if (waitCount == 0)
                lockInfo.Remove(key);
            else
            {
                lockInfo[key] = waitCount;
                Monitor.PulseAll(lockInfo);
            }
        }
    }
}

对于每个键,如果它被输入,我们保留一个标志,如果有,也保留服务员的计数。这允许我们避免在没有等待者的情况下在退出时产生脉冲。