如何使用异步操作正确获取数据
本文关键字:获取 数据 何使用 异步操作 | 更新日期: 2023-09-27 18:25:54
我需要通过等待已经运行的操作的结果来处理并发请求。
private readonly object _gettingDataTaskLock = new object();
private Task<Data> _gettingDataTask;
public virtual Data GetData(Credential credential)
{
Task<Data> inProgressDataTask = null;
lock (_gettingDataTaskLock)
{
if (_gettingDataTask == null)
{
_gettingDataTask = Task.Factory.StartNew(()
=> GetDataInternal(credential));
_gettingDataTask.ContinueWith((task) =>
{
lock (_gettingDataTaskLock)
{
_gettingDataTask = null;
}
},
TaskContinuationOptions.ExecuteSynchronously);
}
inProgressDataTask = _gettingDataTask;
}
try
{
return inProgressDataTask.Result;
}
catch (AggregateException ex)
{
_logger.ErrorException(ex, "An error occurs during getting full data");
throw ex.InnerException;
}
}
我有一个潜在的问题:几个调用GetData
可能使用不同的credentials
。首先使用正确的凭据进行呼叫,然后使用错误凭据进行呼叫。这两个请求得到了与第一个相同的答案。如何解决这个问题?
有没有什么方法可以简化这些代码,使其线程安全且无错误?
更新:
这里似乎实现了双锁检查,就像在singleton模式中一样。CCD_ 3只要执行就被缓存。例如,如果它在100ms内执行,并且每10ms运行一个GetData()
,则前10个调用将使用相同的GetDataInternal()
。
TaskContinuationOptions.ExecuteSynchronously用于确保在同一线程中运行continuation。
public virtual Data GetData(Credential credential)
{
Task<Data> inProgressDataTask = Task.Factory.StartNew(() => GetDataInternal(credential));
try
{
return inProgressDataTask.Result;
}
catch (AggregateException ex)
{
_logger.ErrorException(ex, "An error occurs during getting full data");
throw ex.InnerException;
}
}
异步/等待场景:
将Data GetInternalData()
更改为async Task<Data> GetInternalData()
将方法GetData()
更改为
public virtual async Task<Data> GetData()
{
try
{
return await GetInternalData();
}
catch (AggregateException ex)
{
_logger.ErrorException(ex, "An error occurs during getting full data");
throw ex.InnerException;
}
}
以下是我对需求的解释:
- 对数据的请求可能同时带有相同/不同的凭据
- 对于每个唯一的凭据集,最多可以有一个正在进行的
GetDataInternal
调用,当它准备好时,该调用的结果将返回给所有排队的服务员 - 在此之后,上一次调用的结果将无效,并且将允许具有相同凭据集的新
GetDataInternal
调用 - 允许使用不同凭据并行调用
GetDataInternal
很简单。
private readonly Dictionary<Credential, Lazy<Data>> Cache
= new Dictionary<Credential, Lazy<Data>>();
public Data GetData(Credential credential)
{
if (credential == null)
{
// Pass-through, no caching.
return GetDataInternal(null);
}
Lazy<Data> lazy;
lock (Cache)
{
if (!Cache.TryGetValue(credential, out lazy))
{
// ExecutionAndPublication is the default LazyThreadSafetyMode, but I
// wanted to spell it out to drive the point home: this Lazy instance
// will only allow a single call to GetDataInternal, even if multiple
// threads query its Value property simultaneously.
lazy = new Lazy<Data>(
() => GetDataInternal(credential),
LazyThreadSafetyMode.ExecutionAndPublication
);
Cache.Add(credential, lazy);
}
}
// We have released the lock on Cache to allow other threads
// (with potentially *different* credentials) to access the
// cache and proceed with their own work in parallel with us.
Data data;
try
{
// Wait for the GetDataInternal call to complete.
data = lazy.Value;
}
finally
{
// At this point the call to GetDataInternal has completed and Data is ready.
// We will invalidate the cache entry if another thread hasn't done so already.
lock (Cache)
{
// The lock is required to ensure the atomicity of our "fetch + compare + remove" operation.
// *ANY* thread is allowed to invalidate the cached value, not just the thread that created it.
// This ensures that the cache entry is cleaned up sooner rather than later.
// The equality check on the Lazy<Data> instance ensures that the cache entry
// is not cleaned up too soon, and prevents the following race:
// (assume all operations use identical credentials)
// - Thread A creates and stores a Lazy<Data> instance in the cache.
// - Thread B fetches the Lazy<Data> instance created by Thread A.
// - Threads A and B access Lazy<Data>.Value simultaneously.
// - Thread B wins the race and enters the second (this) protected
// region and invalidates the cache entry created by Thread A.
// - Thread C creates and stores a *NEW* Lazy<Data> instance in the cache.
// - Thread C accesses its Lazy<Data>.Value.
// - Thread A finally gets to invalidate the cache, and OOPS, Thread C's cache
// entry is invalidated before the call to Lazy<Data>.Value has completed.
// With the equality check in place, Thread A will *not*
// invalidate the cache entry created by another thread.
Lazy<Data> currentLazy;
if (Cache.TryGetValue(credential, out currentLazy) && lazy == currentLazy)
{
// Need to invalidate.
Cache.Remove(credential);
}
}
}
return data;
}
需要注意的事项:
- 凭证必须覆盖
Equals
和GetHashCode
,以便进行相等性比较 - 如果您选择将此方法设为虚拟方法,请小心覆盖此方法
- 删除了错误处理,将重点放在"肉"上
如果您使用的是.NET 4.5或更高版本,上面的转换为Task<Data>
返回方法,并进行了一些小的调整(如果您使用.NET 4.0,则会进行一些大的恶意更改)。请告诉我这是否是一个要求,如果是,您要针对的是.NET的哪个版本。
编辑:我添加了一个try/finally
-应该从一开始就在那里。
此外,这里有一个ConcurrentDictionary
版本,我在其中绕过了它的局限性——你会注意到它只是稍微干净了一点:
private readonly ConcurrentDictionary<Credential, Lazy<Data>> Cache
= new ConcurrentDictionary<Credential, Lazy<Data>>();
public Data GetData(Credential credential)
{
if (credential == null)
{
// Pass-through, no caching.
return GetDataInternal(null);
}
// This instance will be thrown away if a cached
// value with our "credential" key already exists.
Lazy<Data> newLazy = new Lazy<Data>(
() => GetDataInternal(credential),
LazyThreadSafetyMode.ExecutionAndPublication
);
Lazy<Data> lazy = Cache.GetOrAdd(credential, newLazy);
bool added = ReferenceEquals(newLazy, lazy); // If true, we won the race.
Data data;
try
{
// Wait for the GetDataInternal call to complete.
data = lazy.Value;
}
finally
{
// Only the thread which created the cache value
// is allowed to remove it, to prevent races.
if (added) {
Cache.TryRemove(credential, out lazy);
}
}
return data;
}
当您使用Multithreading
或Multiprocessing
时,您必须执行一些操作来管理它们。CCD_ 19是一个庞大而复杂的课题。您的代码可能会出现不同的情况;假设在运行这行代码之前:
return inProgressDataTask.Result;
每个线程执行这行代码:
_gettingDataTask = null;
这是可能的,因为Task
s是在async
中运行的。注意,inProgressDataTask
是指_gettingDataTask
所指的地方:
inProgressDataTask = _gettingDataTask;
_gettingDataTask
对象在所有任务(线程)之间共享,这可能会导致程序出错。如果你想使用它,在这种情况下,你可以使用类似Singleton模式的东西。如果使用共享对象,请不要在代码的锁部分中使用Task.Factory.StartNew
或_gettingDataTask.ContinueWith
(以异步方式运行)。
或者假设在运行这行代码之前:
_gettingDataTask = null;
其他一些线程执行这行代码:
if (_gettingDataTask == null)
这使得与第一个线程相关的所有答案都被执行。
为了澄清问题,筛选新项目,并在其中添加以下代码:
class Test
{
private readonly object _gettingDataTaskLock = new object();
private Task<int> _gettingDataTask;
public int GetData(int credential)
{
Task<int> inProgressDataTask = null;
Console.WriteLine(credential + "-TEST0");
lock (_gettingDataTaskLock)
{
Console.WriteLine(credential + "-TEST1");
if (_gettingDataTask == null)
{
Console.WriteLine(credential + "-TEST2");
_gettingDataTask = Task.Factory.StartNew(()
=> GetDataInternal(credential));
_gettingDataTask.ContinueWith((task) =>
{
lock (_gettingDataTaskLock)
{
Console.WriteLine(credential + "-TEST3");
_gettingDataTask = null;
Console.WriteLine(credential + "-TEST4");
}
},
TaskContinuationOptions.ExecuteSynchronously);
Console.WriteLine(credential + "-TEST5");
}
//_gettingDataTask.Wait();
Console.WriteLine(credential + "-TEST6");
inProgressDataTask = _gettingDataTask;
Console.WriteLine(credential + "-TEST7");
}
Console.WriteLine(credential + "-TEST8");
try
{
Console.WriteLine(credential + "-TEST9");
return inProgressDataTask.Result;
}
catch (AggregateException ex)
{
throw ex.InnerException;
}
}
private int GetDataInternal(int credential)
{
return credential;
}
}
在主要方法中写入:
private static void Main(string[] args)
{
var test = new Test();
var results = new List<int>();
for (int i = 0; i < 5; i++)
results.Add(test.GetData(i));
Task.WaitAll();
Console.WriteLine(string.Join(",",results));
Console.ReadLine();
}
现在看看结果。
最后,根据给出的描述,代码的正确实现之一如下:
public virtual Data GetData(Credential credential)
{
Task<Data> inProgressDataTask = null;
lock (_gettingDataTaskLock)
{
inProgressDataTask = Task.Factory.StartNew(() => GetDataInternal(credential));
inProgressDataTask.ContinueWith((task) =>
{
lock (_gettingDataTaskLock)
{
//inProgressDataTask = null;
}
},
TaskContinuationOptions.ExecuteSynchronously);
}
try
{
return inProgressDataTask.Result;
}
catch (AggregateException ex)
{
throw ex.InnerException;
}
}