ConcurrentDictionary

本文关键字:ConcurrentDictionary | 更新日期: 2023-09-27 18:21:06

简介

我正在为SE API 2.0构建一个API包装器
目前,我正在实现缓存功能,直到现在这还不是一个问题。现在我要考虑并发性。这将是我的测试方法:

代码

public static void TestConcurrency()
{
    Stopwatch sw = new Stopwatch();
    sw.Start();
    IList<Task> tasks = new List<Task>();
    for (int i = 0; i < 1000; i++)
    {
        tasks.Add(Task.Factory.StartNew(p => client.GetAnswers(), null));
    }
    Task.WaitAll(tasks.ToArray());
    sw.Stop();
    Console.WriteLine("elapsed: {0}", sw.Elapsed.ToString());
    Console.ReadKey();
}

说明

在内部,客户端有一个RequestHandler类,它试图从缓存中获取一个值,如果没有成功,它将执行实际的请求。

代码

/// <summary>
/// Checks the cache and then performs the actual request, if required.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result against which to deserialize JSON.</typeparam>
/// <param name="endpoint">The API endpoint to query.</param>
/// <returns>The API response object.</returns>
private IApiResponse<T> InternalProcessing<T>(string endpoint) where T : class
{
    IApiResponse<T> result = FetchFromCache<T>(endpoint);
    return result ?? PerformRequest<T>(endpoint);
}

说明

实际执行请求的代码与此问题无关。尝试访问缓存的代码执行以下操作:

代码

/// <summary>
/// Attempts to fetch the response object from the cache instead of directly from the API.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result against which to deserialize JSON.</typeparam>
/// <param name="endpoint">The API endpoint to query.</param>
/// <returns>The API response object.</returns>
private IApiResponse<T> FetchFromCache<T>(string endpoint) where T : class
{
    IApiResponseCacheItem<T> cacheItem = Store.Get<T>(endpoint);
    if (cacheItem != null)
    {
        IApiResponse<T> result = cacheItem.Response;
        result.Source = ResultSourceEnum.Cache;
        return result;
    }
    return null;
}

说明

缓存存储的实际实现在ConcurrentDictionary上工作,当调用Get<T>()方法时,I:

  • 检查字典中是否有endpoint的条目
  • 如果是,我将验证它是否包含响应对象
  • 如果它还没有响应对象,那么缓存项的状态将为Processing,线程将在一小段时间内处于睡眠状态,等待实际请求完成
  • 一旦或如果响应被"提交"到缓存存储(一旦请求完成,就会发生这种情况),就会返回缓存项
  • 如果缓存项太旧或请求处理超时,则会从存储中删除该项
  • 如果缓存中没有endpoint的条目,则null会作为endpoint的响应被推入缓存,从而发出正在处理该端点上的请求的信号,并且无需在同一端点上发出更多请求。然后返回null,用信号通知应该进行实际请求

代码

/// <summary>
/// Attempts to access the internal cache and retrieve a response cache item without querying the API.
/// <para>If the endpoint is not present in the cache yet, null is returned, but the endpoint is added to the cache.</para>
/// <para>If the endpoint is present, it means the request is being processed. In this case we will wait on the processing to end before returning a result.</para>
/// </summary>
/// <typeparam name="T">The strong type of the expected API result.</typeparam>
/// <param name="endpoint">The API endpoint</param>
/// <returns>Returns an API response cache item if successful, null otherwise.</returns>
public IApiResponseCacheItem<T> Get<T>(string endpoint) where T : class
{
    IApiResponseCacheItem cacheItem;
    if (Cache.TryGetValue(endpoint, out cacheItem))
    {
        while (cacheItem.IsFresh && cacheItem.State == CacheItemStateEnum.Processing)
        {
            Thread.Sleep(10);
        }
        if (cacheItem.IsFresh && cacheItem.State == CacheItemStateEnum.Cached)
        {
            return (IApiResponseCacheItem<T>)cacheItem;
        }
        IApiResponseCacheItem value;
        Cache.TryRemove(endpoint, out value);
    }
    Push<T>(endpoint, null);
    return null;
}

这个问题是不确定的,有时会有两个请求通过,而不是像设计的那样只通过一个。

我在想,在这个过程中的某个地方,一些线程不安全的东西正在被访问。但我无法确定可能是什么。可能是什么,或者我应该如何正确调试?

更新

问题是我在ConcurrentDictionary 上并不总是线程安全

此方法没有重新调整指示缓存是否成功更新的boolean,因此,如果此方法失败,Get<T>()将返回两次null

代码

/// <summary>
/// Attempts to push API responses into the cache store.
/// </summary>
/// <typeparam name="T">The strong type of the expected API result.</typeparam>
/// <param name="endpoint">The queried API endpoint.</param>
/// <param name="response">The API response.</param>
/// <returns>True if the operation was successful, false otherwise.</returns>
public bool Push<T>(string endpoint, IApiResponse<T> response) where T : class
{
    if (endpoint.NullOrEmpty())
    {
        return false;
    }
    IApiResponseCacheItem item;
    if (Cache.TryGetValue(endpoint, out item))
    {
        ((IApiResponseCacheItem<T>)item).UpdateResponse(response);
        return true;
    }
    else
    {
        item = new ApiResponseCacheItem<T>(response);
        return Cache.TryAdd(endpoint, item);
    }
}

说明

解决方案是实现返回值,并更改Get<T>()添加以下内容:

代码

if (Push<T>(endpoint, null) || retries > 1) // max retries for sanity.
{
    return null;
}
else
{
    return Get<T>(endpoint, ++retries); // retry push.
}

ConcurrentDictionary

IApiResponseCacheItem<T> cacheItem = Store.Get<T>(endpoint);
if (cacheItem != null)
{
   // etc..
}

ConcurrentDirectional是线程安全的,但它不会自动使代码线程安全。上面的片段是问题的核心。两个线程可以同时调用Get()方法并获得null。它们都将继续并同时调用PerformRequest()。您需要合并InternalProcessing()和FetchFromCache(),并确保只有一个线程可以使用锁调用PerformRequest。这可能会产生较差的并发性,也许您可以丢弃一个重复的响应。无论如何,请求都可能由SE服务器序列化,所以这可能无关紧要。