使用c#在SQL服务器中插入大量数据

本文关键字:插入 数据 服务器 SQL 使用 | 更新日期: 2023-09-27 18:18:39

我正在使用SQL Server 2012,并有一个大约20 GB大小的巨大文件。我想插入每条记录内的文件到数据库。为此,我使用SqlBulkCopy类。但是,由于数据的大小非常大,我将不得不一部分一部分地插入它。下面是代码:

String line;
SqlConnection conn = new SqlConnection(ConfigurationManager.ConnectionStrings["conStrtingName"].ConnectionString);
conn.Open();
StreamReader readFile = new StreamReader(filePath);
SqlTransaction transaction = conn.BeginTransaction();
try
{
    SqlBulkCopy copy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity, transaction);
    copy.BulkCopyTimeout = 600;
    copy.DestinationTableName = "Txn";
    int counter = 0;
    while ((line = readFile.ReadLine()) != null)
    {
        string[] fields = line.Split(''t');
        if (fields.Length == 3)
        {
            DateTime date = Convert.ToDateTime(fields[0]);
            decimal txnCount = Convert.ToDecimal(fields[1]);
            string merchantName = fields[2];
            if (!string.IsNullOrEmpty(merchantName))
            {
                long MerchantId = Array.IndexOf(Program.merchantArray, merchantName) + 1;
                tables[workerId].Rows.Add(MerchantId, date, txnCount);
                counter++;
                if (counter % 100000 == 0)
                    Console.WriteLine("Worker: " + workerId + " - Transaction Records Read: " + counter);
                if (counter % 1000000 == 0)
                {
                    copy.WriteToServer(tables[workerId]);
                    transaction.Commit();
                    tables[workerId].Rows.Clear();
                    //transaction = conn.BeginTransaction();
                    Console.WriteLine("Worker: " + workerId + " - Transaction Records Inserted: " + counter);
                }
            }
        }
    }
    Console.WriteLine("Total Transaction Records Read: " + counter);
    if (tables[workerId].Rows.Count > 0)
    {
        copy.WriteToServer(tables[workerId]);
        transaction.Commit();
        tables[workerId].Rows.Clear();
        Console.WriteLine("Worker: " + workerId + " - Transaction Records Inserted: " + counter);
    }
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
    transaction.Rollback();
}
finally
{
    conn.Close();
}

它适用于前100000条记录。但是对于下一组记录,我得到一个异常The transaction is either not associated with the current connection or has been completed.

当控制到达下一组记录的transaction.Commit();时,会发生这种情况。

我可以有一个变通办法吗?

使用c#在SQL服务器中插入大量数据

问题是事务提交后的注释行。您需要取消注释,重新初始化您的SqlBulkCopy copy变量。您最好重构您的代码,唯一需要事务和复制对象的地方是当您刷新正在填充的数据表时,就像这样(您可以进一步将重复的部分分解为单独的方法):

String line;
SqlConnection conn = new SqlConnection(ConfigurationManager.ConnectionStrings["conStrtingName"].ConnectionString);
conn.Open();
StreamReader readFile = new StreamReader(filePath);
SqlTransaction transaction = null;
try
{
    int counter = 0;
    while ((line = readFile.ReadLine()) != null)
    {
        string[] fields = line.Split(''t');
        if (fields.Length == 3)
        {
            DateTime date = Convert.ToDateTime(fields[0]);
            decimal txnCount = Convert.ToDecimal(fields[1]);
            string merchantName = fields[2];
            if (!string.IsNullOrEmpty(merchantName))
            {
                long MerchantId = Array.IndexOf(Program.merchantArray, merchantName) + 1;
                tables[workerId].Rows.Add(MerchantId, date, txnCount);
                counter++;
                if (counter % 100000 == 0)
                    Console.WriteLine("Worker: " + workerId + " - Transaction Records Read: " + counter);
                if (counter % 1000000 == 0)
                {
                    transaction = conn.BeginTransaction()
                    SqlBulkCopy copy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity, transaction);
                    copy.BulkCopyTimeout = 600;
                    copy.DestinationTableName = "Txn";
                    copy.WriteToServer(tables[workerId]);
                    transaction.Commit();
                    transaction = null;
                    tables[workerId].Rows.Clear();
                    Console.WriteLine("Worker: " + workerId + " - Transaction Records Inserted: " + counter);
                }
            }
        }
    }
    Console.WriteLine("Total Transaction Records Read: " + counter);
    if (tables[workerId].Rows.Count > 0)
    {
        transaction = conn.BeginTransaction()
        SqlBulkCopy copy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepIdentity, transaction);
        copy.BulkCopyTimeout = 600;
        copy.DestinationTableName = "Txn";
        copy.WriteToServer(tables[workerId]);
        transaction.Commit();
        transaction = null;
        tables[workerId].Rows.Clear();
        Console.WriteLine("Worker: " + workerId + " - Transaction Records Inserted: " + counter);
    }
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
    if (transaction != null) transaction.Rollback();
}
finally
{
    conn.Close();
}

问题是,现在你不能回滚所有的更改,以防出现问题。也许更好的解决方案是不要手动分割批量插入,而是使用某种IDataReader实现来避免在内存中填充巨大的DataTable(例如使用Marc Gravell的ObjectReader)。

您的事务每100000集提交一次。所以它"消失了",你必须用transaction = conn.BeginTransaction开始另一个。

也许应该重新编写代码以更好地反映事务的生命周期。您还可能需要确保使用新事务重新创建"copy"。

您可以像这样增加事务的超时(使用适合事务预期长度的值)。下面的代码是15分钟:Source

using (TransactionScope scope = 
             new TransactionScope(TransactionScopeOption.Required, 
                                   new System.TimeSpan(0, 15, 0)))
  {
      // working code here
  }