将C#数据表大容量传输到postgresql表

本文关键字:postgresql 传输 大容量 数据表 | 更新日期: 2023-09-27 17:59:13

我有一个包含数千条记录的数据表。我有一个postgres表,它的字段与数据表的字段相同。我希望每天都截断这个表,然后用数据表的数据再次填充。我见过sql批量复制,但它在postgres上不可用。那么,哪种方法最有效呢?

  • 每条记录插入一个
  • 多次插入:插入表中的值(1,1)、(1,2)、(1,3)和(2,1)
  • 从数据表中选择并使用linq插入postgres?不知道

谢谢。

将C#数据表大容量传输到postgresql表

PostgreSQL确实有一个大容量副本(它实际上被称为copy),并且它有一个很好的.NET包装器。如果你正在加载,你想使用NpgsqlCopyIn,如果你正在提取数据,你可以使用NpgsqlCopyOut.

你的问题在细节上有点模糊——我不知道你的数据表中的字段,也不知道你实际数据库的任何信息,所以以这个为例,介绍如何使用C#/PostgreSQL将数据大容量插入表中:

    NpgsqlCopyIn copy = new NpgsqlCopyIn("copy table1 from STDIN WITH NULL AS '' CSV;",
        conn);
    copy.Start();
    NpgsqlCopySerializer cs = new NpgsqlCopySerializer(conn);
    cs.Delimiter = ",";
    foreach (var record in RecordList)
    {
        cs.AddString(record.UserId);
        cs.AddInt32(record.Age);
        cs.AddDateTime(record.HireDate);
        cs.EndRow();
    }
    cs.Close();
    copy.End();

--编辑8/27/2019-

Npgsql的构造已经完全改变。下面是上面相同示例的样板,使用二进制导入(文本也可用):

using (var writer = conn.BeginBinaryImport(
    "copy user_data.part_list from STDIN (FORMAT BINARY)"))
{
    foreach (var record in RecordList)
    {
        writer.StartRow();
        writer.Write(record.UserId);
        writer.Write(record.Age, NpgsqlTypes.NpgsqlDbType.Integer);
        writer.Write(record.HireDate, NpgsqlTypes.NpgsqlDbType.Date);
    }
    writer.Complete();
}

也许你可以查看我的另一个答案,我在其中描述了我为这个问题创建的一个小助手,让使用另一个助手变得非常容易:https://stackoverflow.com/a/46063313/6654362

编辑:我最近也遇到过类似的问题,但我们使用的是Postgresql。我想使用有效的填充剂,结果发现这很难。我还没有在这个数据库上找到任何合适的免费库。我只找到了这个助手:https://bytefish.de/blog/postgresql_bulk_insert/它也在Nuget上。我已经编写了一个小型映射器,它以实体框架的方式自动映射属性:

