如何使用任务并行库执行多个任务,并在第一个任务之后返回以实际返回数据

本文关键字:任务 返回 之后 第一个 数据 并行 何使用 执行 | 更新日期: 2023-09-27 18:28:19

我正在开发一个应用程序,在该应用程序中,尽快将数据返回到串行进程很重要,但可以有多个来源来获取数据。同样,有时一个源比另一个快,但你不知道会是哪个源。我使用ContinueWhenAny(…).Wait()来等待第一个Task结束,以便继续并从调用方法返回。但是,我需要首先检查数据的有效性,然后才返回(或者如果所有任务都已完成,并且没有有效的数据)。现在,如果任务首先完成,我的代码将返回甚至无效的数据。

是否有一种方法可以执行类似"ContinueWhenAny"的操作,但只能在Task.Result满足特定条件时执行,否则请等待下一个任务等。。直到最后一项任务完成?

同样,我需要确保在一个结果有效后,其他线程取消。这部分已经运行良好。

目前,我的代码看起来是这样的(去掉了异常处理,只有螺母和螺栓):

        ResultObject result = null;
        var tokenSource = new CancellationTokenSource();
        var tasks = listOfSources
                .Select(i => Task.Factory.StartNew(
                    () =>
                        {
                            i.CancellationToken = tokenSource.Token;
                            //Database Call
                            return i.getData(inputparameters);
                        }, tokenSource.Token));
        Task.Factory.ContinueWhenAny(
                tasks.ToArray(),
                firstCompleted =>
                    {
                        //This is the "result" I need to validate before setting and canceling the other threads
                        result = firstCompleted.Result;
                        tokenSource.Cancel();
                    }).Wait();
        return result;

有什么想法吗?我不想使用ContinueWhenAll,因为如果第一个调用需要2秒,第二个调用需要10秒,如果第一个呼叫返回有效数据,我希望在2秒内返回串行进程,否则等待10秒,希望结果具有有效数据,并且只有在所有任务都已完成并返回无效结果的情况下才返回无效数据。

---------更新----谢谢zmbq的好主意。下面是更新的(工作的)代码,它满足了我的所有要求。然而,需要注意的是,此代码与前一代码之间的区别在于,如果没有任务产生有效结果,则此代码将返回null结果,而不是前一代码本身返回无效结果。改变这个版本也不难,但出于我的目的,我完全满足于在这种情况下返回null。

        ResultObject result = null;
        var tokenSource = new CancellationTokenSource();
        var tasks = listOfSources
                .Select(i => Task.Factory.StartNew(
                    () =>
                        {
                            i.CancellationToken = tokenSource.Token;
                            //Database Call
                            return i.getData(inputparameters);
                        }, tokenSource.Token)).ToArray();
        result = GetFirstValidResult(tokenSource,tasks);
        return result;

   private ResultObject GetFirstValidResult(CancellationTokenSource tokenSource, Task<ResultObject>[] tasks)
    {
        ResultObject result = null;
        Task.Factory.ContinueWhenAny(
            tasks,
            firstCompleted =>
                {
                    var testResult = firstCompleted.Result;
                    if(testResult != null && testResult.IsValid())
                    {
                        result = testResult;
                        tokenSource.Cancel();
                    }
                    else
                    {
                        var remainingTasks = tasks.Except(new[]{firstCompleted}).ToArray();
                        if(remainingTasks.Any())
                        {
                            result = GetFirstValidResult(tokenSource, remainingTasks);
                        } 
                    }
                }).Wait();
        return result;
    }

如何使用任务并行库执行多个任务,并在第一个任务之后返回以实际返回数据

如果您的firstCompleted回调将检查结果,并在剩余任务上调用ContinueWhenAny以防结果非法,那么您就完了。

和往常一样,我建议您查看ZeroMQ。如果结果合法,则激发任务并让每个任务向输出队列写入一条消息。当出现有效消息时,或者当所有任务都已完成时,主线程将阻塞队列并返回。