使用c#在SQL服务器中插入大量数据
本文关键字:插入 数据 服务器 SQL 使用 | 更新日期: 2023-09-27 18:18:39
我正在使用SQL Server 2012,并有一个大约20 GB大小的巨大文件。我想插入每条记录内的文件到数据库。为此,我使用SqlBulkCopy
类。但是,由于数据的大小非常大,我将不得不一部分一部分地插入它。下面是代码:
String line;
SqlConnection conn = new SqlConnection(ConfigurationManager.ConnectionStrings["conStrtingName"].ConnectionString);
conn.Open();
StreamReader readFile = new StreamReader(filePath);
SqlTransaction transaction = conn.BeginTransaction();
try
{
SqlBulkCopy copy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity, transaction);
copy.BulkCopyTimeout = 600;
copy.DestinationTableName = "Txn";
int counter = 0;
while ((line = readFile.ReadLine()) != null)
{
string[] fields = line.Split(''t');
if (fields.Length == 3)
{
DateTime date = Convert.ToDateTime(fields[0]);
decimal txnCount = Convert.ToDecimal(fields[1]);
string merchantName = fields[2];
if (!string.IsNullOrEmpty(merchantName))
{
long MerchantId = Array.IndexOf(Program.merchantArray, merchantName) + 1;
tables[workerId].Rows.Add(MerchantId, date, txnCount);
counter++;
if (counter % 100000 == 0)
Console.WriteLine("Worker: " + workerId + " - Transaction Records Read: " + counter);
if (counter % 1000000 == 0)
{
copy.WriteToServer(tables[workerId]);
transaction.Commit();
tables[workerId].Rows.Clear();
//transaction = conn.BeginTransaction();
Console.WriteLine("Worker: " + workerId + " - Transaction Records Inserted: " + counter);
}
}
}
}
Console.WriteLine("Total Transaction Records Read: " + counter);
if (tables[workerId].Rows.Count > 0)
{
copy.WriteToServer(tables[workerId]);
transaction.Commit();
tables[workerId].Rows.Clear();
Console.WriteLine("Worker: " + workerId + " - Transaction Records Inserted: " + counter);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
transaction.Rollback();
}
finally
{
conn.Close();
}
它适用于前100000条记录。但是对于下一组记录,我得到一个异常The transaction is either not associated with the current connection or has been completed.
当控制到达下一组记录的transaction.Commit();
时,会发生这种情况。
我可以有一个变通办法吗?
问题是事务提交后的注释行。您需要取消注释,和重新初始化您的SqlBulkCopy copy
变量。您最好重构您的代码,唯一需要事务和复制对象的地方是当您刷新正在填充的数据表时,就像这样(您可以进一步将重复的部分分解为单独的方法):
String line;
SqlConnection conn = new SqlConnection(ConfigurationManager.ConnectionStrings["conStrtingName"].ConnectionString);
conn.Open();
StreamReader readFile = new StreamReader(filePath);
SqlTransaction transaction = null;
try
{
int counter = 0;
while ((line = readFile.ReadLine()) != null)
{
string[] fields = line.Split(''t');
if (fields.Length == 3)
{
DateTime date = Convert.ToDateTime(fields[0]);
decimal txnCount = Convert.ToDecimal(fields[1]);
string merchantName = fields[2];
if (!string.IsNullOrEmpty(merchantName))
{
long MerchantId = Array.IndexOf(Program.merchantArray, merchantName) + 1;
tables[workerId].Rows.Add(MerchantId, date, txnCount);
counter++;
if (counter % 100000 == 0)
Console.WriteLine("Worker: " + workerId + " - Transaction Records Read: " + counter);
if (counter % 1000000 == 0)
{
transaction = conn.BeginTransaction()
SqlBulkCopy copy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity, transaction);
copy.BulkCopyTimeout = 600;
copy.DestinationTableName = "Txn";
copy.WriteToServer(tables[workerId]);
transaction.Commit();
transaction = null;
tables[workerId].Rows.Clear();
Console.WriteLine("Worker: " + workerId + " - Transaction Records Inserted: " + counter);
}
}
}
}
Console.WriteLine("Total Transaction Records Read: " + counter);
if (tables[workerId].Rows.Count > 0)
{
transaction = conn.BeginTransaction()
SqlBulkCopy copy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity, transaction);
copy.BulkCopyTimeout = 600;
copy.DestinationTableName = "Txn";
copy.WriteToServer(tables[workerId]);
transaction.Commit();
transaction = null;
tables[workerId].Rows.Clear();
Console.WriteLine("Worker: " + workerId + " - Transaction Records Inserted: " + counter);
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
if (transaction != null) transaction.Rollback();
}
finally
{
conn.Close();
}
问题是,现在你不能回滚所有的更改,以防出现问题。也许更好的解决方案是不要手动分割批量插入,而是使用某种IDataReader
实现来避免在内存中填充巨大的DataTable
(例如使用Marc Gravell的ObjectReader)。
您的事务每100000集提交一次。所以它"消失了",你必须用transaction = conn.BeginTransaction开始另一个。
也许应该重新编写代码以更好地反映事务的生命周期。您还可能需要确保使用新事务重新创建"copy"。
您可以像这样增加事务的超时(使用适合事务预期长度的值)。下面的代码是15分钟:Source
using (TransactionScope scope =
new TransactionScope(TransactionScopeOption.Required,
new System.TimeSpan(0, 15, 0)))
{
// working code here
}