.Net中的键控锁定
本文关键字:锁定 Net | 更新日期: 2023-09-27 18:30:05
我有一个Azure服务总线队列,我在其中接收1到10条具有相同"密钥"的消息。其中一条消息需要使用长时间运行的操作进行处理。完成后,数据库将被更新,其他消息将对其进行检查。然而,与此同时,其他消息也将重新排队,这样进程就不会丢失。
但主要的一点是,这个长时间运行的操作不能对同一个键同时运行,不应该多次运行。
这就是我目前所掌握的:
void Main()
{
Enumerable.Range(1, 1000)
.AsParallel()
.ForAll(async i => await ManageConcurrency(i % 2, async () => await Task.Delay(TimeSpan.FromSeconds(10))));
}
private readonly ConcurrentDictionary<int, SemaphoreSlim> _taskLocks = new ConcurrentDictionary<int, SemaphoreSlim>();
private async Task<bool> ManageConcurrency(int taskId, Func<Task> task)
{
SemaphoreSlim taskLock = null;
try
{
if (_taskLocks.TryGetValue(taskId, out taskLock))
{
if (taskLock.CurrentCount == 0)
{
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, I found. No available.. Thread Id: {Thread.CurrentThread.ManagedThreadId}");
return false;
}
taskLock.Wait();
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, I found and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}
else
{
taskLock = new SemaphoreSlim(1, 1);
taskLock = _taskLocks.GetOrAdd(taskId, taskLock);
if (taskLock.CurrentCount == 0)
{
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, I didn't find, and then found/created. None available.. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
return false;
}
else
{
taskLock.Wait(TimeSpan.FromSeconds(1));
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, I didn't find, then found/created, and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}
}
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, Lock pulled for TaskId {taskId}, Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
await task.Invoke();
return true;
}
catch (Exception e)
{
;
return false;
}
finally
{
//taskLock?.Release();
_taskLocks.TryRemove(taskId, out taskLock);
//Console.WriteLine($"I removed. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
}
}
它没有按预期工作,因为它会创建多个信号量,突然间,我的长时间运行的操作用同一个键运行了两次。我认为问题是因为整个操作不是原子操作。
解决这个问题的最佳方法是什么?
您正确地认识到需要确保每个键只创建一个信号量。标准的习语是:
var dict = new ConcurrentDictionary<TKey, Lazy<SemaphoreSlim>>();
...
var sem = dict.GetOrAdd( , _ => new new Lazy<SemaphoreSlim>(() => SemaphoreSlim(1, 1))).Value;
可能会创建多个懒惰,但其中只有一个会被揭示和物化。
此外,依赖记忆状态是一种值得怀疑的做法。如果您的队列处理应用程序回收,并且所有信号量都丢失了,该怎么办?您最好使用持久存储来跟踪此锁定信息。
你就快到了。。。你需要保留收到的订单吗?如果不是:
public static void Main(string[] args)
{
Enumerable.Range(1, 1000)
.AsParallel()
.ForAll( i => ManageConcurrency(i % 2, () => Task.Delay(TimeSpan.FromSeconds(10))).Wait());
}
private static readonly ConcurrentDictionary<int, SemaphoreSlim> _lockDict = new ConcurrentDictionary<int, SemaphoreSlim>();
private static async Task<bool> ManageConcurrency(int taskId, Func<Task> task)
{
var gate = _lockDict.GetOrAdd(taskId, _ => new SemaphoreSlim(1, 1));
await gate.WaitAsync();
try
{
Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")}, {taskId}, Lock pulled for TaskId {taskId}, Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
await task();
return true;
}
catch (Exception e)
{
return false;
}
finally
{
gate.Release();
}
}
在我看来,你担心信号量之类的事情让你的生活变得更加艰难。有更容易使用的抽象。
在这种情况下,使用Lazy<T>
是理想的,但由于您希望等待结果,因此Lazy<T>
需要升级到AsyncLazy<T>
。
public class AsyncLazy<T> : Lazy<Task<T>>
{
public AsyncLazy(Func<T> valueFactory) :
base(() => Task.Factory.StartNew(valueFactory))
{ }
public AsyncLazy(Func<T> valueFactory, LazyThreadSafetyMode mode) :
base(() => Task.Factory.StartNew(valueFactory), mode)
{ }
public AsyncLazy(Func<Task<T>> taskFactory) :
base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap())
{ }
public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) :
base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode)
{ }
public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); }
}
我创建了一个类来模拟长期运行任务的结果:
public class LongRunningResult
{
public int Index;
}
需要运行才能进行计算的方法:
private LongRunningResult ComputeLongRunningResult(int index)
{
Console.WriteLine($"Running Index {index}");
Thread.Sleep(1000);
return new LongRunningResult() { Index = index };
}
现在我们需要一个字典来保存惰性异步:
private readonly ConcurrentDictionary<int, AsyncLazy<LongRunningResult>> _results
= new ConcurrentDictionary<int, AsyncLazy<LongRunningResult>>();
现在它变得超级容易:
Enumerable
.Range(1, 10)
.AsParallel()
.ForAll(async i =>
{
var index = i % 2;
Console.WriteLine($"Trying Index {index}");
_results.TryAdd(index,
new AsyncLazy<LongRunningResult>(
() => ComputeLongRunningResult(index),
LazyThreadSafetyMode.ExecutionAndPublication));
AsyncLazy<LongRunningResult> ayncLazy;
if (_results.TryGetValue(index, out ayncLazy))
{
await ayncLazy;
}
});
我从中得到的输出如下:
Trying Index 1
Trying Index 0
Trying Index 1
Trying Index 1
Trying Index 0
Trying Index 1
Running Index 1
Trying Index 0
Trying Index 1
Running Index 0
Trying Index 0
Trying Index 0