使用Powershell从SQLDataReader中批量加载数据表
本文关键字:加载 数据表 Powershell SQLDataReader 使用 | 更新日期: 2023-09-27 18:03:45
我需要从SqlDataReader批量加载一个数据表。SqlDataReader将返回数百万条记录,而DataTable Load()方法将耗尽可用内存。
下面是我当前的代码:
# Script-level binding attributes and parameters.
# Every parameter has a default except $TgtServer; the main body falls back to
# the source value when a target value is empty.
[CmdletBinding(
    DefaultParameterSetName = 'Instance',
    SupportsShouldProcess = $true,
    ConfirmImpact = 'High'
)]
Param (
    # Source instance; used as the "Data Source" of the source connection string.
    [string] $SrcServer = "MySQLServer,12345",

    # Source database; used as the "Initial Catalog" of the source connection string.
    [string] $SrcDatabase = "SourceDb",

    # Source table; interpolated into the default $SrcQuery.
    [string] $SrcTable = "dbo.SourceTable",

    # Query executed against the source connection.
    [string] $SrcQuery = "SELECT TOP 100 HASHBYTES('SHA',stay_number) as stay_number_h, * FROM $SrcTable",

    # Target instance; when empty the main body uses $SrcServer.
    [string] $TgtServer,

    # Target database passed to sqlcmd for the truncate step.
    [string] $TgtDatabase = "TargetDb",

    # Target table named in the TRUNCATE TABLE statement.
    [string] $TgtTable = "tmp.TargetTable",

    # When set (the default), the target table is truncated before loading.
    [switch] $Truncate = $true
)
Function ConnectionString([string] $ServerName, [string] $DbName)
{
    # Builds a connection string for the given server and database using
    # integrated security and a 30-second connection timeout.
    $connectionString = "Data Source=$ServerName;Initial Catalog=$DbName;Integrated Security=True;Connection Timeout=30"
    return $connectionString
}
########## Main body ############
# Streams the source query through a SqlDataReader and loads it into a DataTable
# in fixed-size batches, so at most $BatchSize rows are held in the table at once
# instead of the whole result set.
Write-Host "Starting..."

# Any target setting left empty falls back to the corresponding source setting.
If ($TgtServer.Length -eq 0) {
    $TgtServer = $SrcServer
}
If ($TgtDatabase.Length -eq 0) {
    $TgtDatabase = $SrcDatabase
}
If ($TgtTable.Length -eq 0) {
    $TgtTable = $SrcTable
}

If ($Truncate) {
    # The script declares SupportsShouldProcess, so -WhatIf / -Confirm must be
    # honoured before the destructive TRUNCATE is sent.
    If ($PSCmdlet.ShouldProcess("$TgtDatabase.$TgtTable on $TgtServer", "TRUNCATE TABLE")) {
        Write-Host "Truncating $TgtTable"
        $TruncateSql = "TRUNCATE TABLE " + $TgtTable
        Sqlcmd -S $TgtServer -d $TgtDatabase -Q $TruncateSql
    }
}

# Maximum number of rows kept in the DataTable before it is flushed.
$BatchSize = 50000

$SrcConnStr = ConnectionString $SrcServer $SrcDatabase
$SrcConn = New-Object System.Data.SqlClient.SqlConnection($SrcConnStr)
$SqlCommand = New-Object System.Data.SqlClient.SqlCommand($SrcQuery, $SrcConn)
$SqlReader = $null
$dt = New-Object System.Data.DataTable

try {
    $SrcConn.Open()
    [System.Data.SqlClient.SqlDataReader] $SqlReader = $SqlCommand.ExecuteReader()

    # Mirror the reader's result-set schema as DataTable columns.
    $dtSchema = $SqlReader.GetSchemaTable()
    if ($null -ne $dtSchema)
    {
        foreach ($drow in $dtSchema.Rows)
        {
            $columnName = $drow["ColumnName"]
            $column = New-Object System.Data.DataColumn($columnName, $drow["DataType"])
            $column.Unique = $drow["IsUnique"]
            $column.AllowDBNull = $drow["AllowDBNull"]
            $column.AutoIncrement = $drow["IsAutoIncrement"]
            $dt.Columns.Add($column)
        }
    }

    Write-Host "Now loading DataTable"
    # LoadDataRow takes an object[] of column values, not the reader itself, so
    # each row is copied into a reusable buffer with GetValues first.
    $rowValues = New-Object -TypeName 'object[]' -ArgumentList $SqlReader.FieldCount
    $totalRows = 0
    while ($SqlReader.Read()) {
        $null = $SqlReader.GetValues($rowValues)
        $null = $dt.LoadDataRow($rowValues, $true)
        $totalRows++

        if ($dt.Rows.Count -ge $BatchSize) {
            # A full batch is in memory: consume $dt here (for example with
            # SqlBulkCopy against the target), then drop the rows so the next
            # batch starts from an empty table.
            Write-Host "Loaded batch of $($dt.Rows.Count) rows ($totalRows total)"
            $dt.Clear()
        }
    }
}
finally {
    # Release the reader, command and connection even when loading fails.
    if ($null -ne $SqlReader) {
        $SqlReader.Dispose()
    }
    $SqlCommand.Dispose()
    $SrcConn.Dispose()
}

