将数据集写入字符分隔文件的最快方法

本文关键字:方法 文件 分隔 数据集 字符 | 更新日期: 2023-09-27 18:23:43

这是迄今为止我发现的从Oracle数据库检索响应记录集并将其写入分隔文件的最快方法。更快会更好。请提出建议。

检索结果集:

using (var oracleConnection = new OracleConnection(ContextInfo.ConnectionString))
{
    oracleConnection.Open();
    try
    {
        using (var oracleCommand = new OracleCommand(extractToRun, OracleConnection))
        {
            oracleCommand.CommandType = CommandType.StoredProcedure;
            oracleCommand.BindByName = true;
            oracleCommand.FetchSize = oracleCommand.FetchSize * 128;
            oracleCommand.InitialLONGFetchSize = 5000;
            oracleCommand.Parameters.Add(refCursorOracleParameter);
            oracleCommand.Parameters.Add(startDateOracleParameter);
            oracleCommand.Parameters.Add(endDateOracleParameter);
            oracleCommand.Parameters.Add(jobIdOracleParameter);
            using (var oracleDataAdapter = new OracleDataAdapter(oracleCommand))
            {
                oracleDataAdapter.Fill(ds);
                return ds;
            }
        }
    }
    finally
    {
        oracleConnection.Close();
        oracleConnection.Dispose();
    }
}

处理数据并将其写入文件:

public static void ExportDataTableToDelimitedFile(DataTable table, string filename, string encloseWith, string delimiter, bool includeHeader, string fieldsToExclude, bool fixedLengthValues)
{
    String excludeList = String.Empty;
    if (!String.IsNullOrEmpty(fieldsToExclude))
    {
        excludeList = fieldsToExclude.ToUpper();
    }
    using (FileStream fs = new FileStream(filename, FileMode.Append, FileAccess.Write, FileShare.ReadWrite, 131072, FileOptions.None))
    {
        BinaryWriter sw = new BinaryWriter(fs);
        if (table.Rows.Count == 0)
        {
            sw.Write(String.Empty);
            sw.Close();
            sw.Dispose();
            return;
        }
        //Handle header
        if (includeHeader)
        {
            string header = String.Empty;
            String formattedHeader = String.Empty;
            foreach (DataColumn clm in table.Columns)
            {
                if (excludeList.Contains(clm.ColumnName.ToUpper()))
                    continue;
                if (clm.ColumnName.Length > 0)
                {
                    formattedHeader = String.Empty;
                    formattedHeader = encloseWith + clm.ColumnName + encloseWith;
                    if (header.Length > 0)
                        header = String.Join(delimiter, new string[] { header, formattedHeader });
                    else
                        header = formattedHeader;
                }
            }
            sw.Write(header);
        }
        // handle  values in data rows now
        Boolean hasEnlosedCharacter = !String.IsNullOrEmpty(encloseWith);
        ParallelOptions rowOptions = new ParallelOptions();
        rowOptions.MaxDegreeOfParallelism = Environment.ProcessorCount;
        Parallel.ForEach(table.Rows.Cast<DataRow>(), rowOptions, row =>
        {
            char[] rowValue = new char[8192];
            Int32 rowValueIndex = 0;
            string[] dcc = row.ItemArray.Select(field => field.ToString()).ToArray();
            foreach (String dc in dcc)
            {
                if (rowValueIndex > 0)
                {
                    if (!String.IsNullOrEmpty(dc) && hasEnlosedCharacter)
                    {
                        rowValue[rowValueIndex++] = delimiter[0];
                        rowValue[rowValueIndex++] = encloseWith[0];
                        foreach (char c in dc)
                        {
                            rowValue[rowValueIndex++] = c;
                        }
                        rowValue[rowValueIndex++] = encloseWith[0];
                    }
                    else
                    {
                        rowValue[rowValueIndex++] = delimiter[0];
                        foreach (char c in dc)
                        {
                            rowValue[rowValueIndex++] = c;
                        }
                    }
                }
                else
                {
                    if (!String.IsNullOrEmpty(dc) && hasEnlosedCharacter)
                    {
                        rowValue[rowValueIndex++] = encloseWith[0];
                        foreach (char c in dc)
                        {
                            rowValue[rowValueIndex++] = c;
                        }
                        rowValue[rowValueIndex++] = encloseWith[0];
                    }
                    else
                    {
                        foreach (char c in dc)
                        {
                            rowValue[rowValueIndex++] = c;
                        }
                    }
                }
            }
            rowValue[rowValueIndex++] = ''r';
            rowValue[rowValueIndex++] = ''n';
            lock (sw)
            {
                sw.Write(rowValue, 0, rowValueIndex);
            }
        });
        sw.Close();
        sw.Dispose();
        table.Dispose();
        fs.Close();
    }
}

