暂停/恢复异步任务的模式

本文关键字:模式 任务 异步 恢复 暂停 | 更新日期: 2023-09-27 18:37:02

我有一个主要受IO约束的连续任务(一个与拼写检查服务器对话的后台拼写检查器)。有时,此任务需要暂停并在以后恢复,具体取决于用户活动。

虽然挂起/恢复本质上是async/await的作用,但我发现有关如何为异步方法实现实际暂停/播放逻辑的信息很少。是否有推荐的模式?

我也考虑过使用Stephen Toub的AsyncManualResetEvent,但认为这可能是矫枉过正。

暂停/恢复异步任务的模式

2019 年更新,我最近有机会重新审视这段代码,下面是作为控制台应用程序的完整示例(警告:PauseTokenSource需要良好的单元测试)。

请注意,在我的例子中,要求是当消费者端代码(请求暂停)继续时,生产者端代码应该已经达到暂停状态。因此,当 UI 准备好反映暂停状态时,所有后台活动都应已暂停。

using System;
using System.Threading.Tasks;
using System.Threading;
namespace Console_19613444
{
    class Program
    {
        // PauseTokenSource
        public class PauseTokenSource
        {
            bool _paused = false;
            bool _pauseRequested = false;
            TaskCompletionSource<bool> _resumeRequestTcs;
            TaskCompletionSource<bool> _pauseConfirmationTcs;
            readonly SemaphoreSlim _stateAsyncLock = new SemaphoreSlim(1);
            readonly SemaphoreSlim _pauseRequestAsyncLock = new SemaphoreSlim(1);
            public PauseToken Token { get { return new PauseToken(this); } }
            public async Task<bool> IsPaused(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    return _paused;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }
            public async Task ResumeAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (!_paused)
                    {
                        return;
                    }
                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        var resumeRequestTcs = _resumeRequestTcs;
                        _paused = false;
                        _pauseRequested = false;
                        _resumeRequestTcs = null;
                        _pauseConfirmationTcs = null;
                        resumeRequestTcs.TrySetResult(true);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }
            public async Task PauseAsync(CancellationToken token = default(CancellationToken))
            {
                await _stateAsyncLock.WaitAsync(token);
                try
                {
                    if (_paused)
                    {
                        return;
                    }
                    Task pauseConfirmationTask = null;
                    await _pauseRequestAsyncLock.WaitAsync(token);
                    try
                    {
                        _pauseRequested = true;
                        _resumeRequestTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        _pauseConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
                        pauseConfirmationTask = WaitForPauseConfirmationAsync(token);
                    }
                    finally
                    {
                        _pauseRequestAsyncLock.Release();
                    }
                    await pauseConfirmationTask;
                    _paused = true;
                }
                finally
                {
                    _stateAsyncLock.Release();
                }
            }
            private async Task WaitForResumeRequestAsync(CancellationToken token)
            {
                using (token.Register(() => _resumeRequestTcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await _resumeRequestTcs.Task;
                }
            }
            private async Task WaitForPauseConfirmationAsync(CancellationToken token)
            {
                using (token.Register(() => _pauseConfirmationTcs.TrySetCanceled(), useSynchronizationContext: false))
                {
                    await _pauseConfirmationTcs.Task;
                }
            }
            internal async Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                Task resumeRequestTask = null;
                await _pauseRequestAsyncLock.WaitAsync(token);
                try
                {
                    if (!_pauseRequested)
                    {
                        return;
                    }
                    resumeRequestTask = WaitForResumeRequestAsync(token);
                    _pauseConfirmationTcs.TrySetResult(true);
                }
                finally
                {
                    _pauseRequestAsyncLock.Release();
                }
                await resumeRequestTask;
            }
        }
        // PauseToken - consumer side
        public struct PauseToken
        {
            readonly PauseTokenSource _source;
            public PauseToken(PauseTokenSource source) { _source = source; }
            public Task<bool> IsPaused() { return _source.IsPaused(); }
            public Task PauseIfRequestedAsync(CancellationToken token = default(CancellationToken))
            {
                return _source.PauseIfRequestedAsync(token);
            }
        }
        // Basic usage
        public static async Task DoWorkAsync(PauseToken pause, CancellationToken token)
        {
            try
            {
                while (true)
                {
                    token.ThrowIfCancellationRequested();
                    Console.WriteLine("Before await pause.PauseIfRequestedAsync()");
                    await pause.PauseIfRequestedAsync();
                    Console.WriteLine("After await pause.PauseIfRequestedAsync()");
                    await Task.Delay(1000);
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: {0}", e);
                throw;
            }
        }
        static async Task Test(CancellationToken token)
        {
            var pts = new PauseTokenSource();
            var task = DoWorkAsync(pts.Token, token);
            while (true)
            {
                token.ThrowIfCancellationRequested();
                Console.WriteLine("Press enter to pause...");
                Console.ReadLine();
                Console.WriteLine("Before pause requested");
                await pts.PauseAsync();
                Console.WriteLine("After pause requested, paused: " + await pts.IsPaused());
                Console.WriteLine("Press enter to resume...");
                Console.ReadLine();
                Console.WriteLine("Before resume");
                await pts.ResumeAsync();
                Console.WriteLine("After resume");
            }
        }
        static async Task Main()
        {
            await Test(CancellationToken.None);
        }
    }
}
考虑到

