多个工作线程与一个具有异步/等待的工作线程
本文关键字:线程 工作 异步 等待 一个 | 更新日期: 2023-09-27 18:35:11
我的代码当前有以下 10 个工作线程。每个工作线程继续从队列中轮询作业,然后处理长时间运行的作业。
for (int k=0; k<10; k++)
{
Task.Factory.StartNew(() => DoPollingThenWork(), TaskCreationOptions.LongRunning);
}
void DoPollingThenWork()
{
while (true)
{
var msg = Poll();
if (msg != null)
{
Thread.Sleep(3000); // process the I/O bound job
}
}
}
我正在重构底层代码以使用异步/等待模式。我想我可以将上面的代码重写为以下内容。它使用一个不断创建异步任务的主线程,并使用 SemaphoreSlim 将并发任务数限制为 10。
Task.Factory.StartNew(() => WorkerMainAsync(), TaskCreationOptions.LongRunning);
async Task WorkerMainAsync()
{
SemaphoreSlim ss = new SemaphoreSlim(10);
while (true)
{
await ss.WaitAsync();
Task.Run(async () =>
{
await DoPollingThenWorkAsync();
ss.Release();
});
}
}
async Task DoPollingThenWorkAsync()
{
var msg = Poll();
if (msg != null)
{
await Task.Delay(3000); // process the I/O-bound job
}
}
两者的行为应该相同。但我认为第二个选项似乎更好,因为它不会阻塞线程。但缺点是我不能做等待(优雅地停止任务),因为任务就像火一样忘记。第二个选项是取代传统工作线程模式的正确方法吗?
当你有异步代码时,你通常没有理由使用Task.Run()
(或者更糟糕的是,Task.Factory.StartNew()
)。这意味着您可以将代码更改为如下所示的内容:
await WorkerMainAsync();
async Task WorkerMainAsync()
{
SemaphoreSlim ss = new SemaphoreSlim(10);
while (true)
{
await ss.WaitAsync();
// you should probably store this task somewhere and then await it
var task = DoPollingThenWorkAsync();
}
}
async Task DoPollingThenWorkAsync(SemaphoreSlim semaphore)
{
var msg = Poll();
if (msg != null)
{
await Task.Delay(3000); // process the I/O-bound job
}
// this assumes you don't have to worry about exceptions
// otherwise consider try-finally
semaphore.Release();
}
通常,您不会在 CPU 密集型任务中使用async/await
。启动此类任务的方法(WorkerMainAsync
)可以使用async/await
,但您应该跟踪待处理的任务:
async Task WorkerMainAsync()
{
SemaphoreSlim ss = new SemaphoreSlim(10);
List<Task> trackedTasks = new List<Task>();
while (DoMore())
{
await ss.WaitAsync();
trackedTasks.Add(Task.Run(() =>
{
DoPollingThenWorkAsync();
ss.Release();
}));
}
await Task.WhenAll(trackedTasks);
}
void DoPollingThenWorkAsync()
{
var msg = Poll();
if (msg != null)
{
Thread.Sleep(2000); // process the long running CPU-bound job
}
}
另一个练习是在任务完成时从trackedTasks
中删除任务。例如,您可以使用ContinueWith
删除已完成的任务(在这种情况下,请记住使用lock
来保护trackedTasks
免受同时访问)。
如果你真的需要在DoPollingThenWorkAsync
内部使用await
,代码不会改变太多:
trackedTasks.Add(Task.Run(async () =>
{
await DoPollingThenWorkAsync();
ss.Release();
}));
请注意,在这种情况下,您将在此处处理异步 lambda 的嵌套任务,Task.Run
会自动为您解包。