如何使用 FileHelpers 库将大型 SQL Server 表导出到 CSV 文件

本文关键字:文件 CSV Server 大型 帮助 何使用 帮助程序 程序库 SQL | 更新日期: 2023-09-27 18:35:54

我希望使用 C# 和 FileHelpers 库将大型 SQL Server 表导出为 CSV 文件。我也可以考虑C#和bcp,但我认为FileHelpers会比bcp更灵活。速度不是特殊要求。 运行以下代码时,storage.ExtractRecords()上抛出OutOfMemoryException(省略了一些不太重要的代码):

  // Configure the FileHelpers SQL Server storage for the Orders query.
  var storage = new SqlServerStorage(typeof(Order))
  {
      ServerName = "SqlServer",
      DatabaseName = "SqlDataBase",
      SelectSql = "select * from Orders",
      FillRecordCallback = new FillRecordHandler(FillRecordOrder)
  };
  // ExtractRecords() is the call reported to throw OutOfMemoryException.
  Order[] output = storage.ExtractRecords() as Order[];

运行以下代码时,link.ExtractToFile()上会抛出"超时过期":

 // Same query, this time written to file.csv through a FileDataLink.
 string sqlConnectionString = "Server=SqlServer;Database=SqlDataBase;Trusted_Connection=True";
 var storage = new SqlServerStorage(typeof(Order))
 {
     ConnectionString = sqlConnectionString,
     SelectSql = "select * from Orders",
     FillRecordCallback = new FillRecordHandler(FillRecordOrder)
 };
 var link = new FileDataLink(storage);
 link.FileHelperEngine.HeaderText = headerLine;
 // ExtractToFile() is the call reported to fail with the timeout error.
 link.ExtractToFile("file.csv");

SQL 查询运行时间超过默认的 30 秒,因此会出现超时异常。不幸的是,我在 FileHelpers 文档中找不到如何将 SQL 命令超时设置为更高的值。

我可以考虑分批循环执行 SQL 查询,每次只取一小部分数据,直到导出整个表,但这个过程太复杂了。有没有一种简单的方法可以用 FileHelpers 导出大型数据库表?

如何使用 FileHelpers 库将大型 SQL Server 表导出到 CSV 文件

Rei Sivan的答案是正确的,因为它可以很好地扩展大文件,因为它避免了将整个表读入内存。 但是,可以清理代码。

shamp00的解决方案需要外部库。

下面是一个更简单的表到 CSV 文件导出器,它可以很好地扩展到大文件,并且不需要任何外部库:

using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Linq;
/// <summary>
/// Writes the contents of a database table to a CSV file one row at a time,
/// so the whole table is never held in memory.
/// </summary>
public class TableDumper
{
    /// <summary>
    /// Runs <c>select * from tableName</c> and writes a header line followed by
    /// one quoted CSV line per row to <paramref name="destinationFile"/>.
    /// </summary>
    /// <param name="connection">Connection the query runs on; this method neither opens nor closes it.</param>
    /// <param name="tableName">
    /// Table to export. It is concatenated into the SQL text unchanged, so it must come from a trusted source.
    /// </param>
    /// <param name="destinationFile">Path handed to <c>File.CreateText</c>.</param>
    public void DumpTableToFile(SqlConnection connection, string tableName, string destinationFile)
    {
        using (var command = new SqlCommand("select * from " + tableName, connection))
        using (var reader = command.ExecuteReader())
        using (var outFile = File.CreateText(destinationFile))
        {
            string[] columnNames = GetColumnNames(reader).ToArray();
            int numFields = columnNames.Length;
            outFile.WriteLine(string.Join(",", columnNames));

            // Read() returns false straight away for an empty result set,
            // so no separate HasRows check is needed.
            while (reader.Read())
            {
                // Wrap every value in double quotes and double any embedded quote.
                string[] columnValues =
                    Enumerable.Range(0, numFields)
                              .Select(i => reader.GetValue(i).ToString())
                              .Select(field => string.Concat("\"", field.Replace("\"", "\"\""), "\""))
                              .ToArray();
                outFile.WriteLine(string.Join(",", columnValues));
            }
        }
    }

