最有效的多线程对象重复筛选(使用自定义键)

本文关键字:自定义 筛选 有效 多线程 对象 | 更新日期: 2023-09-27 18:26:03

我正在编写一个服务客户端,它从远程服务中调用包含单个记录的巨大分隔字符串。由于这些字符串的大小,我将远程服务调用划分为块(日期范围),并在日期范围上并行循环,以调用远程服务并解析数据。问题是,50%以上的记录是重复的,所以我想把它们过滤掉。。。

这是我目前的方法:

// We want to filter out duplicate markets by using the MarketId field...
HashSet<ParsedMarketData> exchangeFixtures = 
    new HashSet<ParsedMarketData>(
        new GenericEqualityComparer<ParsedMarketData, int>(pmd => pmd.MarketId));
DateTime[][] splitTimes = 
    SplitDateRange(startDate, endDate, TimeSpan.FromDays(1));
// Effectively a Tasks.Parallel.ForEach call...
_parallel.ForEach(splitTimes, startEndTime =>
{
    DateTime start = startEndTime[0];
    DateTime end = startEndTime[1];
    string marketDataString = remoteServiceProxy.GetMarketData(start, end);
    IEnumerable<ParsedMarketData> rows = 
        _marketDataParser.ParseMarketData(marketDataString);
    foreach (ParsedMarketData marketDataRow in rows)
    {
        lock (_syncObj)
        {
            // Ignore the return value as we don't care 
            // if it gets added or not...
            marketDataList.Add(exchangeFixture);
        }
    }
});

从根本上说,锁定的数据结构(发现重复数据)是解决这个问题的最有效方法吗?还是可以改进?

可能值得知道的是,大多数(95%以上)"重复"项目都发生在每个时间段内。也就是说,如果我们并行检索"A天"answers"B天",那么A天和B天之间不会有很多(或任何)重复(但每天都有很多重复——在我的解决方案中,每个线程都有)。

最有效的多线程对象重复筛选(使用自定义键)

您需要调整代码以利用数据和服务中的并发机会。听起来每天一个线程可能是一种选择。

事实上,看到改善应该是罕见的。多线程购买的是更多的cpu周期,而不是更多的互联网连接、网卡或服务机器。只有两个线程是最佳的可能性很高。一个从服务获取数据,另一个处理数据。允许这两个操作重叠,在它们之间有一个线程安全的生产者/消费者队列。只有当处理线程比数据检索线程需要更多的时间时,才能从更多的线程中获得好处。还有一个场景可以让你轻松地评测代码,你可以加快处理速度,但不能加快检索速度。你甚至不需要一个探查器来进行第一次评估。如果数据处理线程没有100%烧核,那么就完了。

考虑到数据,每个线程内重复的可能性很高(线程之间重复的可能性也很低),我决定采用以下解决方案,它允许每个线程在不受锁阻碍的情况下执行其任务,并在调用线程的最后进行一点过滤,以确保过滤正确完成。

它还有一个额外的好处,即从服务调用返回对象的顺序(日期顺序)是跨线程维护的,因此不需要在最后对其进行排序。

public IEnumerable<Stuff> GetStuffs(DateTime startDate, DateTime endDate)
{
    if (startDate >= endDate)
        throw new ArgumentException("startDate must be before endDate", "startDate");
    IDateRange dateRange = new DateRange(startDate, endDate);
    IDateRange[] dateRanges = _dateRangeSplitter.DivideRange(dateRange, TimeSpan.FromDays(1)).ToArray();
    IEnumerable<Stuff>[] resultCollections = new IEnumerable<Stuff>[dateRanges.Length];
    _parallel.For(0, dateRanges.Length, i =>
    {
        IDateRange splitRange = dateRanges[i];
        IEnumerable<Stuff> stuffs = GetMarketStuffs(splitRange);
        resultCollections[i] = stuffs;
    });
    Stuff[] marketStuffs = resultCollections.SelectMany(ef => ef).Distinct(ef => ef.EventId).ToArray();
    return marketStuffs;
}
private IEnumerable<Stuff> GetMarketStuffs(IDateRange splitRange)
{
    IList<Stuff> stuffs = new List<Stuff>();
    HashSet<int> uniqueStuffIds = new HashSet<int>();
    string marketStuffString = _slowStuffStringProvider.GetMarketStuffs(splitRange.Start, splitRange.End);
    IEnumerable<ParsedStuff> rows = _stuffParser.ParseStuffString(marketStuffString);
    foreach (ParsedStuff parsedStuff in rows)
    {
        if (!uniqueStuffIds.Add(parsedStuff.EventId))
        {
            continue;
        }
        stuffs.Add(new Stuff(parsedStuff));
    }
    return stuffs;
}