使用async和DataRow的非线程安全代码
本文关键字:线程 安全 代码 async DataRow 使用 | 更新日期: 2023-09-27 18:00:16
执行以下代码时。
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
namespace AsyncDataRow
{
internal class Program
{
private static void Main(string[] args)
{
var program = new Program();
try
{
Console.WriteLine("Execute");
for (int i = 0; i < 100; i++)
{
program.Execute();
}
}
catch (AggregateException aggregateException)
{
foreach (var exception in aggregateException.InnerExceptions)
{
Console.WriteLine("AggregateException.InnerExceptions: " + exception.Message);
}
}
catch (Exception ex)
{
Console.WriteLine("Exception.Message: " + ex.Message);
}
Console.ReadKey();
}
private void Execute()
{
DataTable dataTable = new DataTable();
dataTable.Columns.Add("ID", typeof(int));
dataTable.Columns.Add("Name", typeof(string));
for (int i = 0; i < 1000; i++)
{
dataTable.Rows.Add(i, i.ToString());
}
var taskList = new List<Task>();
foreach (DataRow dataRow in dataTable.Rows)
{
taskList.Add(ChangeStringValue(dataRow));
}
Task.WaitAll(taskList.ToArray());
}
private async Task ChangeStringValue(DataRow dataRow)
{
var id = (int)dataRow["ID"];
var newValue = await Task.Run(() => CreateNewValue(id));
dataRow["Name"] = newValue;
}
private string CreateNewValue(int id)
{
Console.WriteLine(string.Format("{0} - CreateNewValue", id));
return id.ToString() + "New StringValue";
}
}
}
在的这一行代码上偶尔引发System.ArgumentOutOfRangeException
dataRow["Name"] = newValue
异常消息:
Exception thrown: 'System.ArgumentOutOfRangeException' in mscorlib.dll
Additional information: Index was out of range. Must be non-negative and less than the size of the collection.
我试图更具体地识别问题,但我最接近的是简化代码以重现错误。
我认为这与传递到异步方法中的DataRow引用类型有关,但我想知道为什么这段代码不是线程安全的。
.Net框架中线程安全类的数量非常少,每个类都会显式调用其安全性(大多数在System.Collecions.Concurrent
命名空间中)。对于所有其他类型,您需要提供自己的线程安全机制。与DB相关的类肯定属于非线程安全的写类。类通常显式地允许多线程安全读取。
在您的情况下,DataRow
可以从多个线程读取,并且您只需要同步写入:
线程安全
这种类型对于多线程读取操作是安全的。您必须同步任何写入操作。
构建大量项目的标准方法-在范围内拆分项目并独立生成每个项目,然后在所有项目完成后合并。
部分样本基于原始代码,MyRowResult
是用于保存需要更新的行的所有数据的类。
var taskList = new List<Task<MyDataRow>();
foreach (DataRow dataRow in dataTable.Rows)
{
taskList.Add(ChangeStringValue(dataRow));
}
// await for proper async code, WaitAll is fin for console app.
await Task.WhenAll(taskList.ToArray());
// back to single thread - safe to update rows
foreach (var task in taskList)
{
task.Result.DataRow["Name"] = task.Result.Name;
}
}
private async Task<MyRowResult> ChangeStringValue(DataRow dataRow)
{
// on multiple threads - perform read only operations of rows
// which is safe as explicitly called out in the documentation
// "This type is safe for multithreaded read operations."
var id = (int)dataRow["ID"];
var newValue = await Task.Run(() => CreateNewValue(id));
return new MyRowResult { Name = newValue, DataRow = dataRow };
}
注:
- 有些类(如UI控件)需要更严格的线程安全性-只能在原始线程上调用
Parallel
类中的帮助程序可以简化并行运行的代码(如Parallel.ForEach
)