我知道我应该重命名一些变量,并以同样的方式处理头(我不是在写头),所以这实际上是一个纯粹的逻辑问题,样式回答无助于提高性能。

令人费解的是网络性能。当它快速返回5个几千行的数据集时,它只使用了1.5%的带宽?我正在使用最新的ODP.Net(Oracle)对11g数据库。我尝试了Devarts提供商,它对我来说彻底失败了。

网络性能

处理器负载反映了Parallel.ForEach对数据表中的行的影响,这是一件好事。

处理器性能

将数据集写入字符分隔文件的最快方法

这是我能得到的最快的。

检索数据:

public static DataTable GetData(String extractToRun, DateTime startDate, DateTime endDate)
{
    //RefCursor
    OracleParameter refCursorOracleParameter = new OracleParameter
                                            {
                                                ParameterName = "pCursor",
                                                Direction = ParameterDirection.Output,
                                                OracleDbType = OracleDbType.RefCursor
                                            };
    OracleParameter startDateOracleParameter = new OracleParameter
    {
        ParameterName = "pStartDate",
        Direction = ParameterDirection.Input,
        OracleDbType = OracleDbType.Varchar2,
        Value =   startDate
    };
    OracleParameter endDateOracleParameter = new OracleParameter
    {
        ParameterName = "pEndDate",
        Direction = ParameterDirection.Input,
        OracleDbType = OracleDbType.Varchar2,
        Value =   endDate
    };
    OracleParameter jobIdOracleParameter = new OracleParameter
    {
        ParameterName = "pJobId",
        Direction = ParameterDirection.Input,                
        Value =   "123456"
    };
    using (var oracleConnection = new OracleConnection(ContextInfo.ConnectionString))
    {
        oracleConnection.Open();
        try
        {
            using (var oracleCommand = new OracleCommand(extractToRun, oracleConnection))
            {
                oracleCommand.CommandType = CommandType.StoredProcedure;
                oracleCommand.BindByName = true;
                oracleCommand.FetchSize = oracleCommand.FetchSize * 128;
                oracleCommand.InitialLONGFetchSize = 5000;
                oracleCommand.Parameters.Add(refCursorOracleParameter);
                oracleCommand.Parameters.Add(startDateOracleParameter);
                oracleCommand.Parameters.Add(endDateOracleParameter);
                oracleCommand.Parameters.Add(jobIdOracleParameter);
                using (OracleDataReader rdr = oracleCommand.ExecuteReader())
                {
                    rdr.FetchSize = rdr.RowSize * 65536;
                    DataTable dt = new DataTable();
                    dt.MinimumCapacity = 400000;
                    dt.BeginLoadData();
                    dt.Load(rdr, LoadOption.Upsert);
                    dt.EndLoadData();
                    rdr.Close();
                    rdr.Dispose();
                    oracleCommand.Dispose();
                    return dt;
                }
            }
        }
        finally
        {
            oracleConnection.Close();
            oracleConnection.Dispose();
        }
    }
}

处理数据:

