将数据集写入字符分隔文件的最快方法
本文关键字:方法 文件 分隔 数据集 字符 | 更新日期: 2023-09-27 18:23:43
这是迄今为止我发现的从Oracle数据库检索响应记录集并将其写入分隔文件的最快方法。更快会更好。请提出建议。
检索结果集:
// Runs the stored procedure named by extractToRun and fills ds from its ref cursor.
// extractToRun, ds and the four OracleParameter objects are declared outside this fragment.
using (var oracleConnection = new OracleConnection(ContextInfo.ConnectionString))
{
    oracleConnection.Open();
    // Pass the opened connection instance; the type name OracleConnection is not a value and does not compile here.
    using (var oracleCommand = new OracleCommand(extractToRun, oracleConnection))
    {
        oracleCommand.CommandType = CommandType.StoredProcedure;
        oracleCommand.BindByName = true;
        // Enlarge the fetch buffer to 128 times the command's current FetchSize.
        oracleCommand.FetchSize = oracleCommand.FetchSize * 128;
        oracleCommand.InitialLONGFetchSize = 5000;
        oracleCommand.Parameters.Add(refCursorOracleParameter);
        oracleCommand.Parameters.Add(startDateOracleParameter);
        oracleCommand.Parameters.Add(endDateOracleParameter);
        oracleCommand.Parameters.Add(jobIdOracleParameter);
        using (var oracleDataAdapter = new OracleDataAdapter(oracleCommand))
        {
            oracleDataAdapter.Fill(ds);
            // The using blocks dispose the adapter, command and connection on the way out,
            // so no explicit Close()/Dispose() is needed.
            return ds;
        }
    }
}
处理数据并将其写入文件:
/// <summary>
/// Appends the rows of <paramref name="table"/> to <paramref name="filename"/> as delimited text,
/// one CRLF-terminated line per row, in the table's row order.
/// </summary>
/// <param name="table">Table to export. When it has no rows nothing is written (not even the header).</param>
/// <param name="filename">Target file; opened in append mode.</param>
/// <param name="encloseWith">Text written before and after every non-empty value and every header name; may be null or empty.</param>
/// <param name="delimiter">Text written between values; may be null or empty.</param>
/// <param name="includeHeader">When true, a line of column names precedes the data rows.</param>
/// <param name="fieldsToExclude">Column names to leave out of both header and rows. A column is excluded when its name occurs, ignoring case, anywhere inside this string.</param>
/// <param name="fixedLengthValues">Not used by this implementation.</param>
public static void ExportDataTableToDelimitedFile(DataTable table, string filename, string encloseWith, string delimiter, bool includeHeader, string fieldsToExclude, bool fixedLengthValues)
{
    string excludeList = fieldsToExclude ?? String.Empty;

    // Resolve the exported columns once so the header and every data row contain the same fields.
    int[] exportedColumns = table.Columns.Cast<DataColumn>()
        .Where(column => excludeList.IndexOf(column.ColumnName, StringComparison.OrdinalIgnoreCase) < 0)
        .Select(column => column.Ordinal)
        .ToArray();

    using (FileStream fs = new FileStream(filename, FileMode.Append, FileAccess.Write, FileShare.ReadWrite, 131072, FileOptions.None))
    // A text writer is used because BinaryWriter.Write(string) prefixes the text with its byte length.
    using (StreamWriter writer = new StreamWriter(fs, new System.Text.UTF8Encoding(false)))
    {
        if (table.Rows.Count == 0)
        {
            return;
        }

        if (includeHeader)
        {
            string[] headerNames = exportedColumns
                .Select(ordinal => encloseWith + table.Columns[ordinal].ColumnName + encloseWith)
                .ToArray();
            writer.Write(String.Join(delimiter, headerNames));
            writer.Write("\r\n");
        }

        // Rows are formatted in parallel, each into the slot of its own index,
        // and written afterwards so the file keeps the table's row order.
        string[] lines = new string[table.Rows.Count];
        ParallelOptions rowOptions = new ParallelOptions();
        rowOptions.MaxDegreeOfParallelism = Environment.ProcessorCount;
        Parallel.ForEach(table.Rows.Cast<DataRow>(), rowOptions, (row, loopState, rowIndex) =>
        {
            string[] fields = new string[exportedColumns.Length];
            for (int i = 0; i < fields.Length; i++)
            {
                string value = row[exportedColumns[i]].ToString();
                // Empty values are written bare; everything else is wrapped in encloseWith.
                fields[i] = value.Length > 0 ? encloseWith + value + encloseWith : value;
            }
            // Join always emits a delimiter between adjacent fields, even when the leading fields are empty.
            lines[rowIndex] = String.Join(delimiter, fields);
        });

        foreach (string line in lines)
        {
            writer.Write(line);
            writer.Write("\r\n");
        }
    }
}
我知道我应该重命名一些变量,并以同样的方式处理表头(我目前并没有写入表头),所以这实际上是一个纯粹的逻辑问题,关于代码风格的回答无助于提高性能。
令人费解的是网络性能:它能很快返回5个各有几千行的数据集,却只使用了1.5%的带宽。我使用的是最新的ODP.NET(Oracle)连接11g数据库。我也尝试过Devart的提供程序,但在我这里完全行不通。
网络性能
处理器负载反映了Parallel.ForEach对数据表中的行的影响,这是一件好事。
处理器性能
这是我能得到的最快的。
检索数据:
/// <summary>
/// Executes the stored procedure named by <paramref name="extractToRun"/> and loads the rows
/// returned through its pCursor ref cursor into a new DataTable.
/// </summary>
/// <param name="extractToRun">Name of the stored procedure to execute.</param>
/// <param name="startDate">Value bound to the pStartDate parameter.</param>
/// <param name="endDate">Value bound to the pEndDate parameter.</param>
/// <returns>A DataTable holding every row read from the cursor.</returns>
public static DataTable GetData(String extractToRun, DateTime startDate, DateTime endDate)
{
    // Output ref cursor the procedure returns its rows through.
    var refCursorOracleParameter = new OracleParameter
    {
        ParameterName = "pCursor",
        Direction = ParameterDirection.Output,
        OracleDbType = OracleDbType.RefCursor
    };
    // NOTE(review): the DateTime values are bound as Varchar2, so the text the procedure receives
    // depends on the provider's conversion - confirm the date format the procedure expects.
    var startDateOracleParameter = new OracleParameter
    {
        ParameterName = "pStartDate",
        Direction = ParameterDirection.Input,
        OracleDbType = OracleDbType.Varchar2,
        Value = startDate
    };
    var endDateOracleParameter = new OracleParameter
    {
        ParameterName = "pEndDate",
        Direction = ParameterDirection.Input,
        OracleDbType = OracleDbType.Varchar2,
        Value = endDate
    };
    // The job id is a fixed literal in this sample.
    var jobIdOracleParameter = new OracleParameter
    {
        ParameterName = "pJobId",
        Direction = ParameterDirection.Input,
        Value = "123456"
    };

    // The using blocks close and dispose the reader, command and connection on every exit path,
    // so no explicit Close()/Dispose() calls or try/finally are needed.
    using (var oracleConnection = new OracleConnection(ContextInfo.ConnectionString))
    {
        oracleConnection.Open();
        using (var oracleCommand = new OracleCommand(extractToRun, oracleConnection))
        {
            oracleCommand.CommandType = CommandType.StoredProcedure;
            oracleCommand.BindByName = true;
            oracleCommand.FetchSize = oracleCommand.FetchSize * 128;
            oracleCommand.InitialLONGFetchSize = 5000;
            oracleCommand.Parameters.Add(refCursorOracleParameter);
            oracleCommand.Parameters.Add(startDateOracleParameter);
            oracleCommand.Parameters.Add(endDateOracleParameter);
            oracleCommand.Parameters.Add(jobIdOracleParameter);
            using (OracleDataReader rdr = oracleCommand.ExecuteReader())
            {
                // Size the reader's fetch buffer as a multiple of its reported row size.
                rdr.FetchSize = rdr.RowSize * 65536;
                DataTable dt = new DataTable();
                // Pre-size the table for a large result set.
                dt.MinimumCapacity = 400000;
                dt.BeginLoadData();
                dt.Load(rdr, LoadOption.Upsert);
                dt.EndLoadData();
                return dt;
            }
        }
    }
}
处理数据:
/// <summary>
/// Appends the rows of <paramref name="table"/> to <paramref name="filename"/> as delimited text,
/// one CRLF-terminated line per row, in the table's row order. Each line is assembled in a
/// character array sized exactly for that row.
/// </summary>
/// <param name="table">Table to export. When it has no rows nothing is written (not even the header).</param>
/// <param name="filename">Target file; opened in append mode.</param>
/// <param name="encloseWith">Text written before and after every non-empty value and every header name; may be null or empty.</param>
/// <param name="delimiter">Text written between values; may be null or empty.</param>
/// <param name="includeHeader">When true, a line of column names precedes the data rows.</param>
/// <param name="fieldsToExclude">Column names to leave out of both header and rows. A column is excluded when its name occurs, ignoring case, anywhere inside this string.</param>
/// <param name="fixedLengthValues">Not used by this implementation.</param>
public static void ExportDataTableToDelimitedFile(DataTable table, string filename, string encloseWith, string delimiter, bool includeHeader, string fieldsToExclude, bool fixedLengthValues)
{
    string excludeList = fieldsToExclude ?? String.Empty;
    string enclosure = encloseWith ?? String.Empty;
    string separator = delimiter ?? String.Empty;

    // Resolve the exported columns once so the header and every data row contain the same fields.
    int[] exportedColumns = table.Columns.Cast<DataColumn>()
        .Where(column => excludeList.IndexOf(column.ColumnName, StringComparison.OrdinalIgnoreCase) < 0)
        .Select(column => column.Ordinal)
        .ToArray();

    using (FileStream fs = new FileStream(filename, FileMode.Append, FileAccess.Write, FileShare.ReadWrite, 2097152, FileOptions.None))
    // A text writer is used because BinaryWriter.Write(string) prefixes the text with its byte length.
    using (StreamWriter writer = new StreamWriter(fs, new System.Text.UTF8Encoding(false)))
    {
        if (table.Rows.Count == 0)
        {
            return;
        }

        if (includeHeader)
        {
            string[] headerNames = exportedColumns
                .Select(ordinal => enclosure + table.Columns[ordinal].ColumnName + enclosure)
                .ToArray();
            writer.Write(String.Join(separator, headerNames));
            writer.Write("\r\n");
        }

        // Rows are formatted in parallel, each into the slot of its own index,
        // and written afterwards so the file keeps the table's row order.
        char[][] lines = new char[table.Rows.Count][];
        Parallel.ForEach(table.Rows.Cast<DataRow>(), (row, loopState, rowIndex) =>
        {
            // NOTE(review): String.CopyTo(int, char[], int, int) could copy each value straight
            // into the line buffer and avoid this intermediate array per field.
            char[][] rowData = exportedColumns.Select(ordinal => row[ordinal].ToString().ToCharArray()).ToArray();

            // First pass: measure the line so the buffer can never overflow.
            int lineLength = 2; // trailing CR LF
            for (int i = 0; i < rowData.Length; i++)
            {
                if (i > 0)
                {
                    lineLength += separator.Length;
                }
                lineLength += rowData[i].Length;
                if (rowData[i].Length > 0)
                {
                    lineLength += 2 * enclosure.Length;
                }
            }

            // Second pass: copy the pieces into place.
            char[] rowValue = new char[lineLength];
            int rowValueIndex = 0;
            for (int i = 0; i < rowData.Length; i++)
            {
                // The delimiter depends on the field position, not on how much has been written,
                // so empty leading fields still get their separators.
                if (i > 0)
                {
                    separator.CopyTo(0, rowValue, rowValueIndex, separator.Length);
                    rowValueIndex += separator.Length;
                }
                bool useEnclosed = rowData[i].Length > 0;
                if (useEnclosed)
                {
                    enclosure.CopyTo(0, rowValue, rowValueIndex, enclosure.Length);
                    rowValueIndex += enclosure.Length;
                }
                rowData[i].CopyTo(rowValue, rowValueIndex);
                rowValueIndex += rowData[i].Length;
                if (useEnclosed)
                {
                    enclosure.CopyTo(0, rowValue, rowValueIndex, enclosure.Length);
                    rowValueIndex += enclosure.Length;
                }
            }
            rowValue[rowValueIndex++] = '\r';
            rowValue[rowValueIndex] = '\n';
            lines[rowIndex] = rowValue;
        });

        foreach (char[] line in lines)
        {
            writer.Write(line);
        }
    }
}
有几个要点值得注意。使用Load将DataReader加载到DataTable,比用Fill填充DataSet快40%。但不要将FetchSize设置为64K以上,超过之后性能会下降;32K可能是最好的。字符数组甚至比StringBuilder快得多。在我看来,C#的缺陷在于我们不能编写汇编子例程。我考虑过写一个C++ DLL,这样我就可以用一个汇编语言子程序来复制内存,那样我就不需要调用ToCharArray()了。当然,我还没有查看IL来了解ToCharArray()到底做了什么,但性能分析器指出这行代码占用了26%的时间。
令人惊讶的是,这些变化将网络利用率提高到4.5%(这对于公司网络上的单个PC来说是很高的),并将CPU利用率降低到80%左右,因为它现在主要在等待磁盘写入方法,而不是忙于复制字符串。
我没有显示原始代码,但将数据导出到管道分隔的文件需要13-15分钟。通过这些更改,导出完全相同的数据需要40-45秒。
我也没有展示的是:原始的数据库查询实际上是由七个查询UNION在一起的。我把它们拆开,这样就可以并行运行。性能问题需要作为一个整体来解决。许多以前试图解决这个问题的人都把注意力集中在数据库上,没有人真正关注客户端,也没有人试图确定真正的问题是什么。
希望这对将来的某个人有所帮助。
好的!这是一个更好的答案!
/// <summary>
/// Executes the stored procedure named by <paramref name="extractToRun"/> and returns the
/// characters of column 0 of every row read from its pCursor ref cursor.
/// </summary>
/// <param name="extractToRun">Name of the stored procedure to execute.</param>
/// <param name="startDate">Value bound to the pStartDate parameter.</param>
/// <param name="endDate">Value bound to the pEndDate parameter.</param>
/// <returns>One ROW_DATA per row, holding the column's characters and their count.</returns>
public static List<ROW_DATA> GetData(String extractToRun, DateTime startDate, DateTime endDate)
{
    List<ROW_DATA> dataTable = new List<ROW_DATA>();
    // Output ref cursor the procedure returns its rows through.
    OracleParameter refCursorOracleParameter = new OracleParameter
    {
        ParameterName = "pCursor",
        Direction = ParameterDirection.Output,
        OracleDbType = OracleDbType.RefCursor
    };
    // NOTE(review): the DateTime values are bound as Varchar2, so the text the procedure receives
    // depends on the provider's conversion - confirm the date format the procedure expects.
    OracleParameter startDateOracleParameter = new OracleParameter
    {
        ParameterName = "pStartDate",
        Direction = ParameterDirection.Input,
        OracleDbType = OracleDbType.Varchar2,
        Value = startDate
    };
    OracleParameter endDateOracleParameter = new OracleParameter
    {
        ParameterName = "pEndDate",
        Direction = ParameterDirection.Input,
        OracleDbType = OracleDbType.Varchar2,
        Value = endDate
    };
    // The job id is a fixed literal in this sample.
    OracleParameter jobIdOracleParameter = new OracleParameter
    {
        ParameterName = "pJobId",
        Direction = ParameterDirection.Input,
        Value = "123456"
    };

    // The using blocks close and dispose the reader, command and connection on every exit path,
    // so no explicit Close()/Dispose() calls or try/finally are needed.
    using (var oracleConnection = new OracleConnection(ContextInfo.ConnectionString))
    {
        oracleConnection.Open();
        using (var oracleCommand = new OracleCommand(extractToRun, oracleConnection))
        {
            oracleCommand.CommandType = CommandType.StoredProcedure;
            oracleCommand.BindByName = true;
            oracleCommand.FetchSize = oracleCommand.FetchSize * 128;
            oracleCommand.InitialLONGFetchSize = 5000;
            oracleCommand.Parameters.Add(refCursorOracleParameter);
            oracleCommand.Parameters.Add(startDateOracleParameter);
            oracleCommand.Parameters.Add(endDateOracleParameter);
            oracleCommand.Parameters.Add(jobIdOracleParameter);
            using (OracleDataReader rdr = oracleCommand.ExecuteReader())
            {
                //byte[] columnBytes = new byte[16384];
                // Size the reader's fetch buffer as a multiple of its reported row size.
                rdr.FetchSize = rdr.RowSize * 262144;
                while (rdr.Read())
                {
                    // A null buffer makes GetChars report the length of column 0 without copying.
                    Int32 charLength = (Int32)rdr.GetChars(0, 0, null, 0, 0);
                    char[] colChars = new char[charLength];
                    rdr.GetChars(0, 0, colChars, 0, charLength);
                    // Earlier experiment, kept on purpose for reference: converting the value with unsafe code.
                    //OracleString colValue = rdr.GetOracleString(0);
                    //int valueLength = colValue.Length;
                    //unsafe
                    //{
                    // fixed (char* pcolValue = colValue.Value)
                    // {
                    // fixed (byte* pcolBytes = columnBytes)
                    // {
                    // for (int i = 0; i < valueLength; i++)
                    // {
                    // pcolBytes[i] = (byte)pcolValue[i];
                    // }
                    // }
                    // }
                    //}
                    ROW_DATA rowData = new ROW_DATA { length = charLength, rowValues = colChars };
                    dataTable.Add(rowData);
                }
            }
            return dataTable;
        }
    }
}
我故意保留了那段被注释掉的代码,以表明我甚至尝试过用不安全(unsafe)代码把数据转换成我需要的格式。事实证明,GetChars正好按我想要的方式返回数据,这样我就可以直接把它流式写入磁盘。现在我的网络利用率高达11%,检索413K行并将其写入磁盘只需要27秒。我还修改了存储过程,让它返回以管道符分隔的字符串,这样客户端只需接收一列数据。它确实很快,但我还有办法把时间再减半。敬请关注。