.NET任务并行库

本文关键字:并行 任务 NET | 更新日期: 2023-09-27 18:30:01

我已经阅读了关于TPL的文档和许多教程,但没有一个涵盖我想要实现的模型。

某些算法的迭代次数总是固定的。

我需要不断运行的线程(尽可能多):

而(真实)

  • 从MAIN线程获取数据
  • 执行繁重耗时的任务(在单独的线程中)
  • 更新MAIN线程信息

另外,我需要一种能够设置闹钟的机制(例如5秒)。五秒钟后,所有工作必须暂停一段时间,然后再继续。

我应该使用Task.ContinueWith相同的任务吗?但我并没有处理上一次任务启动的结果,而是更新MAIN线程中的数据结构,然后决定新任务迭代的输入。。。

我如何才能让TPL决定应该创建多少任务以获得最佳效率?

不,我使用的是BackgroundWorkers,因为他们有很好的RunEventCompleted事件-在里面我在我的主线程上,所以我可以更新我的main结构,检查时间限制,然后最终在完成的BackgroundWorker上再次调用StartAsync。它很好,很清楚,但可能非常低效。我需要让它在多处理器、多核服务器上高效运行。

一个问题是计算总是在线的,永远不会停止。还有一些网络,可以远程询问MAIN结构的当前状态。

第二个问题是关键时间控制(我必须有精确的定时器——当它停止时,没有线程可以重新启动)。然后是特殊的高优先级任务,在它结束后,所有的工作都恢复了。

第三个问题是操作没有上限

从我观察到的情况来看,这三个约束并不符合TPL——我不能使用类似Parallel的东西。因为集合是由任务本身的结果实时修改的。。。我也不知道如何组合:

  • 让TPL决定应该创建多少线程的能力
  • 具有某种线程的生存期运行(在连续重新启动之间具有暂停和同步点)
  • 一开始只创建一次线程(它们应该只使用不断更新的参数重新启动)

有人能给我线索吗?我知道如何用不好的、无效的方式去做。我描述了一些小的要求,这些要求使我无法正确地完成这项工作。我有点困惑。

.NET任务并行库

你需要使用消息传递+参与者+调度程序imo。然后你需要使用一种能够实现它的语言。看看这个代码,它异步地从Azure服务总线接收,在共享队列中排队,并通过参与者管理运行时状态。

内联:

我应该使用Task.ContinueWith相同的任务吗?

不,ContinueWith将根据每个continuation传递内部的异常处理来终止您的程序;在TPL中没有好的方法将失败的状态封送到调用端/主线程中。

但我并没有处理上一次任务启动的结果,但相反,我在中更新数据结构MAIN线程,然后决定新任务的输入迭代。。。

除非你愿意花很多时间在这个问题上,否则你需要超越线程。

我如何才能让TPL决定应该为多少任务创建最佳效率?

这是由运行异步工作流的框架来处理的。

不,我用的是BackgroundWorkers,因为他们有很好的RunEventCompleted事件-在它里面我在我的主线程上,所以我可以更新我的MAIN结构,检查时间限制,然后最终在已完成的BackgroundWorker上再次调用StartAsync。是的很好,很清楚,但可能非常低效。我需要去在多处理器、多核服务器上高效。

一个问题是计算总是在线的,永远不会停止。那里还有一些网络,可以远程询问电流MAIN结构的状态。第二个问题是关键时间控制(I必须有精确的计时器-当它停止时,没有线程可以重新启动)。

如果你异步运行所有内容,你可以将消息传递给挂起它的actor。你的调度actor负责用他们的调度消息调用所有订阅者;查看链接代码中的paused状态。如果您有未完成的请求,您可以向它们传递一个取消令牌,并以这种方式处理"硬"取消/套接字中止。

然后是特殊的高优先级任务结束后,所有恢复工作。根据我的观察,这两个约束条件好好地进行TPL——我不能用Parallel这样的东西。因为该集合由任务本身的结果实时修改。。。

您可能需要一种称为管道和过滤器的模式。你把你的投入输送到一系列工人(演员)中;每个工人消耗另一个工人的产出。信号发送是使用控制信道完成的(在我的情况下,这是演员的收件箱)。

我认为你应该阅读

MSDN:如何实现生产者/消费者数据流模式

我也遇到了同样的问题:一个生产商生产商品,而几个消费者消费了它们,并决定把它们寄给其他消费者。每个使用者都是异步工作的,并且独立于其他使用者。

你的主要任务是制作人。他会生成您的其他任务应该处理的项目。包含主任务代码的类有一个函数:

public async Task ProduceOutputAsync(...)

您的主程序启动此任务时使用:

var producerTask = Task.Run( () => MyProducer.ProduceOutputAsync(...)

一旦这被调用,生产者任务就开始产生输出。同时,你的主程序可以继续做其他事情,比如启动消费者。

但让我们首先关注Producer任务。

生产者任务生成要由其他任务处理的T类型的项目。它们使用实现ITargetBlock的对象转移到其他任务。

每次生产者任务完成创建T类型的对象时,它都会使用ITargetBlock.Post,或者最好是异步版本:将其发送到目标块

while (continueProducing())
{
    T product = await CreateProduct(...)
    bool accepted = await this.TargetBlock(product)
    // process the return value
}
// if here, nothing to produce anymore. Notify the consumers:
this.TargetBlock.Complete();

生产者需要ITargetBlock <T>。在我的应用程序中,一个BufferBlock <T>就足够了。请查看MSDN以了解其他可能的目标。

无论如何,数据流块也应该实现ISourceBlock <T>。接收器等待输入到达源,获取并处理它。完成后,它可以将结果发送到自己的目标块,并等待下一个输入,直到不再需要输入为止。当然,如果你的消费者没有产出,就不必向目标发送任何东西。

等待输入的步骤如下:

ISourceBlock`<T`> mySource = ...;
while (await mySource.ReceiveAsync())
{   // a object of type T is available at the source
    T objectToProcess = await mySource.ReceiveAsync();
    // keep in mind that someone else might have fetched your object
    // so only process it if you've got it.
    if (objectToProcess != null)
    {
        await ProcessAsync(objectToProcess);
        // if your processing produces output send the output to your target:
        var myOutput = await ProduceOutput(objectToprocess);
        await myTarget.SendAsync(myOutput);
    }
}
// if here, no input expected anymore, notify my consumers:
myTarget.Complete();
  • 构建你的生产者
  • 构造所有消费者
  • 给生产者一个BufferBlock以将其输出发送到
  • 启动生产者MyProducer。ProduceOutputAsync(…)
  • 当生产者产生输出并将其发送到缓冲块时:
  • 为使用者提供相同的BufferBlock
  • 将使用者作为单独的任务启动
  • wait Task。WhenAll(…)等待所有任务完成

每个消费者一旦听到不再需要输入,就会立即停止。所有任务完成后,您的主功能可以读取结果并返回