如何将列添加到DataReader

本文关键字:DataReader 添加 | 更新日期: 2023-09-27 18:29:37

我的目标是从数据源中检索数据,向其中添加一些元数据,并将其插入另一个目标。

目标的架构比源多四列(这四列是计算列)。

我使用的是SqlBulkCopy,它需要一个包含所有列(包括计算的4列)的读取器。

是否有手动将列添加到DataReader的方法或者如果不可能,我有什么数据插入的替代方案?

如何将列添加到DataReader

DataReader是只读的,因此无法修改。您可以改用DataTable(数据表)。

另一种可能的做法是:

  • 创建自己的类来实现IDataReader接口
  • 在类构造函数中添加现有的DataReader
  • 根据需要实现接口成员:将调用转发给底层DataReader并返回其结果,或者返回您自己计算的值

为了让您有个大致的概念,下面是一个简单的实现示例(我省略了大多数方法)

/// <summary>
/// Illustrative sketch of a decorator around an existing <see cref="IDataReader"/> that
/// exposes one extra computed column ("displayName", ordinal 15) in addition to the
/// columns of the wrapped reader. Only a handful of interface members are shown and the
/// schema row is a literal placeholder, so this is a pattern to follow rather than a
/// complete implementation.
/// </summary>
public class WrapperDataReader : IDataReader
{
    // The wrapped reader; every call that does not concern the computed column is forwarded to it.
    private IDataReader reader;
    /// <summary>Wraps <paramref name="reader"/>.</summary>
    /// <param name="reader">The reader whose rows and columns are exposed through this wrapper.</param>
    public WrapperDataReader(IDataReader reader)
    {
        this.reader = reader;
    }
    /// <summary>Closes the wrapped reader.</summary>
    public void Close()
    {
        reader.Close();
    }
    /// <summary>Gets the nesting depth reported by the wrapped reader.</summary>
    public int Depth
    {
        get { return reader.Depth; }
    }
    /// <summary>
    /// Returns the wrapped reader's schema table with a row for the computed column appended.
    /// </summary>
    /// <returns>The schema table obtained from the wrapped reader, after the extra row was added to it.</returns>
    public DataTable GetSchemaTable()
    {
        var schemaTable = reader.GetSchemaTable();
        // add your computed column to the schema table
        // ("..." is a placeholder: supply the schema values describing the computed column here)
        schemaTable.Rows.Add(...);
        return schemaTable;
    }
    /// <summary>Forwards to the wrapped reader; the computed column is not a boolean.</summary>
    /// <param name="i">Zero-based column ordinal.</param>
    public bool GetBoolean(int i)
    {
        return reader.GetBoolean(i);
    }
    /// <summary>
    /// Resolves a column name to its ordinal. "displayName" (compared case-insensitively with
    /// the invariant culture) maps to the computed column at ordinal 15; every other name is
    /// resolved by the wrapped reader.
    /// </summary>
    /// <param name="name">Column name to look up.</param>
    public int GetOrdinal(string name)
    {
        if (name.Equals("displayName", StringComparison.InvariantCultureIgnoreCase))
            return 15;
        return reader.GetOrdinal(name);
    }
    /// <summary>
    /// Returns the string value of column <paramref name="i"/>. Ordinal 15 is the computed
    /// column and is built from columns 1 and 2; all other ordinals are read from the wrapped reader.
    /// </summary>
    /// <param name="i">Zero-based column ordinal.</param>
    public string GetString(int i)
    {
        // 15 must match the ordinal returned by GetOrdinal for "displayName".
        if (i == 15)
            return String.Format("{0}, {1}", GetString(1), GetString(2)); // lastname, firstname
        return reader.GetString(i);
    }
}

更新

