并行处理-管理父/子依赖关系

本文关键字:依赖 关系 管理 并行处理 | 更新日期: 2023-09-27 18:03:45

我正在处理一个文本文件,其中包含Parallel.ForEach中的雇员及其家属。文件中的顺序不能保证。在每次迭代中,我都创建一个雇员或依赖对象。在ConcurrentDictionary中添加员工对象。依赖项是雇员的属性,需要与雇员关联。问题是,如果一个雇员不在字典中,当我试图添加从属词时,从属词永远不会被添加。我可以缓存这些"孤立的"依赖项,并在ForEach完成时添加它们,但我认为有更好的方法。

是否有某种方式我可以等待/旋转/加入我的Parallel.ForEach,直到员工被添加,然后添加依赖。我还没有完全接受这个解决方案,所以欢迎其他的选择。

以下是我的代码(为了简洁而编辑):
var cx = System.IO.File.ReadLines(path);
var _employeeDictionary = new ConcurrentDictionary<string, Employee>();
Parallel.ForEach(cx, _options, line =>
{
    Employee employee = null;
    switch (line.Substring(0, 2))
    {
        case EmployeeLine:
            // Employee is created and added to dictionary....
            _employeeDictionary.GetOrAdd(winID, employee);
            break;
        case DependentLine:
            // Dependent is created
            // WILL NOT BE ADDED IF THE EMPLOYEE HASN'T BEED ADDED YET
            if (_employeeDictionary.TryGetValue(dependent.EmpID, out employee))
            {
                lock (locker)
                {
                    employee.AddDependent(ci, dependent);
                }
            }
            break;
    }
});

并行处理-管理父/子依赖关系

是否有办法我可以等待/旋转/加入我的并行。ForEach

是的,但是它们都将降低您的性能并引入死锁的可能性。所以这不是正确的方法。

当然,最后一个if (Trygetvalue...))需要else分支,否则您将丢失数据。将Dependants存储在列表中,以便在第一个parallel.ForEach()之后进行处理。

然后你不妨把所有的依赖项都存储在那里,而不是在第一次运行时查找它们。更简单,甚至可能更快。

使并发工作对您有利而不是对您不利的关键是使您的代码在功能上更有效。不要用命令式术语来思考("对每一行都这样做"),而是考虑如何转换数据。并行地进行数据转换:如果不修改状态,则本质上是线程安全的。然后只对真正需要命令式编程的部分使用命令式编程。

var cx = System.IO.File.ReadAllLines(path);
var lineInfo = cx.AsParallel()
    .Select(line => new {
        lineCode = line.Substring(0, 2),
        line
    })
    .ToList()
    .AsParallel();
var employeeDictionary = lineInfo
    .Where(e => e.lineCode == EmployeeLine)
    .Select(e => ParseEmployee(e.line))
    .ToDictionary(e => e.winId);
var dependentLookup = lineInfo
    .Where(e => e.lineCode == DependentLine)
    .Select(e => ParseDependent(e.line))
    .ToLookup(d => d.EmpId);
Parallel.ForEach(employeeDictionary.Values, _options, employee => 
{
    foreach(var dependent in dependentLookup[employee.winId])
    {
        // It's even better if you can have an "AddDependents" method
        // to avoid the foreach, and leverage the efficiencies of "AddRange"-type
        // methods.
        employee.AddDependent(dependent);
    }
});

还值得注意的是,在这段代码中进行并行处理实际上可能并不值得。我建议在有和没有并行性的情况下对其进行基准测试,如果您没有看到明显的改进,请不要麻烦。

你拼出了你自己的答案。他们是孤儿,因为员工还不存在。你必须等到最后才能确保员工在那里。锁定会降低性能。只需将孤儿添加到并发列表中,并在初始文件读取完成后对该列表进行并行foreach。

假设您的行数超过了您正在使用的机器上的处理器数量,那么通过先执行父集,然后执行子集,不会显著降低性能。这甚至可以避免检查这些值的开销,特别是当您必须为这些选项锁定字典时。

另一个选项是实现一个unknowemployee,它派生自Employee,表示一个条目,它的依赖项是已知的,但雇员是未知的。当父类创建的依赖项未知时,创建一个unknowemployee条目并将其设置为依赖项。每当创建Employee时,检查其键是否已经存在,如果存在,确保在添加到字典之前复制Dependents。