如何使用异步/并行处理迭代地执行深度优先搜索

本文关键字:执行 深度优先搜索 迭代 并行处理 何使用 异步 | 更新日期: 2023-09-27 18:19:10

这里有一个方法,它执行DFS搜索并返回给定顶级项目id的所有项目的列表。我如何修改它以利用并行处理呢?目前,获取子项的调用是针对堆栈中的每个子项逐个进行的。如果我能同时获得堆栈中多个项目的子项目,并更快地填充返回列表,那就太好了。我该如何以线程安全的方式做到这一点(使用async/await或TPL或其他任何东西)?

private async Task<IList<Item>> GetItemsAsync(string topItemId)
{
    var items = new List<Item>();   
    var topItem = await GetItemAsync(topItemId);
    Stack<Item> stack = new Stack<Item>();           
    stack.Push(topItem);
    while (stack.Count > 0)
    {
        var item = stack.Pop();
        items.Add(item);                   
        var subItems = await GetSubItemsAsync(item.SubId);
        foreach (var subItem in subItems)
        {
            stack.Push(subItem);
        }
    }
    return items;   
}

编辑:我一直在想这方面的东西,但没有结合在一起:

var tasks = stack.Select(async item =>
{
    items.Add(item);           
    var subItems = await GetSubItemsAsync(item.SubId);
    foreach (var subItem in subItems)
    {
        stack.Push(subItem);
    }   
}).ToList();
if (tasks.Any())
    await Task.WhenAll(tasks);

更新:如果我想把任务分批处理,像这样的方法可以吗?

foreach (var batch in items.BatchesOf(100))
{
    var tasks = batch.Select(async item =>
    {
        await DoSomething(item);
    }).ToList();
    if (tasks.Any())
    {
        await Task.WhenAll(tasks);
    }
}  

我使用的语言是c#

如何使用异步/并行处理迭代地执行深度优先搜索

这里有一个方法,您可以使用它来异步地并行遍历树:

public static async Task<IEnumerable<T>> TraverseAsync<T>(
    this IEnumerable<T> source,
    Func<T, Task<IEnumerable<T>>> childSelector)
{
    var results = new ConcurrentBag<T>();
    Func<T, Task> foo = null;
    foo = async next =>
    {
        results.Add(next);
        var children = await childSelector(next);
        await Task.WhenAll(children.Select(child => foo(child)));
    };
    await Task.WhenAll(source.Select(child => foo(child)));
    return results;
}

该方法需要一个方法来异步获取每个节点的子节点,您已经有了这个方法。它不会在特殊情况下生成根节点,所以如果您想使用该方法,您必须将它们置于该方法的作用域之外,并将它们作为该方法的第一个参数提供。

调用代码可能看起来像这样:

var allNodes = await new[]{await GetItemAsync(topItemId)}
    .TraverseAsync(item => GetSubItemsAsync(item.SubId));

该方法并行地、异步地获取每个节点的子节点,当它们全部完成时,将自己标记为完成。然后,每个节点递归地并行计算其子节点的所有

你提到你担心使用递归,因为它会消耗堆栈空间,但这不是一个问题,因为方法是异步的。每次你在递归中移动一层,方法就不会在堆栈上再移动一层;相反,它只是将递归方法调用安排在稍后的时间点运行,因此每个级别总是从堆栈上的一个固定点开始。


如果您正在寻找一种限制并行性数量的方法,因为担心会有太多,我建议您先尝试一下。如果您将这里的所有调用都指向线程池,那么线程池本身可能会根据它认为可能表现最佳的并行度有一个上限。它只会停止创建更多的线程,并在某个点之后将待处理的项目保留在队列中,并且线程池比您更有可能拥有有效的算法来确定适当的并行度。也就是说,如果您迫切需要人为地限制线程池之外的并行性,那么当然有一些方法。一种选择是创建自己的同步上下文,人为地将挂起操作的数量限制在某个固定数量:

public class FixedDegreeSynchronizationContext : SynchronizationContext
{
    private SemaphoreSlim semaphore;
    public FixedDegreeSynchronizationContext(int maxDegreeOfParallelism)
    {
        semaphore = new SemaphoreSlim(maxDegreeOfParallelism,
            maxDegreeOfParallelism);
    }
    public override async void Post(SendOrPostCallback d, object state)
    {
        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            base.Send(d, state);
        }
        finally
        {
            semaphore.Release();
        }
    }
    public override void Send(SendOrPostCallback d, object state)
    {
        semaphore.Wait();
        try
        {
            base.Send(d, state);
        }
        finally
        {
            semaphore.Release();
        }
    }
}

你可以创建一个上下文的实例,在调用TraverseAsync之前将其设置为当前上下文,或者创建另一个接受maxDegreesOfParallelism的重载,并在方法中设置上下文。

另一种变化是限制调用子选择器的次数,而不限制这里正在进行的任何其他异步操作的数量。(其他的都不应该特别昂贵,所以我不认为这有什么关系,但这确实值得尝试。)要做到这一点,我们可以创建一个任务队列,它以固定的并行度处理给定的项目,但这不会人为地限制未传递给该队列的任何内容。队列本身非常简单,作为同步上下文的直接变体:

public class FixedParallelismQueue
{
    private SemaphoreSlim semaphore;
    public FixedParallelismQueue(int maxDegreesOfParallelism)
    {
        semaphore = new SemaphoreSlim(maxDegreesOfParallelism,
            maxDegreesOfParallelism);
    }
    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

在这里,当调用该方法时,您可以将此队列用作子选择器的一部分:

ar taskQueue = new FixedParallelismQueue(degreesOfParallelism);
var allNodes = await new[]{await GetItemAsync(topItemId)}
    .TraverseAsync(item => 
        taskQueue.Enqueue(() => GetSubItemsAsync(item.SubId)));

你为什么把异步和任务混在一起?似乎其中一个就足够了。

private void MainFunction(int id)
{
  var main = await GetItemAsync(id);
  await PopulateChildren(main);
}
private async void PopulateChildren(Item parent)
{
  var children = GetChildren(Item parent);
  foreach(var child in children) 
  { 
    parent.ChildCollection.Add(child); 
    PopulateChildren(child); 
  }
}
private IEnumerable<Item> GetChildren(Item parent)
{
  // I/O code
}

如果每个项都不保留其子项,那么您只需将它们保存在不同的集合中而不是父集合中。,但是代码应该是一样的。这并不一定是深度优先,但我不确定这是一个要求,因为并行你不能保证一个特定的顺序。


好的,让我们试试没有递归的东西…不过会是伪代码

ConcurrentBag allItems;
ConcurrentBag itemsToProcess = new ConcurrentBag(initial);
// As long as it has an item...
while(itemsToProcess.TryPeek())
{
  var tasksCurrentlyProcessing;
  // Process all the items
  while(itemsToProcess.TryTake())
  {
    tasksCurrentlyProcessing = item.GetChildren();
  }
  Task.WaitAll(tasksCurrentlyProcessing);
}
public void Task GetChildren()
{
  // get children, add to allItems and itemsToProcess
}

这是一个BFS而不是DFS。不确定这是否适合你。我能想到的唯一方法就是不用递归也不用把它弄得很复杂