既然您提到要使用WriteToServer方法,那么可以使用它接受DataTable参数的重载。

        // Copies the source rows into an in-memory DataTable, adds two extra columns
        // (one expression-based, one filled in code) and bulk-inserts the result.
        var connectionString = "...";
        // Placeholder: SqlDataReader has no public constructor. In real code obtain the
        // reader from SqlCommand.ExecuteReader() on the source connection.
        var reader = new SqlDataReader();
        var table = new DataTable();
        // Load() buffers the reader's rows in the table so that columns can be added afterwards.
        table.Load(reader);
        // Expression column: DataColumn expressions concatenate strings with '+' and use
        // single-quoted literals; "lastname, firstname" is not a valid expression.
        table.Columns.Add("DisplayName", typeof(string), "lastname + ', ' + firstname");
        // Plain column whose value is computed per row in code below.
        table.Columns.Add("CustomerCode", typeof(string));
        foreach (DataRow row in table.Rows)
        {
            row["CustomerCode"] = ((int)row["id"] + 10000).ToString();
        }
        // SqlBulkCopy is IDisposable; the using block releases it once the copy is done.
        using (var copy = new SqlBulkCopy(connectionString, SqlBulkCopyOptions.Default))
        {
            copy.DestinationTableName = "Customers";
            copy.WriteToServer(table);
        }

我也需要做到这一点,并且还要能够基于其他列创建新列,以及让列的值取决于读取器的行索引。下面这个类实现了Dapper的IWrappedDataReader接口;它对我有用,希望对您也有用。该类并不完整,因为我没有为所有IDataRecord成员实现对附加列的处理,但您可以参考我对IDataRecord.GetInt32的实现来了解这个简单的模式。

/// <summary>
/// Decorates an <see cref="IDataReader"/> with additional, computed columns that are appended
/// after the columns of the underlying reader. Ordinals below the underlying column count are
/// forwarded to the underlying reader; ordinals at or above it are served by the
/// <see cref="AdditionalField.Func"/> of the matching <see cref="AdditionalField"/>.
/// </summary>
/// <remarks>
/// <see cref="IDataRecord.GetBytes"/>, <see cref="IDataRecord.GetChars"/> and
/// <see cref="IDataRecord.GetData"/> are always forwarded and therefore only work for
/// underlying columns. The underlying column count is captured once in the constructor.
/// NOTE(review): it is not refreshed by <see cref="IDataReader.NextResult"/>, so result sets
/// with a different column count are presumably unsupported — confirm before relying on it.
/// </remarks>
public class WrappedDataReader : IWrappedDataReader
{
    private readonly IList<AdditionalField> _additionalFields;
    // Number of columns of the underlying reader == ordinal of the first appended column.
    private readonly int _originalOrdinalCount;
    private IDbCommand _cmd;
    private int _currentRowIndex = -1; // becomes 0 on the first successful Read()
    private IDataReader _underlyingReader;

    /// <summary>Creates a wrapper that appends <paramref name="additionalFields"/> to <paramref name="underlyingReader"/>.</summary>
    /// <param name="underlyingReader">Reader supplying the original columns; disposed together with this wrapper.</param>
    /// <param name="additionalFields">Columns to append, in ordinal order.</param>
    /// <param name="command">Optional command that produced the reader; exposed through
    /// <see cref="IWrappedDataReader.Command"/> and disposed together with this wrapper.</param>
    /// <exception cref="ArgumentNullException">A required argument is null.</exception>
    /// <exception cref="ObjectDisposedException">The underlying reader returns no schema table.</exception>
    public WrappedDataReader(IDataReader underlyingReader, IList<AdditionalField> additionalFields, IDbCommand command = null)
    {
        if (underlyingReader == null)
        {
            throw new ArgumentNullException(nameof(underlyingReader));
        }
        if (additionalFields == null)
        {
            throw new ArgumentNullException(nameof(additionalFields));
        }
        _additionalFields = additionalFields;
        _underlyingReader = underlyingReader;
        _cmd = command;
        var schema = underlyingReader.GetSchemaTable();
        if (schema == null)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
        _originalOrdinalCount = schema.Rows.Count;
    }

    /// <summary>Gets the value of column <paramref name="i"/> (underlying or appended) for the current row.</summary>
    public object this[int i] => ((IDataRecord) this).GetValue(i);

