用于自行取消和重新启动任务的模式

本文关键字:任务 模式 重新启动 取消 用于 | 更新日期: 2023-09-27 18:32:39

是否有推荐的自取消和重新启动任务的既定模式?

例如,我正在研究后台拼写检查器的 API。拼写检查会话包装为 Task 。每个新会话都应取消前一个会话并等待其终止(以正确重用拼写检查服务提供商等资源)。

我想出了这样的东西:

class Spellchecker
{
    Task pendingTask = null; // pending session
    CancellationTokenSource cts = null; // CTS for pending session
    // SpellcheckAsync is called by the client app
    public async Task<bool> SpellcheckAsync(CancellationToken token)
    {
        // SpellcheckAsync can be re-entered
        var previousCts = this.cts;
        var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
        this.cts = newCts;
        if (IsPendingSession())
        {
            // cancel the previous session and wait for its termination
            if (!previousCts.IsCancellationRequested)
                previousCts.Cancel();
            // this is not expected to throw
            // as the task is wrapped with ContinueWith
            await this.pendingTask; 
        }
        newCts.Token.ThrowIfCancellationRequested();
        var newTask = SpellcheckAsyncHelper(newCts.Token);
        this.pendingTask = newTask.ContinueWith((t) => {
            this.pendingTask = null;
            // we don't need to know the result here, just log the status
            Debug.Print(((object)t.Exception ?? (object)t.Status).ToString());
        }, TaskContinuationOptions.ExecuteSynchronously);
        return await newTask;
    }
    // the actual task logic
    async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
    {
        // do not start a new session if the the previous one still pending
        if (IsPendingSession())
            throw new ApplicationException("Cancel the previous session first.");
        // do the work (pretty much IO-bound)
        try
        {
            bool doMore = true;
            while (doMore)
            {
                token.ThrowIfCancellationRequested();
                await Task.Delay(500); // placeholder to call the provider
            }
            return doMore;
        }
        finally
        {
            // clean-up the resources
        }
    }
    public bool IsPendingSession()
    {
        return this.pendingTask != null &&
            !this.pendingTask.IsCompleted &&
            !this.pendingTask.IsCanceled &&
            !this.pendingTask.IsFaulted;
    }
}

客户端应用(UI)应该能够根据需要多次调用SpellcheckAsync,而无需担心取消挂起的会话。主doMore循环在 UI 线程上运行(因为它涉及 UI,而所有拼写检查服务提供程序调用都是 IO 绑定的)。

我对我不得不将 API 分成两部分这一事实感到有点不舒服,SpellcheckAsyncSpellcheckAsyncHelper ,但我想不出更好的方法来做到这一点,而且它还没有经过测试。

用于自行取消和重新启动任务的模式

我认为

一般概念非常好,尽管我建议您不要使用ContinueWith

我只是使用常规await编写它,并且不需要很多"我已经运行"的逻辑:

Task pendingTask = null; // pending session
CancellationTokenSource cts = null; // CTS for pending session
// SpellcheckAsync is called by the client app on the UI thread
public async Task<bool> SpellcheckAsync(CancellationToken token)
{
    // SpellcheckAsync can be re-entered
    var previousCts = this.cts;
    var newCts = CancellationTokenSource.CreateLinkedTokenSource(token);
    this.cts = newCts;
    if (previousCts != null)
    {
        // cancel the previous session and wait for its termination
        previousCts.Cancel();
        try { await this.pendingTask; } catch { }
    }
    newCts.Token.ThrowIfCancellationRequested();
    this.pendingTask = SpellcheckAsyncHelper(newCts.Token);
    return await this.pendingTask;
}
// the actual task logic
async Task<bool> SpellcheckAsyncHelper(CancellationToken token)
{
    // do the work (pretty much IO-bound)
    using (...)
    {
        bool doMore = true;
        while (doMore)
        {
            token.ThrowIfCancellationRequested();
            await Task.Delay(500); // placeholder to call the provider
        }
        return doMore;
    }
}

以下是我使用的取消和重新启动模式的最新版本:

