异步循环
本文关键字:循环 异步 | 更新日期: 2023-09-27 18:35:56
我有一个非常棒的SqlDataReader包装器,我可以在其中将输出映射到强类型列表中。
我现在发现,在具有大量列的较大数据集上,如果我可以优化映射,性能可能会更好一些。
在考虑这个问题时,我特别关注一个部分,因为它似乎是最重的击球手
我真正想知道的是,是否有一种方法可以使这个循环异步? 我觉得这只会让这个野兽改变世界:)
这是整个Map
方法,以防有人可以看到我可以在哪些方面对其进行进一步改进......
IList<T> Map<T>
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.Data.SqlClient;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
namespace o7th.Class.Library.Data
{
public class WrapperTest
{
public static string Message { set { _Msg = value; } get { return _Msg; } }
private static string _Msg;
// Instantiate our caching methods
internal static Common.CustomCache _Cache = new Common.CustomCache();
private static IEnumerable<T> Map<T>(SqlDataReader dr) where T : new()
{
var enumerableDataReader = dr.Cast<DbDataRecord>().AsEnumerable();
var tObj = new T();
PropertyInfo[] propertyInfo = tObj.GetType().GetProperties();
var batches = enumerableDataReader.Batch(10000);
var resultCollection = new ConcurrentBag<List<T>>();
Parallel.ForEach(batches, batch => resultCollection.Add(MapThis<T>(propertyInfo, batch)));
return resultCollection.SelectMany(m => m.Select(x => x));
}
private static List<T> MapThis<T>(PropertyInfo[] propertyInfo, IEnumerable<DbDataRecord> batch) where T : new()
{
var list = new List<T>();
batch.AsParallel().ForAll(record =>
{
T obj = new T();
foreach (PropertyInfo prop in propertyInfo)
{
var dbVal = record[prop.Name];
if (!Equals(dbVal, DBNull.Value))
{
prop.SetValue(obj, dbVal, null);
}
}
list.Add(obj);
});
return list;
}
public static IEnumerable<T> GetResults<T>(string _Qry, System.Data.CommandType _QryType,
string[] _ParamNames = null,
object[] _ParamVals = null,
System.Data.SqlDbType[] _ParamDTs = null,
bool _ShouldCache = false,
string _CacheID = "") where T : new()
{
// Create a reference to a potential already cached IList
IEnumerable<T> _CachedItem = _Cache.Get<IEnumerable<T>>(_CacheID);
// If we're already cached, there's no need to fire up the data access objects, so return the cached item instead
if (_CachedItem != null && _ShouldCache)
{
return _CachedItem;
}
else
{
// Fire up our data access object
using (Access db = new Access())
{
try
{
// create a new ilist reference of our strongly typed class
IEnumerable<T> _Query = null;
// set the query type
db.QueryType = _QryType;
// set the query text
db.Query = _Qry;
// make sure we've got some parameters, if we do the set them to our db access object
if (_ParamNames != null)
{
// set the parameter names
db.ParameterNames = _ParamNames;
// set the parameter values
db.ParameterValues = _ParamVals;
// set the parameter data types
db.ParameterDataTypes = _ParamDTs;
}
// start using our db access :) Fire off the GetResults method and return back a SqlDataReader to work on
using (SqlDataReader r = db.GetResults())
{
// make sure the data reader actually exists and contains some results
if (r != null && r.HasRows)
{
// map the data reader to our strongly type(s)
_Query = Map<T>(r);
}
}
// check if we should cache the results
if (_ShouldCache)
{
// if so, set the query object to the cache
_Cache.Set<IEnumerable<T>>(_Query, _CacheID);
}
// return our strongly typed list
return _Query;
}
catch (Exception ex)
{
// Catch an exception if any, an write it out to our logging mechanism, in addition to adding it our returnable message property
_Msg += "Wrapper.GetResults Exception: " + ex.Message + db.Message;
ErrorReporting.WriteEm.WriteItem(ex, "o7th.Class.Library.Data.Wrapper.GetResults", _Msg);
// make sure this method returns a default List
return default(IList<T>);
}
}
}
}
}
public static class Extensions
{
/// <summary>
/// Take a collection and split it into smaller collections
/// </summary>
/// <typeparam name="T">The Type</typeparam>
/// <param name="collection">The collection to split</param>
/// <param name="batchSize">The size of each batch</param>
/// <returns></returns>
public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> collection, int batchSize)
{
var nextbatch = new List<T>(batchSize);
if (collection == null)
{
yield break;
}
foreach (T item in collection)
{
nextbatch.Add(item);
if (nextbatch.Count != batchSize)
{
continue;
}
yield return nextbatch;
nextbatch = new List<T>(batchSize);
}
if (nextbatch.Count > 0)
{
yield return nextbatch;
}
}
}
}
db.GetResults()
是一个简单的ExecuteReader,通过使用SqlClient.SqlDataReader
附言 这是我的第一个 c# 项目。 我是一个长期的基本/qbasic/vb 程序员 =)
这是我的测试控制台应用:
测试
using o7th.Class.Library.Data;
using System;
using System.Collections.Generic;
using System.Threading;
namespace Testing
{
class Program
{
static void Main(string[] args)
{
long startTime = DateTime.Now.Ticks;
IList<Typing> _T = Wrapper.GetResults<Typing>("List.ZipSearch",
System.Data.CommandType.StoredProcedure,
new string[]{"@ZipCode", "@RadiusMile"},
new object[] { "01020", 10000 },
new System.Data.SqlDbType[] { System.Data.SqlDbType.VarChar, System.Data.SqlDbType.Float},
true, "TestCache1");
long endTime = DateTime.Now.Ticks;
TimeSpan timeTaken = new TimeSpan(endTime - startTime);
Console.WriteLine("Task Took: " + timeTaken + " for: " + _T.Count + " records.");
Thread.Sleep(2000);
long startTime2 = DateTime.Now.Ticks;
IEnumerable<Typing> _T2 = WrapperTest.GetResults<Typing>("List.ZipSearch",
System.Data.CommandType.StoredProcedure,
new string[] { "@ZipCode", "@RadiusMile" },
new object[] { "01020", 10000 },
new System.Data.SqlDbType[] { System.Data.SqlDbType.VarChar, System.Data.SqlDbType.Float },
true, "TestCache2");
long endTime2 = DateTime.Now.Ticks;
TimeSpan timeTaken2 = new TimeSpan(endTime2 - startTime2);
Console.WriteLine("Task Took: " + timeTaken2 + " for: " + _T2 + " records.");
Console.WriteLine("");
Console.WriteLine("Press any key to continue...");
Console.ReadKey();
}
partial class Typing {
public long ZipID { get; set; }
public string ZipCode { get; set; }
public string City { get; set; }
public string State { get; set; }
public string County { get; set; }
public double Mileage { get; set; }
}
}
}
如果我使用该代码,我会做的一个小更改是将你的 if 更改为仅在需要时使用 PropertyInfo
进行设置(newObject 已经是默认值(T)):
if ((info != null) && info.CanWrite && !(_Rdr.GetValue(i) is DBNull))
{
info.SetValue(newObject, _Rdr.GetValue(i), null);
break;
}
这将为您节省对 default(T) 的额外调用,并且还可以节省您用自己的默认值覆盖 newObject。 这是一个微小的优化。 另外,您看到多次覆盖 newObject,所以这让我觉得您的if
只为真一次,所以我添加了一个中断来为您节省额外的枚举,假设数据集很大,这也可以节省一些时间。
怎么样?
var readerValue = _Rdr.GetValue(i);
if ((info != null) && info.CanWrite && !(readerValue is DBNull))
{
info.SetValue(newObject, readerValue, null);
break;
}
*编辑以添加更多代码。
不确定这是否会改善情况:
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Linq;
namespace ConsoleApplication1
{
internal class Program
{
private static readonly SqlToObjectReflectionMappingService MappingService = new SqlToObjectReflectionMappingService();
private static void Main(string[] args)
{
// Call ConvertTable here...
}
private static IEnumerable<T> ConvertTable<T>(DataTable dataTable) where T : new()
{
return MappingService.DataTableToObjects<T>(dataTable);
}
public class SqlToObjectReflectionMappingService : ISqlToObjectMappingService
{
public T DataRowToObject<T>(DataRow row, PropertyDescriptorCollection propertyDescriptorCollection)
where T : new()
{
var obj = new T();
foreach (PropertyDescriptor propertyDescriptor in propertyDescriptorCollection)
{
propertyDescriptor.SetValue(obj, row[propertyDescriptor.Name]);
}
return obj;
}
public IEnumerable<T> DataTableToObjects<T>(DataTable table) where T : new()
{
var obj = new T();
var props = TypeDescriptor.GetProperties(obj);
return table.AsEnumerable().AsParallel().Select(m => DataRowToObject<T>(m, props));
}
}
public interface ISqlToObjectMappingService
{
T DataRowToObject<T>(DataRow row, PropertyDescriptorCollection propertyDescriptorCollection) where T : new();
IEnumerable<T> DataTableToObjects<T>(DataTable table) where T : new();
}
}
}
*编辑以添加更多代码。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.Common;
using System.Data.SqlClient;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
internal class Program
{
private static void Main(string[] args)
{
// Call ConvertTable here
}
private static IEnumerable<T> Map<T>(SqlDataReader dr) where T : new()
{
var enumerableDataReader = dr.Cast<DbDataRecord>().AsEnumerable();
var tObj = new T();
PropertyInfo[] propertyInfo = tObj.GetType().GetProperties();
var batches = enumerableDataReader.Batch(10000);
var resultCollection = new ConcurrentBag<List<T>>();
Parallel.ForEach(batches, batch => resultCollection.Add(MapThis<T>(propertyInfo, batch)));
return resultCollection.SelectMany(m => m.Select(x => x));
}
private static List<T> MapThis<T>(PropertyInfo[] propertyInfo, IEnumerable<DbDataRecord> batch) where T : new()
{
var list = new List<T>();
batch.AsParallel().ForAll(record =>
{
T obj = new T();
foreach (PropertyInfo prop in propertyInfo)
{
var dbVal = record[prop.Name];
if (!Equals(dbVal, DBNull.Value))
{
prop.SetValue(obj, dbVal, null);
}
}
list.Add(obj);
});
return list;
}
}
public static class Extensions
{
/// <summary>
/// Take a collection and split it into smaller collections
/// </summary>
/// <typeparam name="T">The Type</typeparam>
/// <param name="collection">The collection to split</param>
/// <param name="batchSize">The size of each batch</param>
/// <returns></returns>
public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> collection, int batchSize)
{
var nextbatch = new List<T>(batchSize);
if (collection == null)
{
yield break;
}
foreach (T item in collection)
{
nextbatch.Add(item);
if (nextbatch.Count != batchSize)
{
continue;
}
yield return nextbatch;
nextbatch = new List<T>(batchSize);
}
if (nextbatch.Count > 0)
{
yield return nextbatch;
}
}
}
}
- 另一种方法
您是否知道每次调用String.ToUpper()
时,您都在创建一个新字符串只是为了丢弃?对于每条记录?
我想你使用的是HashTable
,你可能会更好:
_ht = new Dictionary<string, PropertyInfo>(StringComparer.OrdinalIgnoreCase);
然后你可以像这样使用它:
PropertyInfo info = _ht[_Rdr.GetName(i)];
如果要并行化,可能需要查看Parallel.For
或Parallel.ForEach
。
但所有这些都无法避免大量使用反射。
但我真正认为你应该做的是构建一个映射器(并且可能缓存它)。
如果不想执行发出 IL 的路线,则可能需要使用表达式树:
- 使用表达式树对对象进行补位 - 第一部分
- 使用表达式树水化对象 - 第二部分
- 使用表达式树水化对象 - 第三部分