    /// <summary>Gets the underlying reader.</summary>
    /// <exception cref="ObjectDisposedException">This wrapper has been disposed.</exception>
    public IDataReader Reader
    {
        get
        {
            if (_underlyingReader == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
            return _underlyingReader;
        }
    }

    /// <summary>Gets the command passed to the constructor.</summary>
    /// <exception cref="ObjectDisposedException">No command is held (none supplied, or already disposed).</exception>
    IDbCommand IWrappedDataReader.Command
    {
        get
        {
            if (_cmd == null)
            {
                throw new ObjectDisposedException(GetType().Name);
            }
            return _cmd;
        }
    }

    void IDataReader.Close() => _underlyingReader?.Close();

    int IDataReader.Depth => Reader.Depth;

    /// <summary>Returns the underlying schema plus one row (name, ordinal, data type) per appended column.</summary>
    DataTable IDataReader.GetSchemaTable()
    {
        var underlyingSchema = Reader.GetSchemaTable();
        if (underlyingSchema == null)
        {
            throw new ObjectDisposedException(GetType().Name);
        }
        // Work on a copy: a provider may hand out the same schema table instance on every
        // call, in which case appending to it directly would add duplicate rows each time.
        var rv = underlyingSchema.Copy();
        for (var i = 0; i < _additionalFields.Count; i++)
        {
            var row = rv.NewRow();
            row["ColumnName"] = _additionalFields[i].ColumnName;
            row["ColumnOrdinal"] = GetAppendColumnOrdinal(i);
            row["DataType"] = _additionalFields[i].DataType;
            rv.Rows.Add(row);
        }
        return rv;
    }

    bool IDataReader.IsClosed => _underlyingReader?.IsClosed ?? true;

    bool IDataReader.NextResult() => Reader.NextResult();

    /// <summary>Advances to the next row; the row index passed to field functions only moves when a row was read.</summary>
    bool IDataReader.Read()
    {
        var hasRow = Reader.Read();
        if (hasRow)
        {
            _currentRowIndex++;
        }
        return hasRow;
    }

    int IDataReader.RecordsAffected => Reader.RecordsAffected;

    /// <summary>Closes and disposes the underlying reader and the command, if any. Safe to call repeatedly.</summary>
    void IDisposable.Dispose()
    {
        _underlyingReader?.Close();
        _underlyingReader?.Dispose();
        _underlyingReader = null;
        _cmd?.Dispose();
        _cmd = null;
    }

    int IDataRecord.FieldCount => Reader.FieldCount + _additionalFields.Count;

    bool IDataRecord.GetBoolean(int i) => GetTyped<bool>(i, Reader.GetBoolean);

    byte IDataRecord.GetByte(int i) => GetTyped<byte>(i, Reader.GetByte);

    long IDataRecord.GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length) =>
        Reader.GetBytes(i, fieldOffset, buffer, bufferoffset, length);

    char IDataRecord.GetChar(int i) => GetTyped<char>(i, Reader.GetChar);