# $dt now holds only the final (possibly partial) batch.
Write-Host "DataTable filled, how long and check memory consumption!"
Start-Sleep -Seconds 30
$dt.Clear()
Write-Host "Finished"
相关链接:CSV到SQL Server:
https://gallery.technet.microsoft.com/scriptcenter/Import-Large-CSVs-into-SQL-216223d9
https://gallery.technet.microsoft.com/scriptcenter/Import-Large-CSVs-into-SQL-fa339046
https://blogs.technet.microsoft.com/heyscriptingguy/2011/05/06/use-powershell-to-copy-a-table-between-two-sql-server-instances/
https://newsqlblog.com/2011/08/12/moving-data-between-sql-servers-with-powershell/
https://raw.githubusercontent.com/RamblingCookieMonster/PowerShell/master/Invoke-SQLBulkCopy.ps1
理想情况下,我的最终解决方案将同时支持 SQL Server 和外部文件作为导入来源(实际上是任何可以加载为 DataTable 的数据源)。
这不是 PowerShell,但下面是我在 C# 中分批处理数据库数据的做法:
它需要一个字段与表的列一一对应的类,并使用 System.Reflection 按你喜欢的任意批大小处理数据:
// Reads every row from r, creates one instance of t per row, and fills each public
// field of that instance from the column with the same name.
//
// Rows are accumulated in a list and handed to processBatch every batchSize rows,
// plus once more for a final partial batch. The same list instance is reused and
// cleared after each callback, so the callback must not keep a reference to it.
//
// t            - type with a public parameterless constructor and public fields of
//                type string, short, int, long, double, bool or DateTime.
// r            - open reader positioned before the first row; not closed here.
// batchSize    - maximum number of instances buffered before a flush (default 50000).
// processBatch - optional consumer of each batch; when null the rows are read and dropped.
//
// Throws ArgumentNullException / ArgumentOutOfRangeException for invalid arguments and
// NotSupportedException when a field has a type outside the list above.
public static void sql_Reader_To_Type(Type t, SqlDataReader r, int batchSize = 50000, Action<List<object>> processBatch = null)
{
    if (t == null)
    {
        throw new ArgumentNullException("t");
    }
    if (r == null)
    {
        throw new ArgumentNullException("r");
    }
    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException("batchSize", "Batch size must be positive.");
    }

    // Reflect once; the field list cannot change between rows.
    FieldInfo[] fields = t.GetFields();
    List<object> batch = new List<object>();

    while (r.Read())
    {
        object item = Activator.CreateInstance(t);
        foreach (FieldInfo field in fields)
        {
            object converted = convert_Reader_Value_For_Field(field.FieldType, field.Name, r[field.Name]);
            field.SetValue(item, converted);
        }

        batch.Add(item);
        if (batch.Count >= batchSize)
        {
            if (processBatch != null)
            {
                processBatch(batch);
            }
            batch.Clear();
        }
    }

    // Flush the trailing partial batch.
    if (batch.Count > 0 && processBatch != null)
    {
        processBatch(batch);
    }
}

// Converts one raw column value to the CLR type of the target field.
// A database NULL becomes the field type's default (0, false, DateTime.MinValue),
// matching what the double branch already did; strings become "" via Convert.ToString.
private static object convert_Reader_Value_For_Field(Type fieldType, string fieldName, object raw)
{
    bool isNull = raw == null || raw is DBNull;

    if (fieldType == typeof(string))
    {
        return Convert.ToString(raw);
    }
    if (fieldType == typeof(short))
    {
        return isNull ? (short)0 : Convert.ToInt16(raw);
    }
    if (fieldType == typeof(int))
    {
        return isNull ? 0 : Convert.ToInt32(raw);
    }
    if (fieldType == typeof(long))
    {
        return isNull ? 0L : Convert.ToInt64(raw);
    }
    if (fieldType == typeof(double))
    {
        return isNull ? 0d : Convert.ToDouble(raw);
    }
    if (fieldType == typeof(bool))
    {
        // Only the numeric value 1 maps to true.
        return !isNull && Convert.ToInt32(raw) == 1;
    }
    if (fieldType == typeof(DateTime))
    {
        return isNull ? default(DateTime) : Convert.ToDateTime(raw);
    }