public static void ExportDataTableToDelimitedFile(DataTable table, string filename, string encloseWith, string delimiter, bool includeHeader, string fieldsToExclude, bool fixedLengthValues)
{
    String excludeList = String.Empty;
    if (!String.IsNullOrEmpty(fieldsToExclude))
    {
        excludeList = fieldsToExclude.ToUpper();
    }
    using (FileStream fs = new FileStream(filename, FileMode.Append, FileAccess.Write, FileShare.ReadWrite, 2097152, FileOptions.None))
    {
        BinaryWriter sw = new BinaryWriter(fs);
        if (table.Rows.Count == 0)
        {
            sw.Write(String.Empty);
            sw.Close();
            sw.Dispose();
            return;
        }
        //Handle header
        if (includeHeader)
        {
            string header = String.Empty;
            String formattedHeader = String.Empty;
            foreach (DataColumn clm in table.Columns)
            {
                if (excludeList.Contains(clm.ColumnName.ToUpper()))
                    continue;
                if (clm.ColumnName.Length > 0)
                {
                    formattedHeader = String.Empty;
                    formattedHeader = encloseWith + clm.ColumnName + encloseWith;
                    if (header.Length > 0)
                        header = String.Join(delimiter, new string[] { header, formattedHeader });
                    else
                        header = formattedHeader;
                }
            }
            sw.Write(header);
        }
        // handle  values in data rows now
        Boolean hasEnlosedCharacter = !String.IsNullOrEmpty(encloseWith);
        Parallel.ForEach(table.Rows.Cast<DataRow>(), row =>
        {
            char[] rowValue = new char[8192];
            Int32 rowValueIndex = 0;
            char[][] rowData = row.ItemArray.Select(field => field.ToString().ToCharArray()).ToArray();
            for (int i = 0; i < rowData.Length; i++)
            {
                Boolean useEnclosed = rowData[i].Length > 0 && hasEnlosedCharacter;
                if (rowValueIndex > 0)
                {
                    if (useEnclosed)
                    {
                        rowValue[rowValueIndex++] = delimiter[0];
                        rowValue[rowValueIndex++] = encloseWith[0];
                        rowData[i].CopyTo(rowValue, rowValueIndex);
                        rowValueIndex += rowData[i].Length;
                        rowValue[rowValueIndex++] = encloseWith[0];
                    }
                    else
                    {
                        rowValue[rowValueIndex++] = delimiter[0];
                        rowData[i].CopyTo(rowValue, rowValueIndex);
                        rowValueIndex += rowData[i].Length;
                    }
                }
                else
                {
                    if (useEnclosed)
                    {
                        rowValue[rowValueIndex++] = encloseWith[0];
                        rowData[i].CopyTo(rowValue, rowValueIndex);
                        rowValueIndex += rowData[i].Length;
                        rowValue[rowValueIndex++] = encloseWith[0];
                    }
                    else
                    {
                        rowData[i].CopyTo(rowValue, rowValueIndex);
                        rowValueIndex += rowData[i].Length;
                    }
                }
            }
            rowValue[rowValueIndex++] = ''r';
            rowValue[rowValueIndex++] = ''n';
            lock (sw)
            {
                sw.Write(rowValue, 0, rowValueIndex);
            }
        });
        sw.Close();
        sw.Dispose();
        table.Dispose();
        fs.Close();
    }
}

有几个要点值得注意。使用Load将DataReader加载到DataTable比Dataset快40%。填充但不要将fetchsize设置为64K以上。之后性能下降。32K可能是最好的。字符数组甚至比StringBuilder快得多。在我看来,C#的缺陷在于我们不能拥有汇编程序子例程。我考虑过写一个C++dll,这样我就可以有一个汇编语言子程序来复制内存。那么我就不需要调用ToCharArray()了。当然,我还没有查看IL来了解ToCharArray()到底做了什么,但性能分析器指出这行代码占用了26%的时间。

令人惊讶的是,这些变化将网络利用率提高到4.5%(这对于公司网络上的单个PC来说是很高的),并将CPU利用率降低到80%左右,因为它现在主要在等待磁盘写入方法,而不是忙于复制字符串。

我没有显示原始代码,但将数据导出到管道分隔的文件需要13-15分钟。通过这些更改,导出完全相同的数据需要40-45秒。

