当使用LimitedConcurrencyLevelTaskScheduler时,Continuation任务挂起
本文关键字:Continuation 任务 挂起 LimitedConcurrencyLevelTaskScheduler | 更新日期: 2023-09-27 18:15:42
我一直致力于在c#中使用TPL。NET 4.0)。
我已经创建了一个自定义API来简化web请求的创建和下载内容(异步的,使用延续任务)。那部分工作得很好。
当我尝试使用延迟任务创建的LimitedConcurrencyLevelTaskScheduler
(在并行编程示例和任务的MSDN文档中找到)时,我遇到的问题发生了。如果您不熟悉这个类,它所做的就是将调度任务的并发程度限制为任意数量。
基本上,我想把web请求任务链的创建推迟到LimitedConcurrencyLevelTaskScheduler
调度的任务中,这样我就可以限制并发下载的数量。
正如圣人Stephen Toub所建议的,当推迟Task
的创建时,最好的方法是设计您的API以返回Func<Task>
或Func<Task<TResult>>
。我已经这样做了。
不幸的是,我的程序在调度第一组并发任务后挂起。假设我将我的任务限制为4级并发性。在这种情况下,将启动4个任务,然后程序将挂起。这些任务永远不会完成。
我已经创建了一个最小的例子来简单地说明这个问题。我使用文件读取而不是使用WebRequest
。我将并发度限制为1。
class Program
{
static Func<Task> GetReadTask()
{
return () =>
{
Console.WriteLine("Opening file.");
FileStream fileStream = File.Open("C:''Users''Joel''Desktop''1.txt", FileMode.Open);
byte[] buffer = new byte[32];
Console.WriteLine("Beginning read.");
return Task<int>.Factory.FromAsync(fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null).ContinueWith(task => fileStream.Close());
};
}
static void Main()
{
LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler(1);
TaskFactory factory = new TaskFactory(ts);
int[] range = {1, 2, 3};
var tasks = range.Select(number =>
{
Func<Task> getTask = GetReadTask();
return factory.StartNew(() =>
{
var task = getTask();
task.Wait();
});
});
Task.WaitAll(tasks.ToArray());
}
}
为了澄清我所说的"它挂起"是什么意思,下面是输出的样子。
Opening file.
Beginning read.
然后不打印其他内容…永远。
有什么线索吗?
好问题!
首先,我不确定LimitedConcurrencyLevelTaskScheduler
是学术上正确的解决方案。为了将并发请求的数量限制在N,您必须阻塞N个任务,这从一开始就违背了使用APM异步调用的目的。
话虽如此,它比另一种方法更容易实现。您需要有一个工作队列,并记录正在运行的请求的数量,然后根据需要创建工作任务。如果N个并发请求的数量很小,那么有N个阻塞线程并不是世界末日。
所以,代码的问题是在其他任务中创建的任务使用来自父任务的调度程序。实际上,用FromAsync
创建的任务不是这样的,因为它们使用底层APM实现,所以有点不同。
在Main
中创建任务:
return factory.StartNew( () =>
{
var task = getTask();
task.Wait();
}
);
factory
使用LimitedConcurrencyLevelTaskScheduler( 1 )
,因此这些任务中只有一个可以并发执行,而另一个正在等待getTask()
返回的任务。
所以在GetReadTask
中调用Task<int>.Factory.FromAsync
。这是因为FromAsync
不尊重父任务的调度程序。
然后使用.ContinueWith(task => fileStream.Close())
创建延续。这将创建一个尊重其父进程调度程序的任务。由于LimitedConcurrencyLevelTaskScheduler
已经在执行一个任务(Main
中被阻塞的任务),因此continuation无法运行,并且出现死锁。
解决方案是使用TaskScheduler.Default
在普通线程池线程上运行延续。然后它开始运行,死锁被打破。
这是我的解决方案:
static Task QueueReadTask( TaskScheduler ts, int number )
{
Output.Write( "QueueReadTask( " + number + " )" );
return Task.Factory.StartNew( () =>
{
Output.Write( "Opening file " + number + "." );
FileStream fileStream = File.Open( "D:''1KB.txt", FileMode.Open, FileAccess.Read, FileShare.Read );
byte[] buffer = new byte[ 32 ];
var tRead = Task<int>.Factory.FromAsync( fileStream.BeginRead, fileStream.EndRead, buffer, 0, 32, null );
var tClose = tRead.ContinueWith( task =>
{
Output.Write( "Closing file " + number + ". Read " + task.Result + " bytes." );
fileStream.Close();
}
, TaskScheduler.Default
);
tClose.Wait();
}
, CancellationToken.None
, TaskCreationOptions.None
, ts
);
}
和Main
现在看起来像这样:
static void Main()
{
LimitedConcurrencyLevelTaskScheduler ts = new LimitedConcurrencyLevelTaskScheduler( 1 );
int[] range = { 1, 2, 3 };
var tasks = range.Select( number =>
{
var task = QueueReadTask( ts, number );
return task.ContinueWith( t => Output.Write( "Number " + number + " completed" ) );
}
)
.ToArray();
Output.Write( "Waiting for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
Task.WaitAll( tasks );
Output.Write( "WaitAll complete for " + tasks.Length + " tasks: " + String.Join( " ", tasks.Select( t => t.Status ).ToArray() ) );
}
有两件事需要注意:
将task.Wait()
移到QueueReadTask
中会更明显地表明您正在阻塞任务。您可以删除FromAsync
调用和延续,并将它们替换为正常的同步调用,因为您无论如何都是阻塞的。
QueueReadTask
返回的任务可以有continuation。默认情况下,它们在默认调度器下运行,因为它们继承父任务的调度器而不是前一个任务的调度器。在本例中,没有父任务,因此使用默认调度器。