public static PostgreSQLCopyHelper<T> CreateHelper<T>(string schemaName, string tableName)
        {
            var helper = new PostgreSQLCopyHelper<T>(schemaName, "'"" + tableName + "'"");
            var properties = typeof(T).GetProperties();
            foreach(var prop in properties)
            {
                var type = prop.PropertyType;
                if (Attribute.IsDefined(prop, typeof(KeyAttribute)))
                    continue;
                switch (type)
                {
                    case Type intType when intType == typeof(int) || intType == typeof(int?):
                        {
                            helper = helper.MapInteger("'"" + prop.Name + "'"",  x => (int?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type stringType when stringType == typeof(string):
                        {
                            helper = helper.MapText("'"" + prop.Name + "'"", x => (string)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type dateType when dateType == typeof(DateTime) || dateType == typeof(DateTime?):
                        {
                            helper = helper.MapTimeStamp("'"" + prop.Name + "'"", x => (DateTime?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type decimalType when decimalType == typeof(decimal) || decimalType == typeof(decimal?):
                        {
                            helper = helper.MapMoney("'"" + prop.Name + "'"", x => (decimal?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type doubleType when doubleType == typeof(double) || doubleType == typeof(double?):
                        {
                            helper = helper.MapDouble("'"" + prop.Name + "'"", x => (double?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type floatType when floatType == typeof(float) || floatType == typeof(float?):
                        {
                            helper = helper.MapReal("'"" + prop.Name + "'"", x => (float?)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                    case Type guidType when guidType == typeof(Guid):
                        {
                            helper = helper.MapUUID("'"" + prop.Name + "'"", x => (Guid)typeof(T).GetProperty(prop.Name).GetValue(x, null));
                            break;
                        }
                }
            }
            return helper;
        }

我以以下方式使用它(我有一个名为"承诺"的实体):

var undertakingHelper = BulkMapper.CreateHelper<Model.Undertaking>("dbo", nameof(Model.Undertaking));
undertakingHelper.SaveAll(transaction.UnderlyingTransaction.Connection as Npgsql.NpgsqlConnection, undertakingsToAdd));

我展示了一个关于事务的示例,但它也可以使用从上下文检索的正常连接来完成。undertakingsToAdd是可枚举的普通实体记录,我想将其批量插入数据库。

经过几个小时的研究和尝试,我得到了这个解决方案,正如你所期望的那样,速度更快,最终易于使用且免费!我真的建议你使用这个解决方案,不仅是因为上面提到的原因,而且因为它是我唯一对Postgresql本身没有问题的解决方案,许多其他解决方案都可以完美地工作,例如SqlServer。

有一些选项可以批量插入PostgreSQL。

例如,在我的库中,我使用SQL Copy

COPY TableName (Column1, Column2, Column3) FROM STDIN BINARY

免责声明:我是Bulk-Operations.NET 项目的所有者

这个库使得执行任何类型的批量操作都非常容易:

  • 隔板插入件
  • BulkUpdate
  • BulkDelete
  • BulkMerge

在包括PostgreSQL 在内的多个数据库提供商中

// Easy to use
var bulk = new BulkOperation(connection);
bulk.BulkInsert(dt);
bulk.BulkUpdate(dt);
bulk.BulkDelete(dt);
bulk.BulkMerge(dt);

正如在其他答案中所说,没有内置的解决方案,只有一些辅助库(免费和非免费),我个人提出了自己的解决方案。这样做的优点是

  • 免费,易于使用
  • 不需要额外的映射设置,它重用DB本身和EF DbContext中的元数据
  • 使用dyamic代码构建来提高性能

用法如下:

var uploader = new NpgsqlBulkUploader(context);
var data = GetALotOfData();
uploader.Insert(data);
// OR
uploader.Update(data);

我在那里描述过

上述解决方案要求您指定列数及其类型,从而使代码表具有特定性。如果您的表相对较小,并且具有相同数量的列和相同/兼容的列类型,则可以用通用的方式来实现。假设您想将Sqlite表迁移到PosgreSql:

// Get data from SqlLite database table
SQLiteConnection sqliteConnection = new SQLiteConnection(new SQLiteConnectionStringBuilder() { DataSource = @"C:'dataBase.sqlite" }.ConnectionString);
sqliteConnection.Open();
var reader = new SQLiteCommand($"SELECT * from table_which_we_want_to_migrate").ExecuteReader();
var dataFromSqliteTable = new DataTable() { CaseSensitive = true };
dataFromSqliteTable.Load(reader);
// Connect to PostgreSql database
var connection = new NpgsqlConnection(new NpgsqlConnectionStringBuilder()
{
    Host = "localhost",
    Port = 5432,
    Database = "DatabaseName",
    Username = "UserName",
    Password = "Password"
}.ToString());
connection.Open();
// Insert every row from the Sqlite table into PostgreSql table
foreach (DataRow row in dataFromSqliteTable.Rows)
{
    // Create an NpgsqlParameter for every field in the column
    var parameters = new List<DbParameter>();
    for (var i = 0; i < dataFromSqliteTable.Columns.Count; i++)
    {
        parameters.Add(new NpgsqlParameter($"@p{i}", row[i]));
    }
    var parameterNames = string.Join(", ", parameters.Select(p => p.ParameterName));
    
    // Create an INSERT SQL query which inserts the data from the current row into PostgreSql table
    var command = new NpgsqlCommand(
        $"INSERT INTO table_which_we_want_to_migrate VALUES ({parameterNames})",
        connection);
    command.Parameters.AddRange(parameters.ToArray());
    command.ExecuteNonQuery();
}

另一种方法是使用命令行实用程序和通过CSV文件导入/导出。这种方式要快得多,甚至适用于大表:

sqlite3 dataBase.sqlite ".output 'temporaryFile.csv.tmp'" ".headers off" ".mode csv" "SELECT * FROM table_which_we_want_to_migrate;" ".quit"
psql --command="'copy table_which_we_want_to_migrate FROM 'temporaryFile.csv.tmp' DELIMITER ',' CSV"