我也没有显示原始DB查询中有七个查询,所有查询都联合在一起。我把它们拆开,这样我就可以并行运行了。性能修复需要作为一个整体来解决。许多以前试图解决这个问题的人都将注意力集中在DB上。没有人真正关注客户端,并试图确定真正的问题是什么

希望这对将来的某个人有所帮助。

好的!这是一个更好的答案!

public static List<ROW_DATA> GetData(String extractToRun, DateTime startDate, DateTime endDate)
{
    List<ROW_DATA> dataTable = new List<ROW_DATA>();
    //RefCursor
    OracleParameter refCursorOracleParameter = new OracleParameter
                                            {
                                                ParameterName = "pCursor",
                                                Direction = ParameterDirection.Output,
                                                OracleDbType = OracleDbType.RefCursor
                                            };
    OracleParameter startDateOracleParameter = new OracleParameter
    {
        ParameterName = "pStartDate",
        Direction = ParameterDirection.Input,
        OracleDbType = OracleDbType.Varchar2,
        Value =   startDate
    };
    OracleParameter endDateOracleParameter = new OracleParameter
    {
        ParameterName = "pEndDate",
        Direction = ParameterDirection.Input,
        OracleDbType = OracleDbType.Varchar2,
        Value =   endDate
    };
    OracleParameter jobIdOracleParameter = new OracleParameter
    {
        ParameterName = "pJobId",
        Direction = ParameterDirection.Input,                
        Value =   "123456"
    };
    using (var oracleConnection = new OracleConnection(ContextInfo.ConnectionString))
    {
        oracleConnection.Open();
        try
        {
            using (var oracleCommand = new OracleCommand(extractToRun, oracleConnection))
            {
                oracleCommand.CommandType = CommandType.StoredProcedure;
                oracleCommand.BindByName = true;
                oracleCommand.FetchSize = oracleCommand.FetchSize * 128;
                oracleCommand.InitialLONGFetchSize = 5000;
                oracleCommand.Parameters.Add(refCursorOracleParameter);
                oracleCommand.Parameters.Add(startDateOracleParameter);
                oracleCommand.Parameters.Add(endDateOracleParameter);
                oracleCommand.Parameters.Add(jobIdOracleParameter);
                using (OracleDataReader rdr = oracleCommand.ExecuteReader())
                {
                    //byte[] columnBytes = new byte[16384];
                    Int32 tryCount = 0;
                    rdr.FetchSize = rdr.RowSize * 262144;
                            while (rdr.Read())
                            {
                                Int32 charLength = (Int32)rdr.GetChars(0, 0, null, 0, 0);
                                char[] colChars = new char[charLength];
                                rdr.GetChars(0, 0, colChars, 0, charLength);
                                //OracleString colValue = rdr.GetOracleString(0);
                                //int valueLength = colValue.Length;
                                //unsafe
                                //{
                                //    fixed (char* pcolValue = colValue.Value)
                                //    {
                                //        fixed (byte* pcolBytes = columnBytes)
                                //        {
                                //            for (int i = 0; i < valueLength; i++)
                                //            {
                                //                pcolBytes[i] = (byte)pcolValue[i];
                                //            }
                                //        }
                                //    }
                                //}
                                ROW_DATA rowData = new ROW_DATA { length = charLength, rowValues = colChars };
                                dataTable.Add(rowData);
                        }
                    }
                    rdr.Close();
                    rdr.Dispose();
                    oracleCommand.Dispose();
                    return dataTable;
                }
            }
        }
        finally
        {
            oracleConnection.Close();
            oracleConnection.Dispose();
        }
    }
}

我故意在注释掉的代码中留下,以表明我甚至尝试了不安全的代码来将数据转换为我需要的格式。事实证明,GetChars按照我想要的方式返回它,这样我就可以简单地将它流式传输到磁盘。我的网络利用率高达11%,检索413K行并将其写入磁盘需要27秒。我还修改了存储过程以返回管道分隔的字符串,这样我在客户端只接收一列数据。它真的很快,但我有办法把时间减半。敬请关注。