SqlBulkCopy列映射多个列

本文关键字:映射 SqlBulkCopy | 更新日期: 2023-09-27 17:58:45

我使用SqlBulkCopy将CSV数据导入数据库,但我希望能够组合列。例如,假设我的CSV中有一个名字和姓氏列。通过我的UI,我希望用户能够选择firstname+lastname来填充Displayname字段。这将指示SqlBulkCopy使用字段组合来填充此字段。

在伪代码中,我想做这样的事情。

foreach(Pick p in picks){
    if(p.csv_col_indexes.Count() == 1)
       bulkCopy.ColumnMappings.Add(p.csv_col_indexes[0], p.db_field_index);
    else
       bulkCopy.ColumnMappings.Add(p.csv_col_indexes, p.db_field_index);
}

除了我还需要能够设置字段格式,因为上面的代码不知道在字段之间放空格。

我现在可以通过批量复制到临时表,然后执行INSERT SELECT语句来完成这项工作,但如果可能的话,我更愿意去掉中间阶段。

SqlBulkCopy列映射多个列

我创建了自己的IDataReader实现,并解决了这个问题。

示例CSV文件

email,firstname,lastname,occupation
deborah.clarke@fakedomain.com,Deborah,Clarke,Knitter
diane.moore@fakedomain.com,Diane,Moore,Local Government Officer
billy.smith@fakedomain.co,Billy,Smith,Debt Counsellor
jennifer.campbell@fakedomain.ie,Jennifer,Campbell,Vending Machine Technician

示例数据库模式

CREATE TABLE [dbo].[Contact](
    [ID] [int] IDENTITY(1,1) NOT NULL,
    [FirstName] [nvarchar](max) NULL,
    [LastName] [nvarchar](max) NULL,
    [FullName] [nvarchar](max) NULL,
    [Email] [nvarchar](max) NULL,
    [Job] [nvarchar](max) NULL
)

自定义类

public class Mapping{
    public int[] csv_cols { get; set; }
    public string db_column_name { get; set; }
    public int db_column_index { get; set; }
    public string column_format { get; set; }
}

IDataReader实现

public class CustomDataReader : IDataReader
{
    private List<Mapping> mappings  = null;
    private string[] database_columns; 
    private int _currentIndex = -1;
    private System.Data.DataTable csv_data;
    public CustomDataReader(
        List<Mapping> _mappings, 
        string[] _database_columns, 
        System.Data.DataTable _csv_data, 
        bool skip_first_row)
    {
        mappings = _mappings;
        database_columns = _database_columns;
        csv_data = _csv_data;
        if (skip_first_row)
            _currentIndex++;
    }
    // get the number of data fields in each record.
    public int FieldCount
    {
        get { return database_columns.Count(); } 
    }
    public bool Read()
    {
        if ((_currentIndex + 1) < csv_data.Rows.Count)
        {
            _currentIndex++;
            return true;
        }
        else
        {
            return false;
        }
    }
    // get the column name for the specified db column number.  
    public string GetName(int i)
    {
        if(i > 0 && i < database_columns.Count())
        {
            return database_columns[i];
        }
        return string.Empty;
    }
    // get the column number for the specified dbcolumn name.
    public int GetOrdinal(string name)
    {
        for (int i = 0; i < database_columns.Count(); i++)
        {
            if (database_columns[i] == name)
                return i;
        }
        return -1;
    }