    throw new NotSupportedException(
        "Unsupported field type " + fieldType.FullName + " for field '" + fieldName + "' in sql_Reader_To_Type.");
}
-- Example source table: one column per public field of the person class.
CREATE TABLE myHugeTable (firstName varchar(25), lastName varchar(50), birthday date)
// Example row type. Its members are public fields (not properties) because the
// reader helpers discover them with Type.GetFields() and match them to columns by name.
class person
{
    public string firstName;
    public string lastName;
    public DateTime birthday;
}
// Dispose the reader once every row has been consumed.
using (SqlDataReader reader = com.ExecuteReader())
{
    sql_Reader_To_Type(typeof(person), reader);
}
如果你不想把每一行映射成对象,而是想直接使用 DataRow/DataTable:
// Reads every row from r into a DataTable built by create_DataTable_From_Generic_Class(t)
// and hands the table to processBatch every batchSize rows, plus once more for a final
// partial batch. The same DataTable is reused and its rows are cleared after each
// callback, so the callback must finish with the rows before it returns.
//
// t            - type whose public fields (string, short, int, long, double, bool,
//                DateTime) name the columns to read.
// r            - open reader positioned before the first row; not closed here.
// batchSize    - maximum number of rows buffered before a flush (default 50000).
// processBatch - optional consumer of each batch; when null the rows are read and dropped.
//
// Throws ArgumentNullException / ArgumentOutOfRangeException for invalid arguments and
// NotSupportedException when a field has a type outside the list above.
public static void sql_Reader_To_DataTable_With_Buffer(Type t, SqlDataReader r, int batchSize = 50000, Action<DataTable> processBatch = null)
{
    if (t == null)
    {
        throw new ArgumentNullException("t");
    }
    if (r == null)
    {
        throw new ArgumentNullException("r");
    }
    if (batchSize <= 0)
    {
        throw new ArgumentOutOfRangeException("batchSize", "Batch size must be positive.");
    }

    // NOTE(review): assumes the helper creates one column per public field, in
    // GetFields() order, because values are placed by field index - confirm.
    DataTable dt = create_DataTable_From_Generic_Class(t);

    // Reflect once; the field list cannot change between rows.
    FieldInfo[] fields = t.GetFields();

    while (r.Read())
    {
        object[] rowData = new object[dt.Columns.Count];
        for (int i = 0; i < fields.Length; i++)
        {
            rowData[i] = convert_Reader_Value_For_Column(fields[i].FieldType, fields[i].Name, r[fields[i].Name]);
        }
        dt.Rows.Add(rowData);

        if (dt.Rows.Count >= batchSize)
        {
            if (processBatch != null)
            {
                processBatch(dt);
            }
            dt.Rows.Clear();
        }
    }

    // Flush the trailing partial batch.
    if (dt.Rows.Count > 0 && processBatch != null)
    {
        processBatch(dt);
    }
}

// Converts one raw column value to the CLR type of the matching field.
// A database NULL is kept as DBNull.Value for value-typed fields so the DataTable
// stores a null cell instead of the conversion throwing; strings keep the
// Convert.ToString result ("" for NULL).
private static object convert_Reader_Value_For_Column(Type fieldType, string fieldName, object raw)
{
    if (fieldType == typeof(string))
    {
        return Convert.ToString(raw);
    }

    bool isSupported = fieldType == typeof(short) || fieldType == typeof(int)
        || fieldType == typeof(long) || fieldType == typeof(double)
        || fieldType == typeof(bool) || fieldType == typeof(DateTime);
    if (!isSupported)
    {
        throw new NotSupportedException(
            "Unsupported field type " + fieldType.FullName + " for field '" + fieldName + "' in sql_Reader_To_DataTable_With_Buffer.");
    }

    if (raw == null || raw is DBNull)
    {
        return DBNull.Value;
    }
    if (fieldType == typeof(short))
    {
        return Convert.ToInt16(raw);
    }
    if (fieldType == typeof(int))
    {
        return Convert.ToInt32(raw);
    }
    if (fieldType == typeof(long))
    {
        return Convert.ToInt64(raw);
    }
    if (fieldType == typeof(double))
    {
        return Convert.ToDouble(raw);
    }
    if (fieldType == typeof(bool))
    {
        // Only the numeric value 1 maps to true.
        return Convert.ToInt32(raw) == 1;
    }
    return Convert.ToDateTime(raw);
}
// Run the query and stream it through the buffered loader; the reader is
// disposed as soon as the last batch has been handled.
com.CommandText = "select top 10000000000000000 * from myHugeTable";
using (SqlDataReader read = com.ExecuteReader())
{
    sql_Reader_To_DataTable_With_Buffer(typeof(person), read);
}