C#和ConcurrentDictionary中的多线程:以下用法正确吗

本文关键字:用法 ConcurrentDictionary 多线程 | 更新日期: 2023-09-27 18:28:51

我手头有这样一个场景(使用C#):我需要在对象列表上使用并行的"foreach":这个列表中的每个对象都像数据源一样工作,它正在生成一系列二进制矢量模式(如"0010100110")。生成每个矢量模式时,我需要更新共享ConcurrentDictionary上当前矢量模式的出现次数。这个ConcurrentDictionary就像所有数据源中特定二进制模式的直方图。在伪代码中,它应该像这样工作:

ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(var dataSource in listOfDataSources)
{
     for(int i=0;i<dataSource.OperationCount;i++)
     {
          BinaryPattern pattern = dataSource.GeneratePattern(i);
          //Add the pattern to concDict if it does not exist, 
          //or increment the current value of it, in a thread-safe fashion among all
          //dataSource objects in parallel steps.
     }
}

我在文档中读过ConcurrentDictionary类的TryAdd()和TryUpdate()方法,但我不确定我是否清楚地理解了它们。TryAdd()获取对当前线程的Dictionary的访问权限,并查找是否存在特定的键,在这种情况下是二进制模式,然后如果不存在,则创建其条目,将其值设置为1,因为这是该模式的第一次出现。TryUpdate()访问当前线程的字典,查看具有指定键的条目的当前值是否等于"已知"值,如果是,则更新它。顺便说一句,TryGetValue()检查键是否在字典中存在,如果存在,则返回当前值。

现在我想到了以下用法,想知道它是否是ConcurrentDictionary的线程安全填充的正确实现:

ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(var dataSource in listOfDataSources)
{
     for(int i=0;i<dataSource.OperationCount;i++)
     {
          BinaryPattern pattern = dataSource.GeneratePattern(i);
          while(true)
          {
             //Look whether the pattern is in dictionary currently,
             //if it is, get its current value.
             int currOccurenceOfPattern;
             bool isPatternInDict = concDict.TryGetValue(pattern,out currOccurenceOfPattern);
             //Not in dict, try to add.
             if(!isPatternInDict)
             {
                  //If the pattern is not added in the meanwhile, add it to the dict.
                  //If added, then exit from the while loop.
                  //If not added, then skip this step and try updating again.
                  if(TryAdd(pattern,1))
                        break;
             }
             //The pattern is already in the dictionary. 
             //Try to increment its current occurrence value instead.
             else
             {
                  //If the pattern's occurence value is not incremented by another thread
                  //in the meanwhile, update it. If this succeeds, then exit from the loop.
                  //If TryUpdate fails, then we see that the value has been updated
                  //by another thread in the meanwhile, we need to try our chances in the next
                  //step of the while loop.                   
                  int newValue = currOccurenceOfPattern + 1;
                  if(TryUpdate(pattern,newValue,currOccurenceOfPattern))
                       break;
             }
          }
     }
}

我试图在上面的代码片段中,在评论中坚定地总结我的逻辑。根据我从文档中收集到的信息,线程安全的更新方案可以用这种方式编码,给定ConcurrentDictionary的原子"TryXXX()"方法。这是解决问题的正确方法吗?如果没有,该如何改进或纠正?

C#和ConcurrentDictionary中的多线程:以下用法正确吗

您可以使用AddOrUpdate方法,该方法将添加或更新逻辑封装为单个线程安全操作:

ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(listOfDataSources, dataSource =>
{
    for(int i=0;i<dataSource.OperationCount;i++)
    {
        BinaryPattern pattern = dataSource.GeneratePattern(i);
        concDict.AddOrUpdate(
            pattern,
            _ => 1, // if pattern doesn't exist - add with value "1"
            (_, previous) => previous + 1 // if pattern exists - increment existing value
        );
    }
});

请注意,AddOrUpdate操作不是原子操作,不确定这是否是您的要求,但如果您需要知道向字典中添加值时的确切迭代,您可以保留代码(或将其提取为某种扩展方法)

您可能还想阅读这篇文章

我不知道这里的BinaryPattern是什么,但我可能会用不同的方式来解决这个问题。如果性能至关重要,我可能更倾向于简单地将实例计数器放在BinaryPattern中,而不是像这样四处复制值类型、将内容插入字典等。然后,每当发现模式时,使用InterlockedIncrement()递增计数器。

除非有理由将计数与模式分开,否则ConccurentDictionary可能是一个不错的选择。

首先,这个问题有点令人困惑,因为不清楚Parallel.Foreach的意思。我很天真地希望它是System.Threading.Tasks.Parallel.ForEach(),但这对于您在这里显示的语法是不可用的。

也就是说,假设你的意思是Parallel.ForEach(listOfDataSources, dataSource => { ... } )

就我个人而言,除非您有一些特定的需要来显示中间结果,否则我不会在这里使用ConcurrentDictionary。相反,我会让每个并发操作生成自己的计数字典,然后在最后合并结果。类似这样的东西:

var results = listOfDataSources.Select(dataSource =>
    Tuple.Create(dataSource, new Dictionary<BinaryPattern, int>())).ToList();
Parallel.ForEach(results, result =>
{
    for(int i = 0; i < result.Item1.OperationCount; i++)
    {
        BinaryPattern pattern = result.Item1.GeneratePattern(i);
        int count;
        result.Item2.TryGetValue(pattern, out count);
        result.Item2[pattern] = count + 1;
    }
});
var finalResult = new Dictionary<BinaryPattern, int>();
foreach (result in results)
{
    foreach (var kvp in result.Item2)
    {
        int count;
        finalResult.TryGetValue(kvp.Key, out count);
        finalResult[kvp.Key] = count + kvp.Value;
    }
}

这种方法将避免工作线程之间的争用(至少在涉及计数的情况下),从而有可能提高效率。最后的聚合操作应该非常快,并且可以在单个原始线程中轻松处理。