使用Task.WhenAll跨多个线程的TransactionScope
本文关键字:线程 TransactionScope Task WhenAll 使用 | 更新日期: 2023-09-27 18:08:30
我正在尝试使用Task.WhenAll对数据库进行多次并行更新。代码流如下。
在main方法中,我创建了一个事务范围,并创建了主事务的克隆并传递给子事务。主事务被阻止,直到子事务完成
using (var scope = DalcHelper.GetTransactionScope())
{
DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
var task1= Dalc.UpdateDetails1(transaction );
DependentTransaction transaction1 = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
var task2 = Dalc.UpdateDetails2(transaction1);
await Task.WhenAll(task1, task2 ).ConfigureAwait(false);
scope.Complete();
}
DalcMethod是这样的。在这里,从外部事务创建的克隆作为一个参数。从属事务完成,通知主事务从属事务已完成
try
{
using (SqlCommand databaseCommand = DalcHelper.GetCommand(SPName))
using (var scope = new TransactionScope(dependentCloneTransaction, TransactionScopeAsyncFlowOption.Enabled))
{
-- Update database
scope.Complete();
}
}
finally
{
//Call complete on the dependent transaction
dependentCloneTransaction.Complete();
}
Dalc方法是异步方法,返回任务
我得到了以下异常
事务已中止。尝试提升事务时失败。已经有一个打开的DataReader与此命令关联,必须先关闭它。等待操作超时
有人能告诉我我在这里做错了什么吗?
namespace Playground
{
static class DalcHelper
{
public static TransactionScope GetTransactionScope()
{
return new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);
}
public async static Task ReadDetails1(DependentTransaction transaction,SqlConnection conn)
{
try
{
string commandText = "SELECT * FROM dbo.Persons"; // some table, say Persons
using (SqlCommand cmd = new SqlCommand(commandText, conn))
{
cmd.CommandType = System.Data.CommandType.Text;
SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
while (reader.Read())
{
int Id = reader.GetInt32("Id");
Console.WriteLine("Id " + Id);
}
reader.Close();
}
transaction.Complete();
return;
}
catch (Exception ex)
{
Console.WriteLine("Task 1"+ ex.Message);
}
}
public async static Task ReadDetails2(DependentTransaction transaction1, SqlConnection conn)
{
try
{
string commandText = "SELECT * FROM dbo.Persons";
using (SqlCommand cmd = new SqlCommand(commandText, conn))
{
cmd.CommandType = System.Data.CommandType.Text;
SqlDataReader reader = await cmd.ExecuteReaderAsync(CommandBehavior.Default);
while (reader.Read())
{
int age = reader.GetInt32("Age");
Console.WriteLine("Age " + age);
}
reader.Close();
}
transaction1.Complete();
return;
}
catch (Exception ex)
{
Console.WriteLine("Task 2" + ex.Message);
}
}
}
class Program
{
static void Main(string[] args)
{
string connectionString = "YourConnectionString";
_ = RunMe(connectionString);
}
private async static Task RunMe(string connectionString)
{
try
{
Task task1 = Task.Run( async()=> {
using (TransactionScope scope = DalcHelper.GetTransactionScope())
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
conn.Open();
await DalcHelper.ReadDetails1(transaction, conn);
/*
* add more tasks if you wish to
*/
Console.WriteLine("Completed task 1");
conn.Close();
}
scope.Complete();
}
});
Task task2 = Task.Run(async () =>
{
using (TransactionScope scope = DalcHelper.GetTransactionScope())
{
using (SqlConnection conn = new SqlConnection(connectionString))
{
DependentTransaction transaction = Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete);
conn.Open();
await DalcHelper.ReadDetails2(transaction, conn);
/*
may be update some column of table based on previous op.
// await DalcHelper.UpdateDetails2(transaction, conn);
*/
Console.WriteLine("Completed task 2");
conn.Close();
}
/*
calling `Complete` method will commit all the changes within the transaction scope(including the UpdateDetails2 method)
need not dispose transaction scope explicitly, `using` block takes care of that
*/
scope.Complete();
}
});
await Task.WhenAll(task1, task2);// at this point every task added is complete
Console.WriteLine("completed both tasks");
Console.ReadLine();
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
}
}
使用事务范围时需要记住的一些要点
- 它需要在创建的同一线程中处理
TransactionScope
,否则可能会引发类似Transaction already aborted
的错误 - 只有当调用了
TransactionScope.Complete()
方法时,才会持久化任何更新操作 - 确保为每个线程打开单独的连接,并在使用后关闭它。话虽如此,从性能的角度来看,我不确定是否要为每个线程使用单独的连接。我很高兴能在这方面得到更多的教育,我会更新我的答案。但是,这个解决方案应该可以帮助您解决问题
一定要阅读一些已经发布的与主题相关的有用答案
- 从属交易
- 事务中止问题
- 基于线程的连接