使用async和DataRow的非线程安全代码

本文关键字:线程 安全 代码 async DataRow 使用 | 更新日期: 2023-09-27 18:00:16

执行以下代码时。

using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;
namespace AsyncDataRow
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            var program = new Program();
            try
            {
                Console.WriteLine("Execute");
                for (int i = 0; i < 100; i++)
                {
                    program.Execute();
                }
            }
            catch (AggregateException aggregateException)
            {
                foreach (var exception in aggregateException.InnerExceptions)
                {
                    Console.WriteLine("AggregateException.InnerExceptions: " + exception.Message);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception.Message: " + ex.Message);
            }
            Console.ReadKey();
        }
        private void Execute()
        {
            DataTable dataTable = new DataTable();
            dataTable.Columns.Add("ID", typeof(int));
            dataTable.Columns.Add("Name", typeof(string));
            for (int i = 0; i < 1000; i++)
            {
                dataTable.Rows.Add(i, i.ToString());
            }
            var taskList = new List<Task>();
            foreach (DataRow dataRow in dataTable.Rows)
            {
                taskList.Add(ChangeStringValue(dataRow));
            }
            Task.WaitAll(taskList.ToArray());
        }
        private async Task ChangeStringValue(DataRow dataRow)
        {
            var id = (int)dataRow["ID"];
            var newValue = await Task.Run(() => CreateNewValue(id));
            dataRow["Name"] = newValue;
        }
        private string CreateNewValue(int id)
        {
            Console.WriteLine(string.Format("{0} - CreateNewValue", id));
            return id.ToString() + "New StringValue";
        }
    }
}

在的这一行代码上偶尔引发System.ArgumentOutOfRangeException

dataRow["Name"] = newValue

异常消息:

Exception thrown: 'System.ArgumentOutOfRangeException' in mscorlib.dll
Additional information: Index was out of range. Must be non-negative and less than the size of the collection.

我试图更具体地识别问题,但我最接近的是简化代码以重现错误。

我认为这与传递到异步方法中的DataRow引用类型有关,但我想知道为什么这段代码不是线程安全的。

使用async和DataRow的非线程安全代码

.Net框架中线程安全类的数量非常少,每个类都会显式调用其安全性(大多数在System.Collecions.Concurrent命名空间中)。对于所有其他类型,您需要提供自己的线程安全机制。与DB相关的类肯定属于非线程安全的写类。类通常显式地允许多线程安全读取。

在您的情况下,DataRow可以从多个线程读取,并且您只需要同步写入:

线程安全

这种类型对于多线程读取操作是安全的。您必须同步任何写入操作。

构建大量项目的标准方法-在范围内拆分项目并独立生成每个项目,然后在所有项目完成后合并。

部分样本基于原始代码,MyRowResult是用于保存需要更新的行的所有数据的类。

        var taskList = new List<Task<MyDataRow>();
        foreach (DataRow dataRow in dataTable.Rows)
        {
            taskList.Add(ChangeStringValue(dataRow));
        }
        // await for proper async code, WaitAll is fin for console app.
        await Task.WhenAll(taskList.ToArray());
        // back to single thread - safe to update rows
        foreach (var task  in taskList)
        {
            task.Result.DataRow["Name"] = task.Result.Name;
        }
    }
    private async Task<MyRowResult> ChangeStringValue(DataRow dataRow)
    {
        // on multiple threads - perform read only operations of rows
        // which is safe as explicitly called out in the documentation
        // "This type is safe for multithreaded read operations."
        var id = (int)dataRow["ID"];
        var newValue = await Task.Run(() => CreateNewValue(id));
        return new MyRowResult { Name = newValue, DataRow = dataRow };
    }

注:

  • 有些类(如UI控件)需要更严格的线程安全性-只能在原始线程上调用
  • Parallel类中的帮助程序可以简化并行运行的代码(如Parallel.ForEach