    long IDataRecord.GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length) =>
        Reader.GetChars(i, fieldoffset, buffer, bufferoffset, length);

    IDataReader IDataRecord.GetData(int i) => Reader.GetData(i);

    string IDataRecord.GetDataTypeName(int i)
    {
        // Appended columns have no provider type name; report the CLR type name instead.
        return IsAdditional(i) ? _additionalFields[GetAppendColumnIndex(i)].DataType?.Name : Reader.GetDataTypeName(i);
    }

    DateTime IDataRecord.GetDateTime(int i) => GetTyped<DateTime>(i, Reader.GetDateTime);

    decimal IDataRecord.GetDecimal(int i) => GetTyped<decimal>(i, Reader.GetDecimal);

    double IDataRecord.GetDouble(int i) => GetTyped<double>(i, Reader.GetDouble);

    Type IDataRecord.GetFieldType(int i)
    {
        return IsAdditional(i) ? _additionalFields[GetAppendColumnIndex(i)].DataType : Reader.GetFieldType(i);
    }

    float IDataRecord.GetFloat(int i) => GetTyped<float>(i, Reader.GetFloat);

    Guid IDataRecord.GetGuid(int i) => GetTyped<Guid>(i, Reader.GetGuid);

    short IDataRecord.GetInt16(int i) => GetTyped<short>(i, Reader.GetInt16);

    int IDataRecord.GetInt32(int i) => GetTyped<int>(i, Reader.GetInt32);

    long IDataRecord.GetInt64(int i) => GetTyped<long>(i, Reader.GetInt64);

    string IDataRecord.GetName(int i)
    {
        return IsAdditional(i) ? _additionalFields[GetAppendColumnIndex(i)].ColumnName : Reader.GetName(i);
    }

    /// <summary>
    /// Resolves a column name to its ordinal. Appended columns are matched first
    /// (ordinal, case-insensitive); any other name is resolved by the underlying reader.
    /// </summary>
    int IDataRecord.GetOrdinal(string name)
    {
        for (var i = 0; i < _additionalFields.Count; i++)
        {
            if (string.Equals(name, _additionalFields[i].ColumnName, StringComparison.OrdinalIgnoreCase))
            {
                return GetAppendColumnOrdinal(i);
            }
        }
        return Reader.GetOrdinal(name);
    }

    string IDataRecord.GetString(int i) => GetTyped<string>(i, Reader.GetString);

    object IDataRecord.GetValue(int i)
    {
        return IsAdditional(i) ? ExecuteAdditionalFieldFunc(i) : Reader.GetValue(i);
    }

    /// <summary>
    /// Fills <paramref name="values"/> with the underlying column values followed by the
    /// appended column values, as far as the array has room.
    /// </summary>
    /// <returns>The number of array slots that were filled.</returns>
    int IDataRecord.GetValues(object[] values)
    {
        if (values == null)
        {
            throw new ArgumentNullException(nameof(values));
        }
        var copied = Reader.GetValues(values);
        if (copied < _originalOrdinalCount)
        {
            // Not all underlying columns were copied (array too short), so there is no room
            // for appended columns either.
            return copied;
        }
        var limit = Math.Min(values.Length, _originalOrdinalCount + _additionalFields.Count);
        for (var ordinal = _originalOrdinalCount; ordinal < limit; ordinal++)
        {
            values[ordinal] = ExecuteAdditionalFieldFunc(ordinal);
        }
        return Math.Max(copied, limit);
    }

    bool IDataRecord.IsDBNull(int i)
    {
        if (!IsAdditional(i))
        {
            return Reader.IsDBNull(i);
        }
        // A field function may signal "no value" with either null or DBNull.Value.
        var value = ExecuteAdditionalFieldFunc(i);
        return value == null || value is DBNull;
    }

    object IDataRecord.this[string name]
    {
        get
        {
            var ordinal = ((IDataRecord) this).GetOrdinal(name);
            return ((IDataRecord) this).GetValue(ordinal);
        }
    }

    // True when the ordinal addresses an appended column rather than an underlying one.
    private bool IsAdditional(int ordinal)
    {
        return ordinal >= _originalOrdinalCount;
    }

    // Reads a typed value: cast of the field function result for appended columns,
    // otherwise the underlying reader's typed getter.
    private T GetTyped<T>(int ordinal, Func<int, T> readUnderlying)
    {
        return IsAdditional(ordinal) ? (T) ExecuteAdditionalFieldFunc(ordinal) : readUnderlying(ordinal);
    }

    // Index into _additionalFields -> ordinal exposed to callers.
    private int GetAppendColumnOrdinal(int index)
    {
        return _originalOrdinalCount + index;
    }

    // Ordinal exposed to callers -> index into _additionalFields.
    private int GetAppendColumnIndex(int ordinal)
    {
        return ordinal - _originalOrdinalCount;
    }

    // Evaluates the appended column at the given ordinal for the current row.
    // A field declared without a function (the constructor default) yields null.
    private object ExecuteAdditionalFieldFunc(int ordinal)
    {
        var func = _additionalFields[GetAppendColumnIndex(ordinal)].Func;
        return func == null ? null : func(_currentRowIndex, Reader);
    }

    /// <summary>Describes one appended column: its name, CLR type and value function.</summary>
    public struct AdditionalField
    {
        /// <param name="columnName">Name reported for the column.</param>
        /// <param name="dataType">CLR type reported for the column.</param>
        /// <param name="func">Computes the value from the zero-based row index and the underlying reader.</param>
        public AdditionalField(string columnName, Type dataType, Func<int, IDataReader, object> func = null)
        {
            ColumnName = columnName;
            DataType = dataType;
            Func = func;
        }

        /// <summary>Name reported for the column.</summary>
        public string ColumnName;

        /// <summary>CLR type reported for the column.</summary>
        public Type DataType;

        /// <summary>Value function; receives the zero-based row index and the underlying reader.</summary>
        public Func<int, IDataReader, object> Func;
    }
}

