以有限的并发性并行执行
本文关键字:并行执行 并发 | 更新日期: 2023-09-27 18:18:42
我想在Silverlight 5中并行执行异步操作,限制并发性。
我的代码是:
public async void btn_click(object s, RoutedEventArgs e)
{
await DoAllWork();
}
private async Task DoAllWork()
{
//Get work to do
int[] wrk = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
//Start the tasks
Task[] tasks = new Task[wrk.Length];
for (int i = 0; i < wrk.Length; i++)
{
int item = wrk[i];
tasks[i] = WorkSingleItem(item);
}
await TaskEx.WhenAll(tasks);
}
private async Task WorkSingleItem(int item)
{
//a very long operation
var response = await Request(item);
await Handle(response);
}
我找到了这篇文章:http://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx
我怎么能等待我的工作方法,开始我所有的长操作与"有限的并发调度程序",并与每个项目的工作不依赖于同步上下文,以避免代码在UI线程执行…
由于您的长操作以异步方式处理I/O,并且有限并发性的目的是避免DDoS,因此TaskScheduler
是一个不正确的解决方案。这是因为TaskScheduler
只控制活动任务(运行或阻塞);当一个任务通过await
返回到它的调度程序时,它不再被认为是"活动的"。因此,如果您的I/O是异步的,则TaskScheduler
不能用于防止DDoS。
正确的解决方案是使用async
兼容的信号量:
public async void btn_click(object s, RoutedEventArgs e)
{
await Task.Run(() => DoAllWork());
}
private async Task DoAllWork()
{
int[] wrk = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
var semaphore = new AsyncSemaphore(4);
var tasks = wrk.Select(x => WorkSingleItem(x, semaphore));
await TaskEx.WhenAll(tasks);
}
private async Task WorkSingleItem(int item, AsyncSemaphore semaphore)
{
await semaphore.WaitAsync();
try
{
var response = await Request(item);
await Handle(response);
}
finally
{
semaphore.Release();
}
}
您可以创建一种类型的队列,它将只允许在同一时间执行特定数量的给定任务:
public class FixedParallelismQueue
{
private SemaphoreSlim semaphore;
public FixedParallelismQueue(int maxDegreesOfParallelism)
{
semaphore = new SemaphoreSlim(maxDegreesOfParallelism,
maxDegreesOfParallelism);
}
public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
{
await semaphore.WaitAsync();
try
{
return await taskGenerator();
}
finally
{
semaphore.Release();
}
}
public async Task Enqueue(Func<Task> taskGenerator)
{
await semaphore.WaitAsync();
try
{
await taskGenerator();
}
finally
{
semaphore.Release();
}
}
}
你可以这样写:
private Task DoAllWork()
{
int[] work = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
var queue = new FixedParallelismQueue(maxDegreesOfParallelism);
var tasks = work.Select(n => queue.Enqueue(() => WorkSingleItem(n));
return TaskEx.WhenAll(tasks);
}