当使用LimitedConcurrencyLevelTaskScheduler时,Continuation任务挂起

本文关键字:Continuation 任务 挂起 LimitedConcurrencyLevelTaskScheduler | 更新日期: 2023-09-27 18:15:42

我一直致力于在c#中使用TPL。NET 4.0)。

我已经创建了一个自定义API来简化web请求的创建和下载内容(异步的,使用延续任务)。那部分工作得很好。

当我尝试使用延迟任务创建的LimitedConcurrencyLevelTaskScheduler(在并行编程示例和任务的MSDN文档中找到)时,我遇到的问题发生了。如果您不熟悉这个类,它所做的就是将调度任务的并发程度限制为任意数量。

基本上,我想把web请求任务链的创建推迟到LimitedConcurrencyLevelTaskScheduler调度的任务中,这样我就可以限制并发下载的数量。

正如圣人Stephen Toub所建议的,当推迟Task的创建时,最好的方法是设计您的API以返回Func<Task>Func<Task<TResult>>。我已经这样做了。

不幸的是,我的程序在调度第一组并发任务后挂起。假设我将我的任务限制为4级并发性。在这种情况下,将启动4个任务,然后程序将挂起。这些任务永远不会完成。

我已经创建了一个最小的例子来简单地说明这个问题。我使用文件读取而不是使用WebRequest。我将并发度限制为1。
class Program
{
    static Func<Task> GetReadTask()
    {
        return () =>
        {
            Console.WriteLine("Opening file.");
            FileStream fileStream = File.Open("C:''Users''Joel''Desktop''1.txt", FileMode.Open);
            byte[] buffer = new byte[32];
            Console.WriteLine("Beginning read.");
            return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null).ContinueWith(task => fileStream.Close());
        };
    }
    static void Main()
    {
        LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
        TaskFactory factory = new TaskFactory(ts);
        int[] range = {1, 2, 3};
        var tasks = range.Select(number =>
        {
            Func<Task> getTask = GetReadTask();
            return factory.StartNew(() =>
            {
                var task = getTask();
                task.Wait();
            });
        });
        Task.WaitAll(tasks.ToArray());
    }
}

为了澄清我所说的"它挂起"是什么意思,下面是输出的样子。

Opening file.
Beginning read.

然后不打印其他内容…永远。

有什么线索吗?

当使用LimitedConcurrencyLevelTaskScheduler时,Continuation任务挂起

好问题!

首先,我不确定LimitedConcurrencyLevelTaskScheduler是学术上正确的解决方案。为了将并发请求的数量限制在N,您必须阻塞N个任务,这从一开始就违背了使用APM异步调用的目的。

话虽如此,它比另一种方法更容易实现。您需要有一个工作队列,并记录正在运行的请求的数量,然后根据需要创建工作任务。如果N个并发请求的数量很小,那么有N个阻塞线程并不是世界末日。

所以,代码的问题是在其他任务中创建的任务使用来自父任务的调度程序。实际上,用FromAsync创建的任务不是这样的,因为它们使用底层APM实现,所以有点不同。

Main中创建任务:

return factory.StartNew( () =>
    {
        var task = getTask();
        task.Wait();
    }
);

factory使用LimitedConcurrencyLevelTaskScheduler( 1 ),因此这些任务中只有一个可以并发执行,而另一个正在等待getTask()返回的任务。

所以在GetReadTask中调用Task<int>.Factory.FromAsync。这是因为FromAsync不尊重父任务的调度程序。

然后使用.ContinueWith(task => fileStream.Close())创建延续。这将创建一个尊重其父进程调度程序的任务。由于LimitedConcurrencyLevelTaskScheduler已经在执行一个任务(Main中被阻塞的任务),因此continuation无法运行,并且出现死锁。

解决方案是使用TaskScheduler.Default在普通线程池线程上运行延续。然后它开始运行,死锁被打破。

这是我的解决方案:

static Task QueueReadTask( TaskScheduler ts, int number )
{
    Output.Write( "QueueReadTask( " + number + " )" );
    return Task.Factory.StartNew( () =>
        {
            Output.Write( "Opening file " + number + "." );
            FileStream fileStream = File.Open( "D:''1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );
            byte[] buffer = new byte[ 32 ];
            var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );
            var tClose = tRead.ContinueWith( task =>
                    {
                        Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
                        fileStream.Close();
                    }
                    , TaskScheduler.Default
                );
            tClose.Wait();
        }
        , CancellationToken.None
        , TaskCreationOptions.None
        , ts
    );
}

Main现在看起来像这样:

static void Main()
{
    LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );
    int[] range = { 1, 2, 3 };
    var tasks = range.Select( number =>
        {
            var task = QueueReadTask( ts, number );
            return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
        }
    )
    .ToArray();
    Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
    Task.WaitAll( tasks );
    Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}

有两件事需要注意:

task.Wait()移到QueueReadTask中会更明显地表明您正在阻塞任务。您可以删除FromAsync调用和延续,并将它们替换为正常的同步调用,因为您无论如何都是阻塞的。

QueueReadTask返回的任务可以有continuation。默认情况下,它们在默认调度器下运行,因为它们继承父任务的调度器而不是前一个任务的调度器。在本例中,没有父任务,因此使用默认调度器。