取消令牌源和嵌套任务

本文关键字:嵌套 任务 令牌 取消 | 更新日期: 2023-09-27 18:32:24

我对我正在使用的取消令牌源有疑问,如下面的代码所示:

    void Process()
    {
        //for the sake of simplicity I am taking 1, in original implementation it is more than 1
        var cancellationToken = _cancellationTokenSource.Token;
        Task[] tArray = new Task[1];
        tArray[0] = Task.Factory.StartNew(() =>
        {
            cancellationToken.ThrowIfCancellationRequested();
            //do some work here
            MainTaskRoutine();
        }, cancellationToken);
        try
        {
            Task.WaitAll(tArray);
        }
        catch (Exception ex)
        {
            //do error handling here
        }
    }
    void MainTaskRoutine()
    {
        //for the sake of simplicity I am taking 1, in original implementation it is more than 1
        //this method shows that a nested task is created 
        var cancellationToken = _cancellationTokenSource.Token;
        Task[] tArray = new Task[1];
        tArray[0] = Task.Factory.StartNew(() =>
        {
            cancellationToken.ThrowIfCancellationRequested();
            //do some work here
        }, cancellationToken);
        try
        {
            Task.WaitAll(tArray);
        }
        catch (Exception ex)
        {
         //do error handling here
        }
    }


编辑:进一步阐述

最终目标是:当用户取消操作时,所有立即挂起的任务(子项或孙项)都应取消。

场景:根据上面的代码:1. 我首先检查用户是否要求取消2. 如果用户没有要求取消,则只继续执行任务(请参阅处理方法)。示例代码在此处仅显示一个任务,但实际上可以有三个或更多

假设 CPU 开始处理 Task1,而其他任务仍在任务队列中等待某个 CPU 来执行它们。用户请求取消:处理方法中的任务 2,3 将立即取消,但任务 1 将继续工作,因为它已在处理中。

在任务 1 中,它调用方法 MainTaskRoutine,这反过来又会创建更多任务。

在 MainTaskRoutine 的函数中,我写了:cancelToken.ThrowIfCancelRequest();

所以问题是:使用CancelTokenSource的正确方法,因为它依赖于Task.WaitAll()?

取消令牌源和嵌套任务

[已编辑] 当您在代码中使用数组时,我假设可能有多个任务,而不仅仅是一个。我还假设在你从Process开始的每个任务中,你想先做一些 CPU 密集型工作(//do some work here),然后运行 MainTaskRoutine

如何处理任务取消异常由项目设计工作流决定。 例如,您可以在Process方法中执行此操作,或者从调用Process的位置执行此操作。如果您唯一关心的是从跟踪挂起任务的数组中删除 Task 对象,则可以使用 Task.ContinueWith 完成此操作。无论任务的完成状态如何(CancelledFaultedRanToCompletion),都将执行延续:

Task Process(CancellationToken cancellationToken)
{
    var tArray = new List<Task>();
    var tArrayLock = new Object();
    var task = Task.Run(() =>
    {
        cancellationToken.ThrowIfCancellationRequested();
        //do some work here
        return MainTaskRoutine(cancellationToken);
    }, cancellationToken);
    // add the task to the array,
    // use lock as we may remove tasks from this array on a different thread
    lock (tArrayLock)
        tArray.Add(task);
    task.ContinueWith((antecedentTask) =>
    {
        if (antecedentTask.IsCanceled || antecedentTask.IsFaulted)
        {
            // handle cancellation or exception inside the task
            // ...
        }
        // remove task from the array,
        // could be on a different thread from the Process's thread, use lock
        lock (tArrayLock)
            tArray.Remove(antecedentTask);
    }, TaskContinuationOptions.ExecuteSynchronously);
    // add more tasks like the above
    // ...
    // Return aggregated task
    Task[] allTasks = null;
    lock (tArrayLock)
        allTasks = tArray.ToArray();
    return Task.WhenAll(allTasks);
}

您的MainTaskRoutine可以以与Process完全相同的方式构建,并具有相同的方法签名(返回Task)。

然后,您可能希望对 Process 返回的聚合任务执行阻塞等待,或者异步处理其完成,例如:

// handle the completion asynchronously with a blocking wait
void RunProcessSync()
{
    try
    {
        Process(_cancellationTokenSource.Token).Wait();
        MessageBox.Show("Process complete");
    }
    catch (Exception e)
    {
        MessageBox.Show("Process cancelled (or faulted): " + e.Message);
    }
}
// handle the completion asynchronously using ContinueWith
Task RunProcessAync()
{
    return Process(_cancellationTokenSource.Token).ContinueWith((task) =>
    {
        // check task.Status here
        MessageBox.Show("Process complete (or cancelled, or faulted)");
    }, TaskScheduler.FromCurrentSynchronizationContext());
}
// handle the completion asynchronously with async/await
async Task RunProcessAync()
{
    try
    {
        await Process(_cancellationTokenSource.Token);
        MessageBox.Show("Process complete");
    }
    catch (Exception e)
    {
        MessageBox.Show("Process cancelled (or faulted): " + e.Message);
    }
}

经过一些研究,我发现了这个链接。

代码现在如下所示:请参阅下面的代码中CancelTokenSource.CreateLinkedTokenSource的用法

    void Process()
    {
        //for the sake of simplicity I am taking 1, in original implementation it is more than 1
        var cancellationToken = _cancellationTokenSource.Token;
        Task[] tArray = new Task[1];
        tArray[0] = Task.Factory.StartNew(() =>
        {
            cancellationToken.ThrowIfCancellationRequested();
            //do some work here
            MainTaskRoutine(cancellationToken);
        }, cancellationToken);
        try
        {
            Task.WaitAll(tArray);
        }
        catch (Exception ex)
        {
            //do error handling here
        }
    }
    void MainTaskRoutine(CancellationToken cancellationToken)
    {
        //for the sake of simplicity I am taking 1, in original implementation it is more than 1
        //this method shows that a nested task is created 
        using (var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            var cancelToken = cancellationTokenSource.Token;
            Task[] tArray = new Task[1];
            tArray[0] = Task.Factory.StartNew(() =>
            {
                cancelToken.ThrowIfCancellationRequested();
                //do some work here
            }, cancelToken);
            try
            {
                Task.WaitAll(tArray);
            }
            catch (Exception ex)
            {
                //do error handling here
            } 
        }
    }

注意:我没有使用它,但一旦完成,我会通知您:)