使用Task.WhenAll跨多个线程的TransactionScope

本文关键字:线程 TransactionScope Task WhenAll 使用 | 更新日期: 2023-09-27 18:08:30

我正在尝试使用Task.WhenAll对数据库进行多次并行更新。代码流如下。

在main方法中,我创建了一个事务范围,并创建了主事务的克隆并传递给子事务。主事务被阻止,直到子事务完成

using (var scope = DalcHelper.GetTransactionScope())
{
    DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
    var task1= Dalc.UpdateDetails1(transaction );
    DependentTransaction transaction1 = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
    var task2 = Dalc.UpdateDetails2(transaction1);
    await Task.WhenAll(task1, task2 ).ConfigureAwait(false);
    scope.Complete();
}

DalcMethod是这样的。在这里,从外部事务创建的克隆作为一个参数。从属事务完成,通知主事务从属事务已完成

try
{
    using (SqlCommand databaseCommand = DalcHelper.GetCommand(SPName))
    using (var scope = new TransactionScope(dependentCloneTransaction, TransactionScopeAsyncFlowOption.Enabled))
    {
        -- Update database
        scope.Complete();
    }
}
finally
{
    //Call complete on the dependent transaction
    dependentCloneTransaction.Complete();
}

Dalc方法是异步方法,返回任务

我得到了以下异常

事务已中止。尝试提升事务时失败。已经有一个打开的DataReader与此命令关联,必须先关闭它。等待操作超时

有人能告诉我我在这里做错了什么吗?

使用Task.WhenAll跨多个线程的TransactionScope

namespace Playground
{
    static class DalcHelper
    {
        public static TransactionScope GetTransactionScope()
        {
            return new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
        }
        public async static Task ReadDetails1(DependentTransaction transaction,SqlConnection conn)
        {
            try
            {
                string commandText = "SELECT * FROM dbo.Persons"; // some table, say Persons
                using (SqlCommand cmd = new SqlCommand(commandText, conn))
                {
                    cmd.CommandType = System.Data.CommandType.Text;
                    SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
                    while (reader.Read())
                    {
                        int Id = reader.GetInt32("Id");
                        Console.WriteLine("Id " + Id);
                    }
                    reader.Close();
                }
                transaction.Complete();
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Task 1"+ ex.Message);
            }
        }
        public async static Task ReadDetails2(DependentTransaction transaction1, SqlConnection conn)
        {
            try
            {
                string commandText = "SELECT * FROM dbo.Persons";
                using (SqlCommand cmd = new SqlCommand(commandText, conn))
                {
                    cmd.CommandType = System.Data.CommandType.Text;
                    SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
                    while (reader.Read())
                    {
                        int age = reader.GetInt32("Age");
                        Console.WriteLine("Age " + age);
                    }
                    reader.Close();
                }
                transaction1.Complete();
                return;
            }
            catch (Exception ex)
            {
                Console.WriteLine("Task 2" + ex.Message);
            }
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            string connectionString = "YourConnectionString";
            _ = RunMe(connectionString);
        }
        private async static Task RunMe(string connectionString)
        {
            
                try
                {
                    
                    Task task1 = Task.Run( async()=> {
                        using (TransactionScope scope = DalcHelper.GetTransactionScope())
                        {
                            using (SqlConnection conn = new SqlConnection(connectionString))
                            {
                                DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                                conn.Open();
                                await DalcHelper.ReadDetails1(transaction, conn);
                                /*
                                * add more tasks if you wish to
                                */
                                Console.WriteLine("Completed task 1");
                                conn.Close();
                            }
                            scope.Complete();
                        }
                    });
                    
                    Task task2 = Task.Run(async () =>
                    {
                        using (TransactionScope scope = DalcHelper.GetTransactionScope())
                        {
                            using (SqlConnection conn = new SqlConnection(connectionString))
                            {
                                DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
                                conn.Open();
                                await DalcHelper.ReadDetails2(transaction, conn);
                                /*
                                    may be update some column of table based on previous op.
                                   // await DalcHelper.UpdateDetails2(transaction, conn); 
                                */ 
                                Console.WriteLine("Completed task 2");
                                conn.Close();
                            }
                            /*
                            calling `Complete` method will commit all the changes within the transaction scope(including the UpdateDetails2 method)
                            need not dispose transaction scope explicitly, `using` block takes care of that
                            */ 
                            scope.Complete(); 
                        }
                    });
                 await Task.WhenAll(task1, task2);// at this point every task added is complete
                 Console.WriteLine("completed both tasks");
                 Console.ReadLine();
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                }
            }
        }
    }

使用事务范围时需要记住的一些要点

  1. 它需要在创建的同一线程中处理TransactionScope,否则可能会引发类似Transaction already aborted的错误
  2. 只有当调用了TransactionScope.Complete()方法时,才会持久化任何更新操作
  3. 确保为每个线程打开单独的连接,并在使用后关闭它。话虽如此,从性能的角度来看,我不确定是否要为每个线程使用单独的连接。我很高兴能在这方面得到更多的教育,我会更新我的答案。但是,这个解决方案应该可以帮助您解决问题

一定要阅读一些已经发布的与主题相关的有用答案

  • 从属交易
  • 事务中止问题
  • 基于线程的连接