使用多线程处理一组数据库记录的选项

本文关键字:数据库 一组 记录 选项 多线程处理 | 更新日期: 2023-09-27 18:36:27

我有一个数据库表,其中包含一些要处理的记录。该表具有一个表示以下状态值的标志列。1 - 准备处理,2- 成功处理,3- 处理失败。

.net 代码(重复进程 - 控制台/服务)将获取准备处理的记录列表,并遍历它们并尝试处理它们(不是很长),根据成功或失败更新状态。

为了获得更好的性能,我想为此过程启用多线程。我想生成 6 个线程,每个线程抓取一个子集。

显然,我想避免让不同的线程处理相同的记录。我不希望数据库中有一个"正在处理"标志来处理线程崩溃导致记录挂起的情况。

我看到这样做的唯一方法是获取可用记录的完整列表并为每个线程分配一个组(可能是 ids)。如果单个线程失败,则下次进程运行时将选取其未处理的记录。

在将组

分配给线程之前,是否有其他替代方法可以划分组?

使用多线程处理一组数据库记录的选项

实现此要求的最直接方法是使用任务并行库的

Parallel.ForEach (或 Parallel.For)。

允许它管理单个工作线程。

根据经验,我会推荐以下内容:

  • 具有"正在处理"的其他状态
  • 数据库中有一列指示何时选取记录进行处理,以及定期运行的清理任务/进程,以查找"正在处理"太久的记录(将状态重置为"准备处理")。
  • 即使您不希望它,"正在处理"对于崩溃恢复方案也是必不可少的(除非您可以容忍同一记录被处理两次)。

或者

考虑使用事务队列(想到 MSMQ 或 Rabbit MQ)。 它们针对此问题进行了优化。

这将是我明确的选择,因为我已经大规模地完成了这两项工作。

优化

如果从数据库中检索数据需要相当长的时间,则可以考虑使用生产者/使用者模式,该模式使用BlockingCollection实现非常简单。 该模式允许一个线程(生产者)使用要处理的数据库记录填充队列,并允许多个其他线程(使用者)处理该队列中的项目。

新的选择

鉴于在记录被视为完成之前有几个处理步骤会触及记录,请查看 Windows 工作流基础作为可能的替代方法。

我记得做了你描述的事情......线程会不时检查数据库中是否有需要处理的新内容。它将仅加载新的 id,因此如果在时间 x 最后一次读取 id 是 1000,则在 x+1 处将从 id 1001 读取。

它读取的所有内容都将进入线程安全队列。将项目添加到此队列时,您会通知工作线程(可能使用自动重置事件,或在此处生成线程)。每个线程将一次从此线程安全队列读取一个项目,直到队列被清空。

您不应该在工作之前为 each 线程分配(除非您知道 for each 文件的进程花费相同的时间)。 如果一个线程完成了工作,那么它应该从剩下的其他线程中获取负载。 使用此线程安全队列,可以确保这一点。

这是一种不依赖/使用其他数据库列(但请参阅 #4)或强制使用进程内队列的方法。这种方法的前提是根据某个一致的值跨工作线程"分片"记录,就像分布式缓存一样。

以下是我的假设:

  1. 再处理不会造成不必要的副作用;最多一些工作"被浪费"。
  2. 线程数在启动时是固定的。这不是必需的,但它确实简化了实现,并允许我跳过下面简单描述中的暂时细节。
  3. 只有一个"工作
  4. 进程"(但参见#1)控制"工作线程"。这简化了处理如何在工作人员之间拆分记录的过程。
  5. 有一些[不可变的]"ID"列是"分布良好"的。这是必需的,以便搜索辅助角色获得大致相同的工作量。
  6. 只要"最终完成"
  7. ,工作就可以"无序"完成。此外,工作人员可能并不总是"以 100%"运行,因为每个工作人员都在不同的队列上有效地工作。

[0, thread_count) 中为每个线程分配一个唯一的bucket值。如果线程死亡/重新启动,它将占用与它腾出的存储桶相同的存储桶。

然后,每次线程需要一条新记录时,它都会从数据库中获取:

SELECT *
FROM record
WHERE state = 'unprocessed'
AND (id % $thread_count) = $bucket
ORDER BY date

当然,对于批量读取"此线程任务"并将其存储在本地,可能还有其他假设。但是,本地队列将是每个线程的(因此在新线程启动时重新加载),因此它只会处理与给定bucket关联的记录。

线程处理完后,记录应使用适当的隔离级别和/或乐观并发将记录标记为已处理,然后继续处理下一条记录。