在c#的生产者/消费者ConcurrentDictionary中获取重复对象
本文关键字:获取 对象 ConcurrentDictionary 消费者 生产者 | 更新日期: 2023-09-27 18:15:20
我被困在一个问题上,我想知道如果我只是编码错误的东西。应用程序每隔几秒钟轮询一次,并从表中获取每条记录,该表的唯一目的是表示要对哪些记录进行操作。
请注意,为了空间和可读性,我省略了错误处理代码
//Producing Thread, this is triggered every 5 seconds... UGH, I hate timers
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
if (!ConcurrentDictionary.Contains(Record.Key))
ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}
这段代码工作得很好,令人恼火的是,它可能会多次选择相同的记录,直到所述记录被处理。通过处理,每个选定的记录都被写入其自己新创建的唯一命名的文件中。然后为该记录的键调用一个存储过程以将其从数据库中删除,此时该特定键将从ConcurrentDictionary中删除。
// Consuming Thread, located within another loop to allow
// the below code to continue to cycle until instructed
// to terminate
while (!ConcurrentDictionary.IsEmpty)
{
var Record = ConcurrentDictionary.Take(1).First();
WriteToNewFile(Record.Value);
RemoveFromDatabase(Record.Key);
ConcurrentDictionary.TryRemove(Record.Key);
}
对于吞吐量测试,我向表中添加了20k多条记录,然后将应用程序转为松散状态。当我注意到22k以上的文件数量继续增长到100k以上时,我感到非常惊讶。
我做错了什么?我是否完全误解了并发字典的用途?我是不是忘了加分号了?
首先,消除对Contains的调用。TryAdd已经检查重复项,如果项已经存在则返回false。
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}
我看到的下一个问题是,我不认为ConcurrentDictionary.Take(1). first()是从字典中获取项目的好方法,因为它不是原子的。我认为你想使用BlockingCollection()代替。它是专门为实现生产者-消费者模式而设计的。
最后,我认为你的问题与字典无关,而是与数据库有关。字典本身是线程安全的,但是字典不是数据库的原子性。假设记录A在数据库中。GetRecordsFromDataBase()提取它并将其添加到字典中。然后它开始处理记录A(我假设这是在另一个线程中)。然后,第一个循环再次调用GetRecordsFromDataBase()并再次获得记录A。同时,记录A被处理并从数据库中删除。但是太迟了!GetRecordsFromDataBase()已经抓取了它!因此,在它被删除后,初始循环将它再次添加到字典中。
我认为你可能需要把要处理的记录完全移到另一个表中。这样,他们就不会再被抓了。在c#级别(而不是数据库级别)执行此操作将是一个问题。或者,您不希望在处理记录时将记录添加到队列中。
我做错了什么?
foreach (add)循环试图将任何非数据库中的记录添加到字典中。
while (remove)循环从数据库中删除项,然后从字典中删除项,并将它们写入文件。
这个逻辑看起来是正确的。但是有一场比赛:
GetRecordsFromDataBase(); // returns records 1 through 10.
切换上下文以移除循环。
WriteToNewFile(Record.Value); // write record 5
RemoveFromDatabase(Record.Key); // remove record 5 from db
ConcurrentDictionary.TryRemove(Record.Key); // remove record 5 from dictionary
切换回添加循环
ConcurrentDictionary.TryAdd(Record.Key, Record.Value); // add record 5 even though it is not in the DB becuase it was part of the records returned by ConcurrentDictionary.TryAdd(Record.Key, Record.Value);;
删除项后,foreach循环再次添加它。这就是为什么你的文件数在增加。
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
if (!ConcurrentDictionary.Contains(Record.Key)) // this if is not required. try add will do.
ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}
试试这样做:增加循环:
foreach (var Record in GetRecordsFromDataBase()) // returns a dictionary
{
if (ConcurrentDictionary.TryAdd(Record.Key, false)) // only adds the record if it has not been processed.
{
ConcurrentQueue.Enque(record) // enqueue the record
}
}
删除循环
var record;// you will need to specify the type
if (ConcurrentQueue.TryDequeue(record))
{
if (ConcurrentDictionary.TryUpdate(record.key,true,false)) // update the value from true to false
{
WriteToNewFile(Record.Value); // write record 5
RemoveFromDatabase(Record.Key); // remove record 5 from db
}
}
这将为处理的每个记录在字典中留下项。您最终可以将它们从字典中删除,但涉及db的多线程可能会很棘手。