您当前的代码有多混乱,AsyncManualResetEvent正是您所需要的。但稍微好一点的解决方案是使用斯蒂芬·图布的另一种方法:PauseToken 。它的工作原理与AsyncManualResetEvent类似,只是它的界面是专门为此而设计的。

涉及到异步/等待编程时,所有其他答案似乎要么很复杂,要么缺少标记,因为线程占用 CPU 昂贵并可能导致死锁。 经过大量的试验、错误和许多僵局,这终于适用于我的高使用率测试。

var isWaiting = true;
while (isWaiting)
{
    try
    {
        //A long delay is key here to prevent the task system from holding the thread.
        //The cancellation token allows the work to resume with a notification 
        //from the CancellationTokenSource.
        await Task.Delay(10000, cancellationToken);
    }
    catch (TaskCanceledException)
    {
        //Catch the cancellation and it turns into continuation
        isWaiting = false;
    }
}

它对我有用

        using System;
        using System.Threading;
        using System.Threading.Tasks;
        namespace TaskTest2
        {
            class Program
            {
                static ManualResetEvent mre = new ManualResetEvent(false);
                static void Main(string[] args)
                {
                   mre.Set();
                   Task.Factory.StartNew(() =>
                    {
                        while (true)
                        {
                            Console.WriteLine("________________");
                            mre.WaitOne();
                        }
                    } );
                    Thread.Sleep(10000);
                    mre.Reset();
                    Console.WriteLine("Task Paused");
                    Thread.Sleep(10000);
                    Console.WriteLine("Task Will Resume After 1 Second");
                    Thread.Sleep(1000);
                    mre.Set();
                    Thread.Sleep(10000);
                    mre.Reset();
                    Console.WriteLine("Task Paused");

                    Console.Read();
                }
            }
        }

好吧,也许这值得一个答案,但我对 C# 不是很熟悉,这里也没有 MonoDevelop,现在是凌晨 3 点,所以请怜悯。

我建议这样的事情

class Spellchecker
{
  private CancellationTokenSource mustStop = null;
  private volatile Task currentTask = null;
  //TODO add other state variables as needed
  public void StartSpellchecker()
  {
    if (currentTask != null)
    {
      /*
      * A task is already running,
      * you can either throw an exception
      * or silently return
      */
    }
    mustStop = new CancellationTokenSource();
    currentTask = SpellcheckAsync(mustStop.Token);
    currentTask.Start();
  }
  private async Task SpellcheckAsync(CancellationToken ct)
  {
    while (!ct.IsCancellationRequested))
    {
      /*
      * TODO perform spell check
      * This method must be the only one accessing
      * the spellcheck-related state variables
      */
    }
    currentTask = null;
  }
  public async Task StopSpellchecker()
  {
    if (currentTask == null)
    {
      /*
      * There is no task running
      * you can either throw an exception
      * or silently return
      */
    }
    else
    {
      /*
      * A CancelAfter(TimeSpan) method
      * is also available, which might interest you
      */
      mustStop.Cancel();
      //Remove the following lines if you don't want to wait for the task to actually stop
      var task = currentTask;
      if (task != null)
      {
        await task;
      }
    }
  }
}