    // get the value of the field for the supplied column number. 
    public object GetValue(int i)
    {            
        // loop through our mappings
        foreach (Mapping p in mappings)
        {
            // find the mapping that relates to this db column
            if(p.db_column_index == i)
            {
                // if column format is specified, build the data
                if (p.column_format != null)
                {
                    List<string> data = new List<string>();
                    foreach (int b in p.csv_cols)
                    {
                        data.Add(csv_data.Rows[_currentIndex][b].ToString());
                    }
                    return string.Format(p.column_format, data.ToArray());
                }
                // otherwise, just return the value we need
                else
                    return csv_data.Rows[_currentIndex][p.csv_cols[0]];
            }
        }
        return null;
    }
    #region Not Implemented
    public void Close()
    {
        throw new NotImplementedException();
    }
    public int Depth
    {
        get { throw new NotImplementedException(); }
    }
    public DataTable GetSchemaTable()
    {
        throw new NotImplementedException();
    }
    public bool IsClosed
    {
        get { throw new NotImplementedException(); }
    }
    public bool NextResult()
    {
        throw new NotImplementedException();
    }
    public int RecordsAffected
    {
        get { throw new NotImplementedException(); }
    }
    public void Dispose()
    {
        throw new NotImplementedException();
    }
    public bool GetBoolean(int i)
    {
        throw new NotImplementedException();
    }
    public byte GetByte(int i)
    {
        throw new NotImplementedException();
    }
    public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
    {
        throw new NotImplementedException();
    }
    public char GetChar(int i)
    {
        throw new NotImplementedException();
    }
    public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
    {
        throw new NotImplementedException();
    }
    public IDataReader GetData(int i)
    {
        throw new NotImplementedException();
    }
    public string GetDataTypeName(int i)
    {
        throw new NotImplementedException();
    }
    public DateTime GetDateTime(int i)
    {
        throw new NotImplementedException();
    }
    public decimal GetDecimal(int i)
    {
        throw new NotImplementedException();
    }
    public double GetDouble(int i)
    {
        throw new NotImplementedException();
    }
    public Type GetFieldType(int i)
    {
        throw new NotImplementedException();
    }
    public float GetFloat(int i)
    {
        throw new NotImplementedException();
    }
    public Guid GetGuid(int i)
    {
        throw new NotImplementedException();
    }
    public short GetInt16(int i)
    {
        throw new NotImplementedException();
    }
    public int GetInt32(int i)
    {
        throw new NotImplementedException();
    }
    public long GetInt64(int i)
    {
        throw new NotImplementedException();
    }
    public string GetString(int i)
    {
        throw new NotImplementedException();
    }
    public int GetValues(object[] values)
    {
        throw new NotImplementedException();
    }
    public bool IsDBNull(int i)
    {
        throw new NotImplementedException();
    }
    public object this[string name]
    {
        get { throw new NotImplementedException(); }
    }
    public object this[int i]
    {
        get { throw new NotImplementedException(); }
    }
    #endregion

代码调用CustomDataReader

// build a list of the column mappings
List<Mapping> mappings = new List<Mapping>();
mappings.Add(new Mapping{ csv_cols = new int[] { 0 },    db_column_name = "Email" });
mappings.Add(new Mapping{ csv_cols = new int[] { 1 },    db_column_name = "FirstName" });
mappings.Add(new Mapping{ csv_cols = new int[] { 2 },    db_column_name = "MiddleName" });
mappings.Add(new Mapping{ csv_cols = new int[] { 3 },    db_column_name = "LastName" });
mappings.Add(new Mapping{ csv_cols = new int[] { 1, 3 }, db_column_name = "DisplayName", col_format = "{0} {1}" });
List<string> columns = new List<string>();
using (System.Data.SqlClient.SqlConnection conn = new System.Data.SqlClient.SqlConnection(ConfigurationManager.ConnectionStrings["MyConnString"].ToString()))
{
    conn.Open();
    // get a list of all the database columns
    using (var cmd = new System.Data.SqlClient.SqlCommand("SELECT column_name FROM MyDatabase.INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = N'Contact'", conn))
    {
        cmd.CommandType = CommandType.Text;
        System.Data.SqlClient.SqlDataReader sdr = cmd.ExecuteReader();
        while (sdr.Read())
        {
            var col = sdr.GetValue(0).ToString();
            columns.Add(col);
        }
        sdr.Close();
    }
    // get the db column for each mapping
    foreach (Mapping p in mappings)
    {
        p.db_col_num = columns.FindIndex(x => x.ToLower() == p.db_field.ToLower());
    }
    // create our custom data reader
    CustomDataReader cdr = new CustomDataReader(mappings, columns.ToArray(), csv, true);
    // bulkcopy data to Contacts table
    using (var bulkCopy = new System.Data.SqlClient.SqlBulkCopy(conn))
    {
        bulkCopy.DestinationTableName = "Contact";
        bulkCopy.WriteToServer(cdr);
    }
}