OracleBulkCopy 能否将数据从我的进程地址空间移动到表中?

本文关键字:移动 地址空间 进程 我的 数据 OracleBulkCopy | 更新日期: 2023-09-27 18:11:09

我希望加速一些大规模的 ETL 操作:这些操作从格式奇怪的平面文件和更奇怪的 XML 数据存储中读取并转换大量数据,然后将许多记录插入 Oracle 数据库。我使用的是 ODP.NET 11g,运行在 C#/.NET 4 上。

OracleBulkCopy类似乎是一个不错的尝试。问题是,它想从IDataReader实例或IDataRecord实例数组(即一些其他dbms查询的结果集)读取其数据。

是否有一种简单的方法,可以在内存中把我的数据行包装成 IDataReader 或 IDataRecord 数组,以便把它们交给 OracleBulkCopy?

我找到的例子都是把数据从一个数据库迁移到另一个数据库。而我想避免把这些记录逐条写入数据库,以便能够批量加载它们。我更希望有一个相当于"内存流"的 IDataReader 实现。谢谢。

OracleBulkCopy 能否将数据从我的进程地址空间移动到表中?

我遇到过类似的问题:我在使用 LINQ to SQL(Linq2Sql)时发现了性能问题。我找到了一个使用反射和扩展方法的解决方案,其中列信息是从 ColumnAttribute 中读取的。

/// <summary>
/// Copies <paramref name="entities"/> into an in-memory <see cref="DataTable"/> and writes it
/// to the server through <paramref name="copy"/>.
/// </summary>
/// <remarks>
/// Only properties of <typeparamref name="T"/> decorated with <c>ColumnAttribute</c> are copied.
/// The attribute's <c>Name</c> is used as both source and destination column name; when the
/// attribute does not specify a name, the property name is used instead.
/// Any existing column mappings on <paramref name="copy"/> are replaced.
/// </remarks>
/// <typeparam name="T">Entity type whose attributed properties describe the columns.</typeparam>
/// <param name="copy">Bulk copy instance; its <c>DestinationTableName</c> names the in-memory table.</param>
/// <param name="entities">Rows to insert; enumerated exactly once.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="copy"/> or <paramref name="entities"/> is null.
/// </exception>
public static void Insert<T>(this OracleBulkCopy copy, IEnumerable<T> entities)
    where T : class
{
    if (copy == null)
        throw new ArgumentNullException("copy");
    if (entities == null)
        throw new ArgumentNullException("entities");

    // Resolve the mapped properties once, so the per-row loop does not repeat
    // the name-based reflection lookup for every cell.
    var columnNames = new List<string>();
    var columnProperties = new List<System.Reflection.PropertyInfo>();
    foreach (var propertyInfo in typeof(T).GetProperties())
    {
        var attribute = Attribute.GetCustomAttribute(propertyInfo, typeof(ColumnAttribute)) as ColumnAttribute;
        if (attribute == null)
            continue;

        // [Column] without an explicit Name leaves Name unset; fall back to the member name
        // instead of registering a null column name.
        columnNames.Add(string.IsNullOrEmpty(attribute.Name) ? propertyInfo.Name : attribute.Name);
        columnProperties.Add(propertyInfo);
    }

    copy.ColumnMappings.Clear();
    var table = new DataTable(copy.DestinationTableName);
    foreach (var columnName in columnNames)
    {
        // Source and destination column share the same name.
        copy.ColumnMappings.Add(columnName, columnName);
        table.Columns.Add(columnName);
    }

    foreach (var entity in entities)
    {
        var row = table.NewRow();
        for (var i = 0; i < columnProperties.Count; i++)
        {
            var value = columnProperties[i].GetValue(entity, null);
            if (value is DateTime)
                value = new OracleDate((DateTime) value);

            // Table columns were added in the same order as columnProperties, so the
            // ordinal can be used directly. Nulls are stored explicitly as DBNull.
            row[i] = value ?? DBNull.Value;
        }
        table.Rows.Add(row);
    }
    copy.WriteToServer(table);
}

实体看起来像:

/// <summary>
/// Example entity: each property that should be bulk-copied carries a <c>[Column]</c>
/// attribute naming its column.
/// </summary>
public class SomeEntity
{
  // Property tagged with a Column attribute whose Name is "Id".
  [Column(Name = "Id")]
  public string Id { get; set; }
  // Further attributed properties (one per column) would be declared here.
}