下面是我写的一个快速NUnit测试,以展示它的使用

/// <summary>
/// Runs a three-row query through <c>WrappedDataReader</c> with a constant, a row-counter and a
/// computed column appended, serialises the result with <c>ToJson</c> and compares it with the
/// expected JSON. Requires a reachable SQL Server (connection taken from
/// <c>ConnectionSettingsCollection.Default</c>).
/// </summary>
[Test]
public void ReaderTest()
{
    using (var conn = new SqlConnection(ConnectionSettingsCollection.Default))
    {
        conn.Open();
        // ORDER BY makes the row order deterministic; the expected JSON (and CounterField,
        // which is the row index) depends on it, and UNION alone does not guarantee an order.
        const string sql = @"
            SELECT 1 as OriginalField
            UNION
            SELECT -500 as OriginalField
            UNION
            SELECT 100 as OriginalField
            ORDER BY OriginalField
        ";
        var additionalFields = new[]
        {
            // Declared as string because the function returns the string "X".
            new WrappedDataReader.AdditionalField("StaticField", typeof(string), delegate { return "X"; }),
            new WrappedDataReader.AdditionalField("CounterField", typeof(int), (i, reader) => i),
            new WrappedDataReader.AdditionalField("ComputedField", typeof(int), (i, reader) => (int) reader["OriginalField"] + 1000)
        };
        const string expectedJson = @"
            [
                {""OriginalField"":-500,""StaticField"":""X"",""CounterField"":0,""ComputedField"":500},
                {""OriginalField"":1,   ""StaticField"":""X"",""CounterField"":1,""ComputedField"":1001},
                {""OriginalField"":100, ""StaticField"":""X"",""CounterField"":2,""ComputedField"":1100}
            ]
        ";
        string actualJson;
        // Dispose the command and the wrapped reader deterministically instead of leaking them.
        using (var cmd = new SqlCommand(sql, conn))
        using (var wrapped = new WrappedDataReader(cmd.ExecuteReader(), additionalFields))
        {
            actualJson = ToJson(wrapped);
        }
        // IgnoreSymbols makes the comparison insensitive to the whitespace used to lay out
        // expectedJson. NOTE(review): it also ignores punctuation, so this is a loose check.
        Assert.Zero(CultureInfo.InvariantCulture.CompareInfo.Compare(expectedJson, actualJson, CompareOptions.IgnoreSymbols));
    }
}
/// <summary>
/// Reads <paramref name="reader"/> to the end and renders its rows as a JSON array in which
/// every row is an object keyed by column name.
/// </summary>
/// <param name="reader">Reader to drain; it is read but neither closed nor disposed here.</param>
/// <returns>The JSON text.</returns>
private static string ToJson(IDataReader reader)
{
    using (var textSink = new StringWriter(new StringBuilder()))
    using (var json = new JsonTextWriter(textSink))
    {
        json.WriteStartArray();
        for (var hasRow = reader.Read(); hasRow; hasRow = reader.Read())
        {
            // One JSON object per row, one property per column.
            json.WriteStartObject();
            var column = 0;
            while (column < reader.FieldCount)
            {
                var columnName = reader.GetName(column);
                json.WritePropertyName(columnName);
                var cell = reader[column];
                json.WriteValue(cell);
                column++;
            }
            json.WriteEndObject();
        }
        json.WriteEndArray();
        return textSink.ToString();
    }
}

Datareader仅用于读取数据。您不能修改其架构或值

数据集/数据表就是为了这个目的。

我找到了另一个可能的解决方案,我目前正在使用:

  1. 从现有的DataReader创建以下对象:
    IEnumerable<object[]>-表示数据
    List<Column>—表示数据DataReader的列。

  2. 修改数据并添加额外的列

  3. 创建一个方法AsDataReader(IEnumerable<object[]> updatedData, List<Column> updatedColumns),该方法接收更新后的对象并返回修改后的DataReader

    • 我使用了yield(迭代器)来提高性能