如何在最短的时间内插入 1000 万条记录
本文关键字:1000 记录 万条 插入 时间 | 更新日期: 2023-09-27 18:21:19
我有一个文件(有1000万条记录(,如下所示:
line1
line2
line3
line4
.......
......
10 million lines
所以基本上我想在数据库中插入 1000 万条记录。所以我读取了文件并将其上传到SQL Server。
C# 代码
System.IO.StreamReader file =
new System.IO.StreamReader(@"c:'test.txt");
while((line = file.ReadLine()) != null)
{
// insertion code goes here
//DAL.ExecuteSql("insert into table1 values("+line+")");
}
file.Close();
但是插入需要很长时间。如何使用 C# 在尽可能短的时间内插入 1000 万条记录?
更新 1:
批量插入:
BULK INSERT DBNAME.dbo.DATAs
FROM 'F:'dt10000000'dt10000000.txt'
WITH
(
ROWTERMINATOR =' 'n'
);
我的表格如下所示:
DATAs
(
DatasField VARCHAR(MAX)
)
但我收到以下错误:
Msg 4866,级别 16,状态 1,第 1
行 批量加载失败。数据文件中的列对于第 1 行第 1 列来说太长。验证是否正确指定了字段终止符和行终止符。Msg 7399,级别 16,状态 1,第 1
行 链接服务器"(空("的 OLE DB 提供程序"BULK"报告了错误。提供程序未提供有关错误的任何信息。Msg 7330,级别 16,状态 2,第 1
行 无法从链接服务器"(null("的 OLE DB 提供程序"BULK"中获取行。
下面的代码有效:
BULK INSERT DBNAME.dbo.DATAs
FROM 'F:'dt10000000'dt10000000.txt'
WITH
(
FIELDTERMINATOR = ''t',
ROWTERMINATOR = ''n'
);
请不要创建要通过 BulkCopy 加载的DataTable
。对于较小的数据集来说,这是一个不错的解决方案,但在调用数据库之前,绝对没有理由将所有 1000 万行加载到内存中。
最好的选择(在 BCP
/BULK INSERT
/OPENROWSET(BULK...)
之外(是通过表值参数 (TVP( 将文件的内容流式传输到数据库中。 通过使用 TVP,您可以打开文件,读取一行并发送一行直到完成,然后关闭文件。此方法的内存占用量仅为一行。 我写了一篇文章,从应用程序将数据流式传输到SQL Server 2008,其中有一个场景的示例。
的简单概述如下。我假设导入表和字段名称与上述问题所示相同。
必需的数据库对象:
-- First: You need a User-Defined Table Type
CREATE TYPE ImportStructure AS TABLE (Field VARCHAR(MAX));
GO
-- Second: Use the UDTT as an input param to an import proc.
-- Hence "Tabled-Valued Parameter" (TVP)
CREATE PROCEDURE dbo.ImportData (
@ImportTable dbo.ImportStructure READONLY
)
AS
SET NOCOUNT ON;
-- maybe clear out the table first?
TRUNCATE TABLE dbo.DATAs;
INSERT INTO dbo.DATAs (DatasField)
SELECT Field
FROM @ImportTable;
GO
下面提供了使用上述 SQL 对象的 C# 应用代码。请注意,在此方法中,不是填充对象(例如 DataTable(然后执行存储过程,而是执行存储过程来启动文件内容的读取。存储过程的输入参数不是变量;它是一个方法的返回值,GetFileContents
.当SqlCommand
调用 ExecuteNonQuery
时调用该方法,这将打开文件,读取一行并通过IEnumerable<SqlDataRecord>
和yield return
构造将该行发送到 SQL Server,然后关闭该文件。 存储过程只看到一个表变量,@ImportTable,一旦数据开始传入,就可以访问该变量(注意:数据确实会在 tempdb 中保留很短的时间,即使不是完整的内容也是如此(。
using System.Collections;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using Microsoft.SqlServer.Server;
private static IEnumerable<SqlDataRecord> GetFileContents()
{
SqlMetaData[] _TvpSchema = new SqlMetaData[] {
new SqlMetaData("Field", SqlDbType.VarChar, SqlMetaData.Max)
};
SqlDataRecord _DataRecord = new SqlDataRecord(_TvpSchema);
StreamReader _FileReader = null;
try
{
_FileReader = new StreamReader("{filePath}");
// read a row, send a row
while (!_FileReader.EndOfStream)
{
// You shouldn't need to call "_DataRecord = new SqlDataRecord" as
// SQL Server already received the row when "yield return" was called.
// Unlike BCP and BULK INSERT, you have the option here to create a string
// call ReadLine() into the string, do manipulation(s) / validation(s) on
// the string, then pass that string into SetString() or discard if invalid.
_DataRecord.SetString(0, _FileReader.ReadLine());
yield return _DataRecord;
}
}
finally
{
_FileReader.Close();
}
}
上面的GetFileContents
方法用作存储过程的输入参数值,如下所示:
public static void test()
{
SqlConnection _Connection = new SqlConnection("{connection string}");
SqlCommand _Command = new SqlCommand("ImportData", _Connection);
_Command.CommandType = CommandType.StoredProcedure;
SqlParameter _TVParam = new SqlParameter();
_TVParam.ParameterName = "@ImportTable";
_TVParam.TypeName = "dbo.ImportStructure";
_TVParam.SqlDbType = SqlDbType.Structured;
_TVParam.Value = GetFileContents(); // return value of the method is streamed data
_Command.Parameters.Add(_TVParam);
try
{
_Connection.Open();
_Command.ExecuteNonQuery();
}
finally
{
_Connection.Close();
}
return;
}
附加说明:
- 通过一些修改,上面的 C# 代码可以适应批处理数据。
- 只需稍作修改,上面的 C# 代码就可以适应在多个字段中发送(如"Steaming Data..."中显示的示例(。上面链接的文章在 2 个字段中传递(。
- 您还可以在 proc 的
SELECT
语句中操作每条记录的值。 - 您还可以通过在过程中使用 WHERE 条件筛选出行。
- 您可以多次访问 TVP 表变量;它是只读的,但不是"仅转发"。
- 与
SqlBulkCopy
相比的优势:-
SqlBulkCopy
是仅插入的,而使用 TVP 允许以任何方式使用数据:您可以调用MERGE
;您可以根据某些条件DELETE
;您可以将数据拆分为多个表;等等。 - 由于 TVP 不是仅插入的,因此不需要单独的临时表将数据转储到其中。
- 您可以通过调用
ExecuteReader
而不是ExecuteNonQuery
从数据库中获取数据。例如,如果导入DATAs
表上有一个IDENTITY
字段,则可以向INSERT
添加一个OUTPUT
子句以传递回INSERTED.[ID]
(假设ID
是IDENTITY
字段的名称(。或者,您可以传回完全不同的查询结果,或者两者兼而有之,因为可以通过Reader.NextResult()
发送和访问多个结果集。使用SqlBulkCopy
时不可能从数据库中获取信息,但是在S.O.上有几个问题,人们想要这样做(至少在新创建的IDENTITY
值方面(。 - 有关为什么整个过程有时更快(即使将数据从磁盘导入 SQL Server 的速度稍慢(的详细信息,请参阅 SQL Server 客户咨询团队的此白皮书:使用 TVP 最大化吞吐量
-
在 C# 中,最好的解决方案是让SqlBulkCopy
读取文件。为此,您需要将IDataReader
直接传递给SqlBulkCopy.WriteToServer
方法。下面是一个例子:http://www.codeproject.com/Articles/228332/IDataReader-implementation-plus-SqlBulkCopy
最好的方法是混合使用第一个解决方案和第二个解决方案,创建DataTable
并在循环中添加行,然后使用BulkCopy
上传到一个连接中的数据库 在大容量复制中使用它获得帮助
需要注意的另一件事是,批量复制是一个非常敏感的操作,几乎每个错误都会使副本无效,例如,如果您在 dataTable 中将列名声明为"text",而在数据库中将其声明为"text",它将抛出异常,祝你好运。
如果要在最短的时间内插入 1000 万条记录以使用 SQL 查询进行直接测试,则应使用此查询
CREATE TABLE TestData(ID INT IDENTITY (1,1), CreatedDate DATETIME)
GO
INSERT INTO TestData(CreatedDate) SELECT GetDate()
GO 10000000