ConcurrentDictionary
本文关键字:ConcurrentDictionary | 更新日期: 2023-09-27 18:21:06
简介
我正在为SE API 2.0构建一个API包装器
目前,我正在实现缓存功能,直到现在这还不是一个问题。现在我要考虑并发性。这将是我的测试方法:
代码
public static void TestConcurrency()
{
Stopwatch sw = new Stopwatch();
sw.Start();
IList<Task> tasks = new List<Task>();
for (int i = 0; i < 1000; i++)
{
tasks.Add(Task.Factory.StartNew(p => client.GetAnswers(), null));
}
Task.WaitAll(tasks.ToArray());
sw.Stop();
Console.WriteLine("elapsed: {0}", sw.Elapsed.ToString());
Console.ReadKey();
}
说明
在内部,客户端有一个RequestHandler
类,它试图从缓存中获取一个值,如果没有成功,它将执行实际的请求。
代码
/// <summary>
/// Checks the cache and then performs the actual request, if required.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result against which to deserialize JSON.</typeparam>
/// <param name="endpoint">The API endpoint to query.</param>
/// <returns>The API response object.</returns>
private IApiResponse<T> InternalProcessing<T>(string endpoint) where T : class
{
IApiResponse<T> result = FetchFromCache<T>(endpoint);
return result ?? PerformRequest<T>(endpoint);
}
说明
实际执行请求的代码与此问题无关。尝试访问缓存的代码执行以下操作:
代码
/// <summary>
/// Attempts to fetch the response object from the cache instead of directly from the API.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result against which to deserialize JSON.</typeparam>
/// <param name="endpoint">The API endpoint to query.</param>
/// <returns>The API response object.</returns>
private IApiResponse<T> FetchFromCache<T>(string endpoint) where T : class
{
IApiResponseCacheItem<T> cacheItem = Store.Get<T>(endpoint);
if (cacheItem != null)
{
IApiResponse<T> result = cacheItem.Response;
result.Source = ResultSourceEnum.Cache;
return result;
}
return null;
}
说明
缓存存储的实际实现在ConcurrentDictionary
上工作,当调用Get<T>()
方法时,I:
- 检查字典中是否有
endpoint
的条目 - 如果是,我将验证它是否包含响应对象
- 如果它还没有响应对象,那么缓存项的状态将为
Processing
,线程将在一小段时间内处于睡眠状态,等待实际请求完成 - 一旦或如果响应被"提交"到缓存存储(一旦请求完成,就会发生这种情况),就会返回缓存项
- 如果缓存项太旧或请求处理超时,则会从存储中删除该项
- 如果缓存中没有
endpoint
的条目,则null
会作为endpoint
的响应被推入缓存,从而发出正在处理该端点上的请求的信号,并且无需在同一端点上发出更多请求。然后返回null
,用信号通知应该进行实际请求
代码
/// <summary>
/// Attempts to access the internal cache and retrieve a response cache item without querying the API.
/// <para>If the endpoint is not present in the cache yet, null is returned, but the endpoint is added to the cache.</para>
/// <para>If the endpoint is present, it means the request is being processed. In this case we will wait on the processing to end before returning a result.</para>
/// </summary>
/// <typeparam name="T">The strong type of the expected API result.</typeparam>
/// <param name="endpoint">The API endpoint</param>
/// <returns>Returns an API response cache item if successful, null otherwise.</returns>
public IApiResponseCacheItem<T> Get<T>(string endpoint) where T : class
{
IApiResponseCacheItem cacheItem;
if (Cache.TryGetValue(endpoint, out cacheItem))
{
while (cacheItem.IsFresh && cacheItem.State == CacheItemStateEnum.Processing)
{
Thread.Sleep(10);
}
if (cacheItem.IsFresh && cacheItem.State == CacheItemStateEnum.Cached)
{
return (IApiResponseCacheItem<T>)cacheItem;
}
IApiResponseCacheItem value;
Cache.TryRemove(endpoint, out value);
}
Push<T>(endpoint, null);
return null;
}
这个问题是不确定的,有时会有两个请求通过,而不是像设计的那样只通过一个。
我在想,在这个过程中的某个地方,一些线程不安全的东西正在被访问。但我无法确定可能是什么。可能是什么,或者我应该如何正确调试?
更新
问题是我在ConcurrentDictionary
上并不总是线程安全
此方法没有重新调整指示缓存是否成功更新的boolean
,因此,如果此方法失败,Get<T>()
将返回两次null
。
代码
/// <summary>
/// Attempts to push API responses into the cache store.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result.</typeparam>
/// <param name="endpoint">The queried API endpoint.</param>
/// <param name="response">The API response.</param>
/// <returns>True if the operation was successful, false otherwise.</returns>
public bool Push<T>(string endpoint, IApiResponse<T> response) where T : class
{
if (endpoint.NullOrEmpty())
{
return false;
}
IApiResponseCacheItem item;
if (Cache.TryGetValue(endpoint, out item))
{
((IApiResponseCacheItem<T>)item).UpdateResponse(response);
return true;
}
else
{
item = new ApiResponseCacheItem<T>(response);
return Cache.TryAdd(endpoint, item);
}
}
说明
解决方案是实现返回值,并更改Get<T>()
添加以下内容:
代码
if (Push<T>(endpoint, null) || retries > 1) // max retries for sanity.
{
return null;
}
else
{
return Get<T>(endpoint, ++retries); // retry push.
}
IApiResponseCacheItem<T> cacheItem = Store.Get<T>(endpoint);
if (cacheItem != null)
{
// etc..
}
ConcurrentDirectional是线程安全的,但它不会自动使代码线程安全。上面的片段是问题的核心。两个线程可以同时调用Get()方法并获得null。它们都将继续并同时调用PerformRequest()。您需要合并InternalProcessing()和FetchFromCache(),并确保只有一个线程可以使用锁调用PerformRequest。这可能会产生较差的并发性,也许您可以丢弃一个重复的响应。无论如何,请求都可能由SE服务器序列化,所以这可能无关紧要。