以有限的并发性并行执行

本文关键字:并行执行 并发 | 更新日期: 2023-09-27 18:18:42

我想在Silverlight 5中并行执行异步操作,限制并发性。

我的代码是:

    public async void btn_click(object s, RoutedEventArgs e)
    {
        await DoAllWork();
    }
    private async Task DoAllWork()
    {
        //Get work to do
        int[] wrk = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        //Start the tasks
        Task[] tasks = new Task[wrk.Length];
        for (int i = 0; i < wrk.Length; i++)
        {
            int item = wrk[i];
            tasks[i] = WorkSingleItem(item);
        }
        await TaskEx.WhenAll(tasks);
    }
    private async Task WorkSingleItem(int item)
    {
        //a very long operation
        var response = await Request(item);
        await Handle(response);
    }

我找到了这篇文章:http://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx

我怎么能等待我的工作方法,开始我所有的长操作与"有限的并发调度程序",并与每个项目的工作不依赖于同步上下文,以避免代码在UI线程执行…

以有限的并发性并行执行

由于您的长操作以异步方式处理I/O,并且有限并发性的目的是避免DDoS,因此TaskScheduler是一个不正确的解决方案。这是因为TaskScheduler只控制活动任务(运行或阻塞);当一个任务通过await返回到它的调度程序时,它不再被认为是"活动的"。因此,如果您的I/O是异步的,则TaskScheduler不能用于防止DDoS。

正确的解决方案是使用async兼容的信号量:

public async void btn_click(object s, RoutedEventArgs e)
{
  await Task.Run(() => DoAllWork());
}
private async Task DoAllWork()
{
  int[] wrk = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
  var semaphore = new AsyncSemaphore(4);
  var tasks = wrk.Select(x => WorkSingleItem(x, semaphore));
  await TaskEx.WhenAll(tasks);
}
private async Task WorkSingleItem(int item, AsyncSemaphore semaphore)
{
  await semaphore.WaitAsync();
  try
  {
    var response = await Request(item);
    await Handle(response);
  }
  finally
  {
    semaphore.Release();
  }
}

您可以创建一种类型的队列,它将只允许在同一时间执行特定数量的给定任务:

public class FixedParallelismQueue
{
    private SemaphoreSlim semaphore;
    public FixedParallelismQueue(int maxDegreesOfParallelism)
    {
        semaphore = new SemaphoreSlim(maxDegreesOfParallelism,
            maxDegreesOfParallelism);
    }
    public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            return await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
    public async Task Enqueue(Func<Task> taskGenerator)
    {
        await semaphore.WaitAsync();
        try
        {
            await taskGenerator();
        }
        finally
        {
            semaphore.Release();
        }
    }
}

你可以这样写:

private Task DoAllWork()
{
    int[] work = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
    var queue = new FixedParallelismQueue(maxDegreesOfParallelism);
    var tasks = work.Select(n => queue.Enqueue(() => WorkSingleItem(n));
    return TaskEx.WhenAll(tasks);
}