使用Parallel.ForEach的Azure TableQuery线程安全性
本文关键字:TableQuery 线程 安全性 Azure Parallel ForEach 使用 | 更新日期: 2023-09-27 18:25:31
我已经连续查询了一些基本的Azure表:
var query = new TableQuery<DynamicTableEntity>()
.Where(TableQuery.GenerateFilterCondition("PartitionKey",
QueryComparisons.Equal, myPartitionKey));
foreach (DynamicTableEntity entity in myTable.ExecuteQuery(query)) {
// Process entity here.
}
为了加快速度,我将其并行化如下:
Parallel.ForEach(myTable.ExecuteQuery(query), (entity, loopState) => {
// Process entity here in a thread-safe manner.
// Edited to add: Details of the loop body below:
// This is the essence of the fixed loop body:
lock (myLock) {
DataRow myRow = myDataTable.NewRow();
// [Add entity data to myRow.]
myDataTable.Rows.Add(myRow);
}
// Old code (apparently not thread-safe, though NewRow() is supposed to create
// a DataRow based on the table's schema without changing the table state):
/*
DataRow myRow = myDataTable.NewRow();
lock (myLock) {
// [Add entity data to myRow.]
myDataTable.Rows.Add(myRow);
}
*/
});
这会产生显著的加速,但运行之间的结果往往略有不同(即,尽管返回的实体数量完全相同,但某些实体偶尔会有所不同)。
通过这些和一些网络搜索,我得出结论,上面的枚举器并不总是线程安全的。文档似乎表明,只有当表对象是公共静态的时,线程安全才能得到保证,但这对我来说并没有什么不同
有人能建议如何解决这个问题吗?有没有一个标准的模式来并行化Azure表查询?
您的评论是正确的:DataTable不适合涉及突变的并发操作,并且是重复条目的来源。锁定DataTable对象进行行修改操作将解决以下问题:
lock (myTable)
{
DataRow myRow = myTable.NewRow();
myRow.SetField<int>("c1", (int)value);
myTable.Rows.Add(myRow);
}
将NewRow()置于锁外会间歇性地导致表中出现重复的行条目,或者NewRow行上出现"System.Data.dll中发生了类型为System.ArgumentException的未处理异常"的异常。有关并发DataTable使用的其他详细信息和替代方案,请参阅DataTable 的线程安全
要重现错误情况,请使用此代码。有些运行将是干净的,有些运行将包含重复条目,还有一些运行将遇到异常。
class Program
{
static DataTable myTable = GetTable();
static ManualResetEvent waitHandle = new ManualResetEvent(false);
static void Main(string[] args)
{
const int threadCount = 10;
List<Thread> threads = new List<System.Threading.Thread>();
for (int i = 0; i < threadCount; ++i)
{
threads.Add(new Thread(new ParameterizedThreadStart(AddRowThread)));
threads[i].Start(i);
}
waitHandle.Set(); // Release all the threads at once
for (int i = 0; i < threadCount; ++i)
{
threads[i].Join();
}
// Print results once threads return
for (int i = 0; i < myTable.Rows.Count; ++i)
{
Console.WriteLine(myTable.Rows[i].Field<int>(0));
}
Console.WriteLine("---Processing Complete---");
Console.ReadKey();
}
static void AddRowThread(object value)
{
waitHandle.WaitOne();
DataRow myRow = myTable.NewRow(); // THIS RESULTS IN INTERMITTENT ERRORS
lock (myTable)
{
//DataRow myRow = myTable.NewRow(); // MOVE NewRow() CALL HERE TO RESOLVE ISSUE
myRow.SetField<int>("c1", (int)value);
myTable.Rows.Add(myRow);
}
}
static DataTable GetTable()
{
// Here we create a DataTable with four columns.
DataTable table = new DataTable();
table.Columns.Add("c1", typeof(int));
return table;
}
}