使用它就像:

// Usage example. CreateDbConnection, tableName, DefaultTimeoutInSeconds, BatchSize and
// entities are defined outside this snippet.
using (var connection = CreateDbConnection())
{
    // The connection is opened before the bulk copy object is created on top of it.
    connection.Open();
    using (var copy = new OracleBulkCopy(connection))
    {
        // Configure the target table, timeout and batch size before writing.
        copy.DestinationTableName = tableName;
        copy.BulkCopyTimeout = DefaultTimeoutInSeconds;
        copy.BatchSize = BatchSize;
        // Extension method: sets up the column mappings and writes the entities.
        copy.Insert(entities);
    }
}

你也可以创建一个"实体读取器"(entity reader),这样会更加节省内存。需要提醒的是,反射真的很慢,所以你也许应该考虑使用 lambda 表达式的方案:先为所有属性构建并编译好 getter,然后对每一行只需调用这些 getter 即可。

下面的代码我没有测试过,因为我手头没有 Oracle 11g,大致如下:

/// <summary>
/// Builds and caches, once per <typeparamref name="T"/>, a compiled getter delegate for every
/// public, readable, non-indexed instance property of <typeparamref name="T"/>.
/// </summary>
/// <remarks>
/// <see cref="Properties"/> and <see cref="Getters"/> are index-aligned: element <c>i</c> of
/// <see cref="Getters"/> reads the property at element <c>i</c> of <see cref="Properties"/>.
/// Indexers, write-only properties and static properties are skipped because a getter taking
/// only an instance of <typeparamref name="T"/> cannot be built for them.
/// </remarks>
/// <typeparam name="T">Type whose properties are exposed.</typeparam>
public static class GetterGenerator<T>
{
    private static readonly PropertyInfo[] properties;
    private static readonly Func<T, object>[] orderedGetters;
    private static readonly Dictionary<string, Func<T, object>> getters;

    // A static constructor is used so that the three caches are filled together, in one pass,
    // and can never disagree about which properties are included or in what order.
    static GetterGenerator()
    {
        var readable = new List<PropertyInfo>();
        var ordered = new List<Func<T, object>>();
        var byName = new Dictionary<string, Func<T, object>>();

        foreach (PropertyInfo property in typeof(T).GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            // Indexers need arguments and write-only properties have no getter.
            if (!property.CanRead || property.GetIndexParameters().Length != 0)
                continue;

            // A property hidden with 'new' can be reported twice under the same name;
            // keep the first one reported so the name lookup stays unambiguous.
            if (byName.ContainsKey(property.Name))
                continue;

            Func<T, object> getter = CompileGetter(property);
            byName.Add(property.Name, getter);
            readable.Add(property);
            ordered.Add(getter);
        }

        properties = readable.ToArray();
        orderedGetters = ordered.ToArray();
        getters = byName;
    }

    // Compiles "p => (object)p.Property" for the given property.
    private static Func<T, object> CompileGetter(PropertyInfo property)
    {
        ParameterExpression instance = Expression.Parameter(typeof(T), "p");
        Expression<Func<T, object>> lambda = Expression.Lambda<Func<T, object>>(
            Expression.Convert(Expression.Property(instance, property), typeof(object)),
            instance);
        return lambda.Compile();
    }

    /// <summary>Compiled getters keyed by property name.</summary>
    public static Dictionary<string, Func<T, object>> GettersDictionary
    {
        get { return getters; }
    }

    /// <summary>
    /// Compiled getters in the same order as <see cref="Properties"/>.
    /// A new array is returned on every call.
    /// </summary>
    public static Func<T, object>[] Getters
    {
        get { return (Func<T, object>[])orderedGetters.Clone(); }
    }