class AsyncWorker
{
    Task _pendingTask;
    CancellationTokenSource _pendingTaskCts;
    // the actual worker task
    async Task DoWorkAsync(CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        Debug.WriteLine("Start.");
        await Task.Delay(100, token);
        Debug.WriteLine("Done.");
    }
    // start/restart
    public void Start(CancellationToken token)
    {
        var previousTask = _pendingTask;
        var previousTaskCts = _pendingTaskCts;
        var thisTaskCts = CancellationTokenSource.CreateLinkedTokenSource(token);
        _pendingTask = null;
        _pendingTaskCts = thisTaskCts;
        // cancel the previous task
        if (previousTask != null && !previousTask.IsCompleted)
            previousTaskCts.Cancel();
        Func<Task> runAsync = async () =>
        {
            // await the previous task (cancellation requested)
            if (previousTask != null)
                await previousTask.WaitObservingCancellationAsync();
            // if there's a newer task started with Start, this one should be cancelled
            thisTaskCts.Token.ThrowIfCancellationRequested();
            await DoWorkAsync(thisTaskCts.Token).WaitObservingCancellationAsync();
        };
        _pendingTask = Task.Factory.StartNew(
            runAsync,
            CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.FromCurrentSynchronizationContext()).Unwrap();
    }
    // stop
    public void Stop()
    {
        if (_pendingTask == null)
            return;
        if (_pendingTask.IsCanceled)
            return;
        if (_pendingTask.IsFaulted)
            _pendingTask.Wait(); // instantly throw an exception
        if (!_pendingTask.IsCompleted)
        {
            // still running, request cancellation 
            if (!_pendingTaskCts.IsCancellationRequested)
                _pendingTaskCts.Cancel();
            // wait for completion
            if (System.Threading.Thread.CurrentThread.GetApartmentState() == ApartmentState.MTA)
            {
                // MTA, blocking wait
                _pendingTask.WaitObservingCancellation();
            }
            else
            {
                // TODO: STA, async to sync wait bridge with DoEvents,
                // similarly to Thread.Join
            }
        }
    }
}
// useful extensions
public static class Extras
{
    // check if exception is OperationCanceledException
    public static bool IsOperationCanceledException(this Exception ex)
    {
        if (ex is OperationCanceledException)
            return true;
        var aggEx = ex as AggregateException;
        return aggEx != null && aggEx.InnerException is OperationCanceledException;
    }
    // wait asynchrnously for the task to complete and observe exceptions
    public static async Task WaitObservingCancellationAsync(this Task task)
    {
        try
        {
            await task;
        }
        catch (Exception ex)
        {
            // rethrow if anything but OperationCanceledException
            if (!ex.IsOperationCanceledException())
                throw;
        }
    }
    // wait for the task to complete and observe exceptions
    public static void WaitObservingCancellation(this Task task)
    {
        try
        {
            task.Wait();
        }
        catch (Exception ex)
        {
            // rethrow if anything but OperationCanceledException
            if (!ex.IsOperationCanceledException())
                throw;
        }
    }
}

测试使用(仅生成一个"开始/完成"输出DoWorkAsync):

private void MainForm_Load(object sender, EventArgs e)
{
    var worker = new AsyncWorker();
    for (var i = 0; i < 10; i++)
        worker.Start(CancellationToken.None);
}
<</div> div class="answers">

希望这会有用 - 尝试创建可以重用的帮助程序类:

class SelfCancelRestartTask
{
    private Task _task = null;
    public CancellationTokenSource TokenSource { get; set; } = null;
    public SelfCancelRestartTask()
    {
    }
    public async Task Run(Action operation)
    {
        if (this._task != null &&
            !this._task.IsCanceled &&
            !this._task.IsCompleted &&
            !this._task.IsFaulted)
        {
            TokenSource?.Cancel();
            await this._task;
            TokenSource = new CancellationTokenSource();
        }
        else
        {
            TokenSource = new CancellationTokenSource();
        }
        this._task = Task.Run(operation, TokenSource.Token);
    }

当异步方法在彼此快速调用多次(例如四次)时,上面的示例似乎存在问题。然后,此方法的所有后续调用都会取消第一个任务,最后生成三个同时运行的新任务。所以我想出了这个:

    private List<Tuple<Task, CancellationTokenSource>> _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>();
    /// <remarks>This method is asynchronous, i.e. it runs partly in the background. As this method might be called multiple times 
    /// quickly after each other, a mechanism has been implemented that <b>all</b> tasks from previous method calls are first canceled before the task is started anew.</remarks>
    public async void ParameterExtraction() {
        CancellationTokenSource newCancellationTokenSource = new CancellationTokenSource();
        // Define the task which shall run in the background.
        Task newTask = new Task(() => {
            // do some work here
                }
            }
        }, newCancellationTokenSource.Token);
        _parameterExtractionTasks.Add(new Tuple<Task, CancellationTokenSource>(newTask, newCancellationTokenSource));
        /* Convert the list to arrays as an exception is thrown if the number of entries in a list changes while 
         * we are in a for loop. This can happen if this method is called again while we are waiting for a task. */
        Task[] taskArray = _parameterExtractionTasks.ConvertAll(item => item.Item1).ToArray();
        CancellationTokenSource[] tokenSourceArray = _parameterExtractionTasks.ConvertAll(item => item.Item2).ToArray();
        for (int i = 0; i < taskArray.Length - 1; i++) { // -1: the last task, i.e. the most recent task, shall be run and not canceled. 
            // Cancel all running tasks which were started by previous calls of this method
            if (taskArray[i].Status == TaskStatus.Running) {
                tokenSourceArray[i].Cancel();
                await taskArray[i]; // wait till the canceling completed
            }
        }
        // Get the most recent task
        Task currentThreadToRun = taskArray[taskArray.Length - 1];
        // Start this task if, but only if it has not been started before (i.e. if it is still in Created state). 
        if (currentThreadToRun.Status == TaskStatus.Created) {
            currentThreadToRun.Start();
            await currentThreadToRun; // wait till this task is completed.
        }
        // Now the task has been completed once. Thus we can recent the list of tasks to cancel or maybe run.
        _parameterExtractionTasks = new List<Tuple<Task, CancellationTokenSource>>();
    }