    /// <summary>
    /// Yields the "ColumnName" value of each row in the reader's schema table.
    /// </summary>
    private IEnumerable<string> GetColumnNames(IDataReader reader)
    {
        foreach (DataRow row in reader.GetSchemaTable().Rows)
        {
            yield return (string)row["ColumnName"];
        }
    }
}

我编写了这段代码,并声明它为CC0(公共领域)。

我把上面的两段代码合并在了一起。下面是我使用的代码,开发环境是 VS 2010。

      //this is all lib that i used|||||||||||||||
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Drawing;
using System.Linq;
using System.Text;
using System.Windows.Forms;
using UsbLibrary;
using System.Data;
using System.Data.SqlClient;
using System.Configuration;
using System.Globalization;


        // Copy this into a button click handler.
        // The dialog and the connection are both disposed once the export is done.
        using (SaveFileDialog saveFileDialogCSV = new SaveFileDialog())
        {
            // InitialDirectory expects a folder; ExecutablePath also contains the .exe file name.
            saveFileDialogCSV.InitialDirectory = Application.StartupPath;
            saveFileDialogCSV.Filter = "CSV files (*.csv)|*.csv|All files (*.*)|*.*";
            saveFileDialogCSV.FilterIndex = 1;
            saveFileDialogCSV.RestoreDirectory = true;

            // Export only when the user confirmed a file name; previously a cancelled
            // dialog still called DumpTableToFile with an empty path.
            if (saveFileDialogCSV.ShowDialog() == DialogResult.OK)
            {
                // dbk is my database name; change it to your own database name.
                using (SqlConnection connection = new SqlConnection("Data Source=.;Initial Catalog=dbk;Integrated Security=True"))
                {
                    connection.Open();
                    DumpTableToFile(connection, "tbl_trmc", saveFileDialogCSV.FileName);
                }
            }
        }
        }
        //end of code in button|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

    /// <summary>
    /// Runs <c>select * from tableName</c> and writes a header line followed by
    /// one quoted CSV line per row to <paramref name="destinationFile"/>.
    /// </summary>
    /// <param name="connection">Connection the query runs on; this method neither opens nor closes it.</param>
    /// <param name="tableName">
    /// Table to export. It is concatenated into the SQL text unchanged, so it must come from a trusted source.
    /// </param>
    /// <param name="destinationFile">Path handed to <c>File.CreateText</c>.</param>
    public void DumpTableToFile(SqlConnection connection, string tableName, string destinationFile)
    {
        using (var command = new SqlCommand("select * from " + tableName, connection))
        using (var reader = command.ExecuteReader())
        using (var outFile = System.IO.File.CreateText(destinationFile))
        {
            string[] columnNames = GetColumnNames(reader).ToArray();
            int numFields = columnNames.Length;
            outFile.WriteLine(string.Join(",", columnNames));

            // Read() returns false straight away for an empty result set,
            // so no separate HasRows check is needed.
            while (reader.Read())
            {
                // Wrap every value in double quotes and double any embedded quote.
                string[] columnValues =
                    Enumerable.Range(0, numFields)
                              .Select(i => reader.GetValue(i).ToString())
                              .Select(field => string.Concat("\"", field.Replace("\"", "\"\""), "\""))
                              .ToArray();
                outFile.WriteLine(string.Join(",", columnValues));
            }
        }
    }
    /// <summary>
    /// Lazily yields the "ColumnName" value of each row in the reader's schema table.
    /// </summary>
    private IEnumerable<string> GetColumnNames(IDataReader reader)
    {
        DataTable schema = reader.GetSchemaTable();
        foreach (DataRow columnInfo in schema.Rows)
        {
            string name = (string)columnInfo["ColumnName"];
            yield return name;
        }
    }