    /// <summary>The properties for which getters were generated.</summary>
    public static PropertyInfo[] Properties
    {
        get { return properties; }
    }
}
    /// <summary>
    /// Forward-only <see cref="IDataReader"/> over an in-memory sequence of
    /// <typeparamref name="T"/>. Each public property reported by
    /// <c>GetterGenerator&lt;T&gt;.Properties</c> is exposed as one field, in that order.
    /// </summary>
    /// <remarks>
    /// The sequence is consumed lazily: one element is pulled per <see cref="Read"/> call.
    /// <see cref="GetSchemaTable"/>, <see cref="GetBytes"/>, <see cref="GetChars"/> and
    /// <see cref="GetData"/> are not implemented.
    /// </remarks>
    /// <typeparam name="T">Entity type of the rows.</typeparam>
    public class EntityReader<T> : IDataReader
    {
        private readonly PropertyInfo[] properties;
        private readonly Func<T, object>[] getters;
        private IEnumerator<T> enumerator;
        private int affected = 0;
        private bool closed = false;

        /// <summary>Creates a reader positioned before the first element.</summary>
        /// <param name="enumerable">Rows to expose.</param>
        /// <exception cref="ArgumentNullException"><paramref name="enumerable"/> is null.</exception>
        public EntityReader(IEnumerable<T> enumerable)
        {
            if (enumerable == null)
                throw new ArgumentNullException("enumerable");

            properties = GetterGenerator<T>.Properties;

            // Look each getter up by property name so that getters[i] is guaranteed to read
            // properties[i], independent of the order in which the generator lists its getters.
            Dictionary<string, Func<T, object>> gettersByName = GetterGenerator<T>.GettersDictionary;
            getters = new Func<T, object>[properties.Length];
            for (int i = 0; i < properties.Length; i++)
                getters[i] = gettersByName[properties[i].Name];

            enumerator = enumerable.GetEnumerator();
        }

        // Returns the current row, or throws if the reader has been closed.
        private T CurrentRow
        {
            get
            {
                if (closed)
                    throw new InvalidOperationException("The reader is closed.");
                return enumerator.Current;
            }
        }

        #region IDataReader Members
        /// <summary>Releases the underlying enumerator. Safe to call more than once.</summary>
        public void Close()
        {
            if (closed)
                return;
            closed = true;
            if (enumerator != null)
            {
                enumerator.Dispose();
                enumerator = null;
            }
        }
        /// <summary>Always 0: rows are never nested.</summary>
        public int Depth
        {
            get { return 0; }
        }
        /// <summary>Not implemented.</summary>
        public DataTable GetSchemaTable()
        {
            throw new NotImplementedException();
        }
        /// <summary>True once <see cref="Close"/> or <see cref="Dispose"/> has been called.</summary>
        public bool IsClosed
        {
            get { return closed; }
        }
        /// <summary>Always false: there is only a single result set.</summary>
        public bool NextResult()
        {
            return false;
        }
        /// <summary>Advances to the next element of the sequence.</summary>
        /// <returns>True if a row is available; false at the end of the sequence.</returns>
        /// <exception cref="InvalidOperationException">The reader is closed.</exception>
        public bool Read()
        {
            if (closed)
                throw new InvalidOperationException("The reader is closed.");
            bool read = enumerator.MoveNext();
            if (read)
                affected++;
            return read;
        }
        /// <summary>Number of rows returned by <see cref="Read"/> so far.</summary>
        public int RecordsAffected
        {
            get { return affected; }
        }
        #endregion
        #region IDisposable Members
        /// <summary>Closes the reader and disposes the underlying enumerator.</summary>
        public void Dispose()
        {
            Close();
        }
        #endregion
        #region IDataRecord Members
        /// <summary>Number of fields (one per exposed property).</summary>
        public int FieldCount
        {
            get { return getters.Length; }
        }
        // Reads field i and unboxes/casts it to Y, reporting both types on a failed cast.
        private Y GetValue<Y>(int i)
        {
            try
            {
                return (Y)GetValue(i);
            }
            catch (InvalidCastException)
            {
                throw new InvalidCastException(string.Format("Invalid cast from '{0}' to '{1}'", GetFieldType(i), typeof(Y)));
            }
        }
        public bool GetBoolean(int i)
        {
            return GetValue<bool>(i);
        }
        public byte GetByte(int i)
        {
            return GetValue<byte>(i);
        }
        /// <summary>Not implemented.</summary>
        public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }
        public char GetChar(int i)
        {
            return GetValue<char>(i);
        }
        /// <summary>Not implemented.</summary>
        public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
        {
            throw new NotImplementedException();
        }
        /// <summary>Not implemented.</summary>
        public IDataReader GetData(int i)
        {
            throw new NotImplementedException();
        }
        /// <summary>Returns the CLR type name of the property behind field <paramref name="i"/>.</summary>
        public string GetDataTypeName(int i)
        {
            return GetFieldType(i).Name;
        }
        public DateTime GetDateTime(int i)
        {
            return GetValue<DateTime>(i);
        }
        public decimal GetDecimal(int i)
        {
            return GetValue<decimal>(i);
        }
        public double GetDouble(int i)
        {
            return GetValue<double>(i);
        }
        /// <summary>Returns the declared type of the property behind field <paramref name="i"/>.</summary>
        public Type GetFieldType(int i)
        {
            return properties[i].PropertyType;
        }
        public float GetFloat(int i)
        {
            return GetValue<float>(i);
        }
        public Guid GetGuid(int i)
        {
            return GetValue<Guid>(i);
        }
        public short GetInt16(int i)
        {
            return GetValue<short>(i);
        }
        public int GetInt32(int i)
        {
            return GetValue<int>(i);
        }
        public long GetInt64(int i)
        {
            return GetValue<long>(i);
        }
        /// <summary>Returns the name of the property behind field <paramref name="i"/>.</summary>
        public string GetName(int i)
        {
            return properties[i].Name;
        }
        /// <summary>
        /// Returns the zero-based index of the field whose property name equals
        /// <paramref name="name"/> (case-sensitive), or -1 when there is no such field.
        /// </summary>
        public int GetOrdinal(string name)
        {
            for (int i = 0; i < properties.Length; i++)
            {
                // Return the matching index itself; the index must not be advanced past the hit.
                if (properties[i].Name == name)
                    return i;
            }
            return -1;
        }
        public string GetString(int i)
        {
            return GetValue<string>(i);
        }
        /// <summary>Returns the value of field <paramref name="i"/> for the current row.</summary>
        public object GetValue(int i)
        {
            return getters[i](CurrentRow);
        }
        /// <summary>
        /// Copies the current row's field values into <paramref name="values"/>.
        /// </summary>
        /// <returns>The number of values copied (the smaller of the array length and the field count).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="values"/> is null.</exception>
        public int GetValues(object[] values)
        {
            if (values == null)
                throw new ArgumentNullException("values");
            int length = Math.Min(values.Length, getters.Length);
            T row = CurrentRow;
            for (int i = 0; i < length; i++)
                values[i] = getters[i](row);
            return length;
        }
        /// <summary>True when field <paramref name="i"/> is null or <see cref="DBNull"/>.</summary>
        public bool IsDBNull(int i)
        {
            object value = GetValue(i);
            return value == null || value is DBNull;
        }
        public object this[string name]
        {
            get { return GetValue(GetOrdinal(name)); }
        }
        public object this[int i]
        {
            get { return GetValue(i); }
        }
        #endregion
    }
    /// <summary>
    /// Streams <paramref name="entities"/> to the server through <paramref name="copy"/> using an
    /// <c>EntityReader&lt;T&gt;</c>, so the rows are not materialized in an intermediate table.
    /// </summary>
    /// <typeparam name="T">Entity type of the rows.</typeparam>
    /// <param name="copy">Bulk copy instance; its existing column mappings are replaced.</param>
    /// <param name="entities">Rows to insert.</param>
    /// <param name="ColumnMap">
    /// Column mappings; each entry's key and value are passed, in that order, to
    /// <c>copy.ColumnMappings.Add</c>.
    /// </param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public static void Insert<T>(this OracleBulkCopy copy, IEnumerable<T> entities, Dictionary<string, string> ColumnMap) 
        where T : class
    {
        if (copy == null)
            throw new ArgumentNullException("copy");
        if (entities == null)
            throw new ArgumentNullException("entities");
        if (ColumnMap == null)
            throw new ArgumentNullException("ColumnMap");

        copy.ColumnMappings.Clear();
        foreach (var map in ColumnMap)
            copy.ColumnMappings.Add(map.Key, map.Value);

        // Dispose the reader when the copy finishes or fails, instead of leaving it to the GC.
        using (IDataReader reader = new EntityReader<T>(entities))
        {
            copy.WriteToServer(reader);
        }
    }