C#和ConcurrentDictionary中的多线程:以下用法正确吗
本文关键字:用法 ConcurrentDictionary 多线程 | 更新日期: 2023-09-27 18:28:51
我手头有这样一个场景(使用C#):我需要在对象列表上使用并行的"foreach":这个列表中的每个对象都像数据源一样工作,它正在生成一系列二进制矢量模式(如"0010100110")。生成每个矢量模式时,我需要更新共享ConcurrentDictionary上当前矢量模式的出现次数。这个ConcurrentDictionary就像所有数据源中特定二进制模式的直方图。在伪代码中,它应该像这样工作:
ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(var dataSource in listOfDataSources)
{
for(int i=0;i<dataSource.OperationCount;i++)
{
BinaryPattern pattern = dataSource.GeneratePattern(i);
//Add the pattern to concDict if it does not exist,
//or increment the current value of it, in a thread-safe fashion among all
//dataSource objects in parallel steps.
}
}
我在文档中读过ConcurrentDictionary类的TryAdd()和TryUpdate()方法,但我不确定我是否清楚地理解了它们。TryAdd()获取对当前线程的Dictionary的访问权限,并查找是否存在特定的键,在这种情况下是二进制模式,然后如果不存在,则创建其条目,将其值设置为1,因为这是该模式的第一次出现。TryUpdate()访问当前线程的字典,查看具有指定键的条目的当前值是否等于"已知"值,如果是,则更新它。顺便说一句,TryGetValue()检查键是否在字典中存在,如果存在,则返回当前值。
现在我想到了以下用法,想知道它是否是ConcurrentDictionary的线程安全填充的正确实现:
ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(var dataSource in listOfDataSources)
{
for(int i=0;i<dataSource.OperationCount;i++)
{
BinaryPattern pattern = dataSource.GeneratePattern(i);
while(true)
{
//Look whether the pattern is in dictionary currently,
//if it is, get its current value.
int currOccurenceOfPattern;
bool isPatternInDict = concDict.TryGetValue(pattern,out currOccurenceOfPattern);
//Not in dict, try to add.
if(!isPatternInDict)
{
//If the pattern is not added in the meanwhile, add it to the dict.
//If added, then exit from the while loop.
//If not added, then skip this step and try updating again.
if(TryAdd(pattern,1))
break;
}
//The pattern is already in the dictionary.
//Try to increment its current occurrence value instead.
else
{
//If the pattern's occurence value is not incremented by another thread
//in the meanwhile, update it. If this succeeds, then exit from the loop.
//If TryUpdate fails, then we see that the value has been updated
//by another thread in the meanwhile, we need to try our chances in the next
//step of the while loop.
int newValue = currOccurenceOfPattern + 1;
if(TryUpdate(pattern,newValue,currOccurenceOfPattern))
break;
}
}
}
}
我试图在上面的代码片段中,在评论中坚定地总结我的逻辑。根据我从文档中收集到的信息,线程安全的更新方案可以用这种方式编码,给定ConcurrentDictionary的原子"TryXXX()"方法。这是解决问题的正确方法吗?如果没有,该如何改进或纠正?
您可以使用AddOrUpdate
方法,该方法将添加或更新逻辑封装为单个线程安全操作:
ConcurrentDictionary<BinaryPattern,int> concDict = new ConcurrentDictionary<BinaryPattern,int>();
Parallel.Foreach(listOfDataSources, dataSource =>
{
for(int i=0;i<dataSource.OperationCount;i++)
{
BinaryPattern pattern = dataSource.GeneratePattern(i);
concDict.AddOrUpdate(
pattern,
_ => 1, // if pattern doesn't exist - add with value "1"
(_, previous) => previous + 1 // if pattern exists - increment existing value
);
}
});
请注意,AddOrUpdate
操作不是原子操作,不确定这是否是您的要求,但如果您需要知道向字典中添加值时的确切迭代,您可以保留代码(或将其提取为某种扩展方法)
您可能还想阅读这篇文章
我不知道这里的BinaryPattern
是什么,但我可能会用不同的方式来解决这个问题。如果性能至关重要,我可能更倾向于简单地将实例计数器放在BinaryPattern
中,而不是像这样四处复制值类型、将内容插入字典等。然后,每当发现模式时,使用InterlockedIncrement()
递增计数器。
除非有理由将计数与模式分开,否则ConccurentDictionary
可能是一个不错的选择。
首先,这个问题有点令人困惑,因为不清楚Parallel.Foreach
的意思。我很天真地希望它是System.Threading.Tasks.Parallel.ForEach()
,但这对于您在这里显示的语法是不可用的。
也就是说,假设你的意思是Parallel.ForEach(listOfDataSources, dataSource => { ... } )
…
就我个人而言,除非您有一些特定的需要来显示中间结果,否则我不会在这里使用ConcurrentDictionary
。相反,我会让每个并发操作生成自己的计数字典,然后在最后合并结果。类似这样的东西:
var results = listOfDataSources.Select(dataSource =>
Tuple.Create(dataSource, new Dictionary<BinaryPattern, int>())).ToList();
Parallel.ForEach(results, result =>
{
for(int i = 0; i < result.Item1.OperationCount; i++)
{
BinaryPattern pattern = result.Item1.GeneratePattern(i);
int count;
result.Item2.TryGetValue(pattern, out count);
result.Item2[pattern] = count + 1;
}
});
var finalResult = new Dictionary<BinaryPattern, int>();
foreach (result in results)
{
foreach (var kvp in result.Item2)
{
int count;
finalResult.TryGetValue(kvp.Key, out count);
finalResult[kvp.Key] = count + kvp.Value;
}
}
这种方法将避免工作线程之间的争用(至少在涉及计数的情况下),从而有可能提高效率。最后的聚合操作应该非常快,并且可以在单个原始线程中轻松处理。