在没有等待时删除信号量
本文关键字:删除 信号量 等待 | 更新日期: 2023-09-27 18:07:20
这里有一个并发性问题:
字符串值用于表示抽象资源,并且只允许一个线程为给定的字符串值工作;但是,如果多个线程的字符串值不同,则可以并发运行。到目前为止,很简单:
private static readonly Dictionary<String,Object> _locks = new Dictionary<String,Object>();
public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
Object resourceLock;
lock( _locks ) {
if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
_locks.Add( resourceName, resourceLock = new Object() );
}
}
lock( resourceLock ) {
EnterCriticalSection( resourceName );
}
}
但这是次优的:resourceName
的域是无界的,_locks
可能最终包含数千个或更多的字符串。因此,在没有更多的线程使用特定的resourceName
值之后,它的锁定对象应该从字典中删除。
在锁对象被使用后简单地删除它将是一个bug,因为下面的场景(相关代码位于后面)。注意,所有三个线程都有resourceName = "foo"
。
- 线程1位于
labelC
,并且刚刚从_locks
字典中删除了resourceName = "foo"
,从而删除了resourceLock (#1)
。 - 线程2在
labelB
,因为没有其他线程被锁定在resourceLock (#1)
上,它不会在lock( resourceLock
等待,而是继续进入EnterCriticalSection
。 - 线程3在
labelA
,因为resourceName = "foo"
不在_locks
(因为线程1删除了它),它添加了一个新的实例到_locks
,因为这个resourceLock (#2)
是一个新的实例,lock( resourceLock )
在线程3中不会等待线程2,因此线程2和3都可以在EnterCriticalSection
内,具有相同的resourceName
值。
public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
labelA:
Object resourceLock;
lock( _locks ) {
if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
_locks.Add( resourceName, resourceLock = new Object() );
}
}
try {
labelB:
lock( resourceLock ) {
EnterCriticalSection( resourceName );
}
}
finally {
lock( _locks ) {
_locks.Remove( resourceName );
}
labelC:
}
}
我最初用自己的小技巧解决了这个问题:
class CountedLock {
public Int32 Count;
}
private static readonly Dictionary<String,CountedLock> _locks = new Dictionary<String,CountedLock>();
public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
labelA:
CountedLock resourceLock;
lock( _locks ) {
if( !_locks.TryGetValue( resourceName, out resourceLock ) ) {
_locks.Add( resourceName, resourceLock = new CountedLock() { Count = 1 } );
}
else {
resourceLock.Count++; // no need for Interlocked.Increment as we're already in a mutex code block
}
}
try {
labelB:
lock( resourceLock ) {
EnterCriticalSection( resourceName );
}
}
finally {
lock( _locks ) {
labelD:
if( --resourceLock.Count == 0 ) {
_locks.Remove( resourceName );
}
}
labelC:
}
}
这就解决了问题。假设这三个线程位于与之前相同的位置:
- 线程1没有从
_locks
中删除它的resourceName
resourceLock
值,因为线程2也引用了与线程1相同的resourceLock
,所以当线程1处于labelD
时,相关的计数是2
(当线程1达到labelC
时,计数变为1
)。 - 线程2继续进入临界区,因为它在线程3之前到达。
- 如果线程3(在
labelA
)到达TryGetValue
,而线程2仍然在临界区,那么将看到它的resourceName="foo"
仍然在_locks
字典中,因此获得与线程2相同的实例,因此它将在labelB
等待线程2完成。 - 所以这有效!
但我猜你现在在想:
"所以你有一个锁与一个相关的计数…听起来你好像重新发明了信号量。不要重新发明轮子,使用
System.Threading.Sempahore
或SemaphoreSlim
。
确实,所以我改变了我的代码使用SemaphoreSlim
- SemaphoreSlim
实例有一个内部Count
值,它是允许进入的线程数(与当前"内部"信号量的线程数相反-这与我的CountedLock
示例在上一个示例中的工作方式相反):
private static readonly Dictionary<String,SemaphoreSlim> _locks = new Dictionary<String,SemaphoreSlim>();
public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
labelA:
SemaphoreSlim resourceLock;
lock( _locks ) {
if( !_locks.TryGetValue( resourceName, out resourceLock ) {
_locks.Add( resourceName, resourceLock = new SemaphoreLock( initialCount: 1, maxCount: 1 ) );
}
}
labelB:
resourceLock.Wait(); // this decrements the semaphore's count
try {
EnterCriticalSection( resourceName );
}
finally {
lock( _locks ) {
Int32 count = resourceLock.Release(); // this increments the sempahore's count
if( count > 0 ) {
_locks.Remove( resourceName );
resourceLock.Dispose();
}
labelC:
}
}
}
…但是发现bug!
考虑这个场景:
- 线程1位于
labelC
,并且刚刚删除并处置了它的resourceLock (#1)
实例。线程2在labelB
(在调用Wait
之前)。它获得了与线程1相同的SemaphoreSlim resourceLock (#1)
实例的引用;而是因为Wait
方法被调用线程离开后lock( _locks )
labelA
下这意味着有一个小,但现存的,机会之窗,线程2将调用resourceLock (#1).Wait()
(忽略可能ObjectDisposedException
),而线程3(目前labelA
)将输入TryGetValue
并实例化一个新的实例的SemaphoreLock (#2)
resourceName
相同,但是因为线程2和3的区别信号情况下,他们都将可能同时进入临界区。你可以建议:
当你在
下的labelA
lock( _locks )
块内时,你应该想办法减少信号量
…除了SemaphoreSlim
类没有公开任何Decrement
方法。可以调用.Wait(0)
,使其立即返回,因此我的代码看起来像这样:
[...]
labelA:
SemaphoreSlim resourceLock;
lock( _locks ) {
if( !_locks.TryGetValue( resourceName, out resourceLock ) {
_locks.Add( resourceName, resourceLock = new SemaphoreLock( initialCount: 1, maxCount: 1 ) );
}
resourceLock.Wait( 0 );
}
labelB:
resourceLock.Wait();
[...]
…但这行不通。Wait(Int32)
状态的文档(重点是我的):
如果线程或任务在调用Wait(Int32)时被阻塞,并且由millisecdstimeout指定的超时时间过期,线程或任务不会进入信号量,并且CurrentCount属性不递减。
…这个理论到此为止。即使它确实有效,在同一个线程中调用Wait
两次可能会减少两次计数,而不是一次。
那么,是否有可能有一个由互斥体保护的临界区,以某种方式"知道"何时不再需要它们?
我实际上会坚持使用更简单的,基于计数的解决方案,而不是SemaphoreSlim
,因为您已经实现了它。虽然被称为"苗条",但SemaphoreSlim
仍然比您的简单计数器轻。正如您所知,使用信号量实际上会使代码的性能稍微降低,并且稍微复杂一些。如果你需要花更多的时间来说服自己这个版本确实有效,那么也许它不是更好的版本。
所以,也许你是在重新发明轮子,但是SemaphoreSlim
是一个通用的信号量,具有你并不完全需要的功能。甚至在Semaphore
已经存在的情况下,微软也重新将SemaphoreSlim
添加到BCL中。
private static readonly ConcurrentDictionary<string, CountedLock> _locks
= new ConcurrentDictionary<string, CountedLock>();
public static void DoSomethingMutuallyExclusiveByName(string resourceName)
{
CountedLock resourceLock;
// we must use a loop to avoid incrementing a stale lock object
var spinWait = new SpinWait();
while (true)
{
resourceLock = _locks.GetOrAdd(resourceName, i => new CountedLock());
resourceLock.Increment();
// check that the instance wasn't removed in the meantime
if (resourceLock == _locks.GetOrAdd(resourceName, i => new CountedLock()))
break;
// otherwise retry
resourceLock.Decrement();
spinWait.SpinOnce();
}
try
{
lock (resourceLock)
{
// EnterCriticalSection(resourceName);
}
}
finally
{
if (resourceLock.Decrement() <= 0)
_locks.TryRemove(resourceName, out resourceLock);
}
}
与CountedLock
也修改为使用Interlocked
类:
class CountedLock
{
Int32 _count;
public int Increment() => Interlocked.Increment(ref _count);
public int Decrement() => Interlocked.Decrement(ref _count);
}
无论哪种方式,我都可能将代码重新组织为通用的,并且(ab)使用IDisposable
接口允许您简单地将调用包装在单个using
块中。
似乎调用Wait( 0 )
实际上是解决方案。实际上,如果信号量已经被阻塞,它不会减少信号量的计数,所以在离开lock( _locks )
互斥锁后,只需在无限超时的情况下再次调用它,就像这样:
public static void DoSomethingMutuallyExclusiveByName(String resourceName) {
Boolean hasLock;
SemaphoreSlim resourceLock;
lock( _locks ) {
if( !_locks.TryGetValue( resourceName, out resourceLock ) {
_locks.Add( resourceName, resourceLock = new SemaphoreLock( initialCount: 1, maxCount: 1 ) );
}
hasLock = resourceLock.Wait( 0 ); // This call will not block. And this call will decrement the semaphore count only if it succeeds (returns true).
}
if( !hasLock ) resourceLock.Wait(); // This will block, but will only be called if the previous call to Wait(0) indicated this thread did not enter the semaphore.
try {
EnterCriticalSection( resourceName );
}
finally {
lock( _locks ) {
Int32 count = resourceLock.Release(); // this increments the sempahore's count
if( count > 0 ) {
_locks.Remove( resourceName );
resourceLock.Dispose();
}
}
}
}
我认为这解决了问题-我很痛苦看到一个竞争条件在这个版本。Wait(0)
调用在lock( _locks )
互斥锁内的事实意味着,给定resourceName
的至少一个线程将在第一个lock
的闭括号之后立即拥有锁,因此,如果有人正在等待它,或者在临界区内,信号量实例将永远不会从_locks
集合中删除。
我不知道,有时候重新发明可能会更好。
我很喜欢基于Monitor
的同步构造(事实上,所有Slim
类都以这样或那样的方式使用这种构造)。我想到的第一件事是这样简单的:
public class CriticalSectionSlim<TKey>
{
private readonly HashSet<TKey> lockSet = new HashSet<TKey>();
public void Enter(TKey key)
{
lock (lockSet)
{
while (!lockSet.Add(key))
Monitor.Wait(lockSet);
}
}
public void Exit(TKey key)
{
lock (lockSet)
{
lockSet.Remove(key);
Monitor.PulseAll(lockSet);
}
}
}
潜在的性能问题是(A)键等待器和(B)非智能脉冲之间存在亲和关系。
解决(B)的稍微复杂一点的版本,从而最小化(A)的影响可能如下:
public class CriticalSectionSlim<TKey>
{
const int EnteredFlag = int.MinValue;
private readonly Dictionary<TKey, int> lockInfo = new Dictionary<TKey, int>();
public void Enter(TKey key)
{
lock (lockInfo)
{
int info;
if (lockInfo.TryGetValue(key, out info))
{
if ((info & EnteredFlag) != 0)
{
lockInfo[key] = info + 1;
do
{
Monitor.Wait(lockInfo);
info = lockInfo[key];
}
while ((info & EnteredFlag) != 0);
info--;
}
}
lockInfo[key] = EnteredFlag | info;
}
}
public void Exit(TKey key)
{
lock (lockInfo)
{
int waitCount = lockInfo[key] & ~EnteredFlag;
if (waitCount == 0)
lockInfo.Remove(key);
else
{
lockInfo[key] = waitCount;
Monitor.PulseAll(lockInfo);
}
}
}
}
对于每个键,如果它被输入,我们保留一个标志,如果有,也保留服务员的计数。这允许我们避免在没有等待者的情况下在退出时产生脉冲。