在c#的生产者/消费者ConcurrentDictionary中获取重复对象

本文关键字:获取 对象 ConcurrentDictionary 消费者 生产者 | 更新日期: 2023-09-27 18:15:20

我被困在一个问题上,我想知道如果我只是编码错误的东西。应用程序每隔几秒钟轮询一次,并从表中获取每条记录,该表的唯一目的是表示要对哪些记录进行操作。

请注意,为了空间和可读性,我省略了错误处理代码

    //Producing Thread, this is triggered every 5 seconds... UGH, I hate timers
    foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (!ConcurrentDictionary.Contains(Record.Key))
            ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }

这段代码工作得很好,令人恼火的是,它可能会多次选择相同的记录,直到所述记录被处理。通过处理,每个选定的记录都被写入其自己新创建的唯一命名的文件中。然后为该记录的键调用一个存储过程以将其从数据库中删除,此时该特定键将从ConcurrentDictionary中删除。

    // Consuming Thread, located within another loop to allow
    // the below code to continue to cycle until instructed
    // to terminate
    while (!ConcurrentDictionary.IsEmpty)
    {
        var Record = ConcurrentDictionary.Take(1).First();
        WriteToNewFile(Record.Value);
        RemoveFromDatabase(Record.Key);
        ConcurrentDictionary.TryRemove(Record.Key);
    }

对于吞吐量测试,我向表中添加了20k多条记录,然后将应用程序转为松散状态。当我注意到22k以上的文件数量继续增长到100k以上时,我感到非常惊讶。

我做错了什么?我是否完全误解了并发字典的用途?我是不是忘了加分号了?

在c#的生产者/消费者ConcurrentDictionary中获取重复对象

首先,消除对Contains的调用。TryAdd已经检查重复项,如果项已经存在则返回false。

foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
{
        ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
}

我看到的下一个问题是,我不认为ConcurrentDictionary.Take(1). first()是从字典中获取项目的好方法,因为它不是原子的。我认为你想使用BlockingCollection()代替。它是专门为实现生产者-消费者模式而设计的。

最后,我认为你的问题与字典无关,而是与数据库有关。字典本身是线程安全的,但是字典不是数据库的原子性。假设记录A在数据库中。GetRecordsFromDataBase()提取它并将其添加到字典中。然后它开始处理记录A(我假设这是在另一个线程中)。然后,第一个循环再次调用GetRecordsFromDataBase()并再次获得记录A。同时,记录A被处理并从数据库中删除。但是太迟了!GetRecordsFromDataBase()已经抓取了它!因此,在它被删除后,初始循环将它再次添加到字典中。

我认为你可能需要把要处理的记录完全移到另一个表中。这样,他们就不会再被抓了。在c#级别(而不是数据库级别)执行此操作将是一个问题。或者,您不希望在处理记录时将记录添加到队列中。

我做错了什么?

foreach (add)循环试图将任何非数据库中的记录添加到字典中。

while (remove)循环从数据库中删除项,然后从字典中删除项,并将它们写入文件。

这个逻辑看起来是正确的。但是有一场比赛:

GetRecordsFromDataBase(); // returns records 1 through 10.

切换上下文以移除循环。

    WriteToNewFile(Record.Value);    // write record 5
    RemoveFromDatabase(Record.Key);  // remove record 5 from db
    ConcurrentDictionary.TryRemove(Record.Key); // remove record 5 from dictionary

切换回添加循环

 ConcurrentDictionary.TryAdd(Record.Key, Record.Value); // add record 5 even though it is not in the DB becuase it was part of the records returned by ConcurrentDictionary.TryAdd(Record.Key, Record.Value);;

删除项后,foreach循环再次添加它。这就是为什么你的文件数在增加。

foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
    {
        if (!ConcurrentDictionary.Contains(Record.Key)) // this if is not required. try add will do.
            ConcurrentDictionary.TryAdd(Record.Key, Record.Value);
    }

试试这样做:增加循环:

   foreach (var Record in GetRecordsFromDataBase())  // returns a dictionary
            {
               if (ConcurrentDictionary.TryAdd(Record.Key, false)) // only adds the record if it has not been processed.
               {
                   ConcurrentQueue.Enque(record) // enqueue the record
               } 
            }

删除循环

var record;//   you will need to specify the type
    if (ConcurrentQueue.TryDequeue(record))
    {
         if (ConcurrentDictionary.TryUpdate(record.key,true,false)) // update the value from true to false
         {
            WriteToNewFile(Record.Value);    // write record 5
            RemoveFromDatabase(Record.Key);  // remove record 5 from db
         }
    }

这将为处理的每个记录在字典中留下项。您最终可以将它们从字典中删除,但涉及db的多线程可能会很棘手。