试试这个:

/// <summary>
/// Asks the user for a target file with a SaveFileDialog and, if the dialog
/// is confirmed, exports to the chosen file.
/// </summary>
private void exportToCSV()
{
    // Dispose the dialog deterministically instead of leaving it to the finalizer.
    using (SaveFileDialog saveFileDialogCSV = new SaveFileDialog())
    {
        // InitialDirectory expects a folder; ExecutablePath also contains the .exe file name.
        saveFileDialogCSV.InitialDirectory = Application.StartupPath;
        saveFileDialogCSV.Filter = "CSV files (*.csv)|*.csv|All files (*.*)|*.*";
        saveFileDialogCSV.FilterIndex = 1;
        saveFileDialogCSV.RestoreDirectory = true;

        if (saveFileDialogCSV.ShowDialog() == DialogResult.OK)
        {
            // Runs the export operation only when the user confirmed a file name.
            exportToCSVfile(saveFileDialogCSV.FileName);
        }
    }
}

/*
 * Exports data to the CSV file.
 */
/// <summary>
/// Exports every row of the table selected in <c>lbxTables</c> to a delimited
/// text file, optionally preceded by a line of column names.
/// </summary>
/// <param name="fileOut">Path of the output file; it is opened with append set to false.</param>
private void exportToCSVfile(string fileOut)
{
    // The selected table name is concatenated into the SQL text unchanged.
    string sqlQuery = "select * from dbo." + this.lbxTables.SelectedItem.ToString();

    // using blocks release the command, the reader and the file even when the
    // export fails part-way, so the connection is not left with an open reader.
    using (SqlCommand command = new SqlCommand(sqlQuery, objConnDB_Auto))
    using (SqlDataReader dr = command.ExecuteReader())
    {
        // Retrieves the schema of the result set for the optional header line.
        DataTable dtSchema = dr.GetSchemaTable();

        // Creates the output file as a stream, using the configured encoding.
        using (StreamWriter sw = new StreamWriter(fileOut, false, this.encodingCSV))
        {
            // Writes the column headers if the user previously asked for them.
            if (this.chkFirstRowColumnNames.Checked)
            {
                sw.WriteLine(columnNames(dtSchema, this.separator));
            }

            // One reusable buffer instead of rebuilding a string for every field.
            System.Text.StringBuilder row = new System.Text.StringBuilder();
            while (dr.Read())
            {
                row.Length = 0;
                for (int i = 0; i < dr.FieldCount; i++)
                {
                    if (i > 0)
                    {
                        row.Append(this.separator);
                    }

                    // Convert.ToString(object) produces the same text as the former
                    // per-type getters, turns DBNull into an empty field instead of
                    // throwing, and also covers types the old switch never matched
                    // (for example System.Single, which was misspelled "System.Float").
                    row.Append(Convert.ToString(dr.GetValue(i)));
                }
                sw.WriteLine(row.ToString());
            }
        }
    }

    // Notifies the user.
    MessageBox.Show("ready");
}

非常感谢Jay Sullivan的回答 - 对我非常有帮助。

在此基础上,我观察到在他的解决方案中,varbinary和字符串数据类型的字符串格式不好 - varbinary字段会显示为字面"System.Byte"或类似的东西,而日期时间字段将被格式化为MM/dd/yyyy hh:mm:ss tt,这对我来说是不可取的。

下面是我的黑客解决方案,它根据数据类型以不同的方式转换为字符串。 它使用嵌套的三元运算符,但它有效!

希望对某人有帮助。

