设置一组任务,每次只运行X个任务

本文关键字:任务 运行 一组 设置 | 更新日期: 2023-09-27 17:49:43

假设我有100个任务,每个任务耗时10秒。现在我想一次只运行10个任务,比如当这10个任务中的一个完成时,另一个任务被执行,直到所有任务都完成。

现在我总是使用ThreadPool.QueueUserWorkItem()来完成这样的任务,但我读到这样做是不好的做法,我应该使用任务来代替。

我的问题是,我没有找到一个很好的例子为我的场景,所以你能告诉我如何实现这个目标与任务?

设置一组任务,每次只运行X个任务

SemaphoreSlim maxThread = new SemaphoreSlim(10);
for (int i = 0; i < 115; i++)
{
    maxThread.Wait();
    Task.Factory.StartNew(() =>
        {
            //Your Works
        }
        , TaskCreationOptions.LongRunning)
    .ContinueWith( (task) => maxThread.Release() );
}

TPL Dataflow非常适合做这样的事情。你可以很容易地创建一个100%异步版本的Parallel.Invoke:

async Task ProcessTenAtOnce<T>(IEnumerable<T> items, Func<T, Task> func)
{
    ExecutionDataflowBlockOptions edfbo = new ExecutionDataflowBlockOptions
    {
         MaxDegreeOfParallelism = 10
    };
    ActionBlock<T> ab = new ActionBlock<T>(func, edfbo);
    foreach (T item in items)
    {
         await ab.SendAsync(item);
    }
    ab.Complete();
    await ab.Completion;
}

您有几个选择。您可以使用Parallel.Invoke作为启动器:

public void DoWork(IEnumerable<Action> actions)
{
    Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = 10 }
        , actions.ToArray());
}

这里有一个替代选项,它将更加努力地运行正好10个任务(尽管线程池中处理这些任务的线程数量可能不同),并且返回一个Task指示何时完成,而不是阻塞直到完成。

public Task DoWork(IList<Action> actions)
{
    List<Task> tasks = new List<Task>();
    int numWorkers = 10;
    int batchSize = (int)Math.Ceiling(actions.Count / (double)numWorkers);
    foreach (var batch in actions.Batch(actions.Count / 10))
    {
        tasks.Add(Task.Factory.StartNew(() =>
        {
            foreach (var action in batch)
            {
                action();
            }
        }));
    }
    return Task.WhenAll(tasks);
}

如果你没有MoreLinq,对于Batch函数,这里是我更简单的实现:

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
{
    List<T> buffer = new List<T>(batchSize);
    foreach (T item in source)
    {
        buffer.Add(item);
        if (buffer.Count >= batchSize)
        {
            yield return buffer;
            buffer = new List<T>();
        }
    }
    if (buffer.Count >= 0)
    {
        yield return buffer;
    }
}

你可以创建一个这样的方法:

public static async Task RunLimitedNumberAtATime<T>(int numberOfTasksConcurrent, 
    IEnumerable<T> inputList, Func<T, Task> asyncFunc)
{
    Queue<T> inputQueue = new Queue<T>(inputList);
    List<Task> runningTasks = new List<Task>(numberOfTasksConcurrent);
    for (int i = 0; i < numberOfTasksConcurrent && inputQueue.Count > 0; i++)
        runningTasks.Add(asyncFunc(inputQueue.Dequeue()));
    while (inputQueue.Count > 0)
    {
        Task task = await Task.WhenAny(runningTasks);
        runningTasks.Remove(task);
        runningTasks.Add(asyncFunc(inputQueue.Dequeue()));
    }
    await Task.WhenAll(runningTasks);
}

你可以调用任何异步方法n次,限制如下:

Task task = RunLimitedNumberAtATime(10,
    Enumerable.Range(1, 100),
    async x =>
    {
        Console.WriteLine($"Starting task {x}");
        await Task.Delay(100);
        Console.WriteLine($"Finishing task {x}");
    });

或者如果你想运行长时间运行的非异步方法,你可以这样做:

Task task = RunLimitedNumberAtATime(10,
    Enumerable.Range(1, 100),
    x => Task.Factory.StartNew(() => {
        Console.WriteLine($"Starting task {x}");
        System.Threading.Thread.Sleep(100);
        Console.WriteLine($"Finishing task {x}");
    }, TaskCreationOptions.LongRunning));

也许在框架的某个地方有类似的方法,但是我还没有找到。

我想使用我能想到的最简单的解决方案,因为我认为使用TPL:

string[] urls={};
Parallel.ForEach(urls, new ParallelOptions() { MaxDegreeOfParallelism = 2}, url =>
{
   //Download the content or do whatever you want with each URL
});