/// <summary>
/// Runs a SELECT built from <paramref name="cArgs"/> and writes each row to a
/// tab-separated text file. Column names and a progress line every 100000 rows
/// are printed to the console; the output file itself contains data rows only.
/// </summary>
/// <param name="connection">Connection the query runs on; this method neither opens nor closes it.</param>
/// <param name="cArgs">
/// Options. "table" and "out_file" are read with the indexer and must be present.
/// "top_count" adds a TOP clause. "lower_bound" together with "column_name" adds a
/// WHERE clause (column at or above the lower bound), and "upper_bound" then adds
/// an upper limit (column below the upper bound). All values are inserted into the
/// SQL text unchanged, so they must come from a trusted source.
/// </param>
public static void DumpTableToFile(SqlConnection connection, Dictionary<string, string> cArgs)
{
    string query = "SELECT ";
    string topCount;
    if (cArgs.TryGetValue("top_count", out topCount))
    {
        query += string.Format("TOP {0} ", topCount);
    }
    query += string.Format("* FROM {0} (NOLOCK) ", cArgs["table"]);

    string lowerBound, upperBound, columnName;
    if (cArgs.TryGetValue("lower_bound", out lowerBound) && cArgs.TryGetValue("column_name", out columnName))
    {
        query += string.Format("WHERE {0} >= {1} ", columnName, lowerBound);
        if (cArgs.TryGetValue("upper_bound", out upperBound))
        {
            query += string.Format("AND {0} < {1} ", columnName, upperBound);
        }
    }
    Console.WriteLine(query);
    Console.WriteLine("");

    using (var command = new SqlCommand(query, connection))
    using (var reader = command.ExecuteReader())
    using (var outFile = File.CreateText(cArgs["out_file"]))
    {
        string[] columnNames = GetColumnNames(reader).ToArray();
        int numFields = columnNames.Length;
        Console.WriteLine(string.Join(",", columnNames));
        Console.WriteLine("");

        int rowCount = 0;
        // Read() returns false straight away for an empty result set.
        while (reader.Read())
        {
            rowCount += 1;
            // Tabs inside a value would be read as field separators, so they become spaces.
            string[] columnValues =
                Enumerable.Range(0, numFields)
                          .Select(i => FormatFieldValue(reader.GetValue(i)))
                          .Select(field => field.Replace("\t", " "))
                          .ToArray();
            outFile.WriteLine(string.Join("\t", columnValues));
            if (rowCount % 100000 == 0)
            {
                Console.WriteLine("row {0}", rowCount);
            }
        }
    }
}

// Converts one column value to text: fixed-layout timestamp for DateTime,
// upper-case hex for byte arrays, ToString() for everything else.
private static string FormatFieldValue(object value)
{
    if (value is DateTime)
    {
        // Invariant culture keeps the ':' separator and the Gregorian calendar
        // regardless of the machine's regional settings.
        return ((DateTime)value).ToString("yyyy-MM-dd HH:mm:ss.fff", System.Globalization.CultureInfo.InvariantCulture);
    }

    byte[] bytes = value as byte[];
    if (bytes != null)
    {
        return string.Concat(Array.ConvertAll(bytes, b => b.ToString("X2")));
    }

    return value.ToString();
}
/// <summary>
/// Lazily yields the "ColumnName" value of each row in the reader's schema table.
/// </summary>
public static IEnumerable<string> GetColumnNames(IDataReader reader)
{
    DataTable schema = reader.GetSchemaTable();
    foreach (DataRow columnInfo in schema.Rows)
    {
        string name = (string)columnInfo["ColumnName"];
        yield return name;
    }
}

FileHelpers 有一个异步引擎,更适合处理大文件。不幸的是,FileDataLink类不使用它,因此没有简单的方法可以将其与SqlStorage一起使用。

修改 SQL 超时也不是很容易。最简单的方法是复制 SqlServerStorage 的代码来创建自己的替代存储提供程序,并为 ExecuteAndClose() 和 ExecuteAndLeaveOpen() 提供替代实现,以设置 IDbCommand 的超时。(SqlServerStorage 是一个密封类,所以你不能直接从它派生子类。)

您可能想查看ReactiveETL,它使用FileHelpers异步引擎来处理文件,以及使用ReactiveExtensions重写Ayende的RhinoETL来处理大型数据集。