.NET任务并行库
本文关键字:并行 任务 NET | 更新日期: 2023-09-27 18:30:01
我已经阅读了关于TPL的文档和许多教程,但没有一个涵盖我想要实现的模型。
某些算法的迭代次数总是固定的。
我需要不断运行的线程(尽可能多):
而(真实)
- 从MAIN线程获取数据
- 执行繁重耗时的任务(在单独的线程中)
- 更新MAIN线程信息
另外,我需要一种能够设置闹钟的机制(例如5秒)。五秒钟后,所有工作必须暂停一段时间,然后再继续。
我应该使用Task.ContinueWith相同的任务吗?但我并没有处理上一次任务启动的结果,而是更新MAIN线程中的数据结构,然后决定新任务迭代的输入。。。
我如何才能让TPL决定应该创建多少任务以获得最佳效率?
不,我使用的是BackgroundWorkers,因为他们有很好的RunEventCompleted事件-在里面我在我的主线程上,所以我可以更新我的main结构,检查时间限制,然后最终在完成的BackgroundWorker上再次调用StartAsync。它很好,很清楚,但可能非常低效。我需要让它在多处理器、多核服务器上高效运行。
一个问题是计算总是在线的,永远不会停止。还有一些网络,可以远程询问MAIN结构的当前状态。
第二个问题是关键时间控制(我必须有精确的定时器——当它停止时,没有线程可以重新启动)。然后是特殊的高优先级任务,在它结束后,所有的工作都恢复了。
第三个问题是操作没有上限
从我观察到的情况来看,这三个约束并不符合TPL——我不能使用类似Parallel的东西。因为集合是由任务本身的结果实时修改的。。。我也不知道如何组合:
- 让TPL决定应该创建多少线程的能力
- 具有某种线程的生存期运行(在连续重新启动之间具有暂停和同步点)
- 一开始只创建一次线程(它们应该只使用不断更新的参数重新启动)
有人能给我线索吗?我知道如何用不好的、无效的方式去做。我描述了一些小的要求,这些要求使我无法正确地完成这项工作。我有点困惑。
你需要使用消息传递+参与者+调度程序imo。然后你需要使用一种能够实现它的语言。看看这个代码,它异步地从Azure服务总线接收,在共享队列中排队,并通过参与者管理运行时状态。
内联:
我应该使用Task.ContinueWith相同的任务吗?
不,ContinueWith将根据每个continuation传递内部的异常处理来终止您的程序;在TPL中没有好的方法将失败的状态封送到调用端/主线程中。
但我并没有处理上一次任务启动的结果,但相反,我在中更新数据结构MAIN线程,然后决定新任务的输入迭代。。。
除非你愿意花很多时间在这个问题上,否则你需要超越线程。
我如何才能让TPL决定应该为多少任务创建最佳效率?
这是由运行异步工作流的框架来处理的。
不,我用的是BackgroundWorkers,因为他们有很好的RunEventCompleted事件-在它里面我在我的主线程上,所以我可以更新我的MAIN结构,检查时间限制,然后最终在已完成的BackgroundWorker上再次调用StartAsync。是的很好,很清楚,但可能非常低效。我需要去在多处理器、多核服务器上高效。
一个问题是计算总是在线的,永远不会停止。那里还有一些网络,可以远程询问电流MAIN结构的状态。第二个问题是关键时间控制(I必须有精确的计时器-当它停止时,没有线程可以重新启动)。
如果你异步运行所有内容,你可以将消息传递给挂起它的actor。你的调度actor负责用他们的调度消息调用所有订阅者;查看链接代码中的paused
状态。如果您有未完成的请求,您可以向它们传递一个取消令牌,并以这种方式处理"硬"取消/套接字中止。
然后是特殊的高优先级任务结束后,所有恢复工作。根据我的观察,这两个约束条件好好地进行TPL——我不能用Parallel这样的东西。因为该集合由任务本身的结果实时修改。。。
您可能需要一种称为管道和过滤器的模式。你把你的投入输送到一系列工人(演员)中;每个工人消耗另一个工人的产出。信号发送是使用控制信道完成的(在我的情况下,这是演员的收件箱)。
我认为你应该阅读
MSDN:如何实现生产者/消费者数据流模式
我也遇到了同样的问题:一个生产商生产商品,而几个消费者消费了它们,并决定把它们寄给其他消费者。每个使用者都是异步工作的,并且独立于其他使用者。
你的主要任务是制作人。他会生成您的其他任务应该处理的项目。包含主任务代码的类有一个函数:
public async Task ProduceOutputAsync(...)
您的主程序启动此任务时使用:
var producerTask = Task.Run( () => MyProducer.ProduceOutputAsync(...)
一旦这被调用,生产者任务就开始产生输出。同时,你的主程序可以继续做其他事情,比如启动消费者。
但让我们首先关注Producer任务。
生产者任务生成要由其他任务处理的T类型的项目。它们使用实现ITargetBlock的对象转移到其他任务。
每次生产者任务完成创建T类型的对象时,它都会使用ITargetBlock.Post,或者最好是异步版本:将其发送到目标块
while (continueProducing())
{
T product = await CreateProduct(...)
bool accepted = await this.TargetBlock(product)
// process the return value
}
// if here, nothing to produce anymore. Notify the consumers:
this.TargetBlock.Complete();
生产者需要ITargetBlock <T
>。在我的应用程序中,一个BufferBlock <T
>就足够了。请查看MSDN以了解其他可能的目标。
无论如何,数据流块也应该实现ISourceBlock <T
>。接收器等待输入到达源,获取并处理它。完成后,它可以将结果发送到自己的目标块,并等待下一个输入,直到不再需要输入为止。当然,如果你的消费者没有产出,就不必向目标发送任何东西。
等待输入的步骤如下:
ISourceBlock`<T`> mySource = ...;
while (await mySource.ReceiveAsync())
{ // a object of type T is available at the source
T objectToProcess = await mySource.ReceiveAsync();
// keep in mind that someone else might have fetched your object
// so only process it if you've got it.
if (objectToProcess != null)
{
await ProcessAsync(objectToProcess);
// if your processing produces output send the output to your target:
var myOutput = await ProduceOutput(objectToprocess);
await myTarget.SendAsync(myOutput);
}
}
// if here, no input expected anymore, notify my consumers:
myTarget.Complete();
- 构建你的生产者
- 构造所有消费者
- 给生产者一个BufferBlock以将其输出发送到
- 启动生产者MyProducer。ProduceOutputAsync(…)
- 当生产者产生输出并将其发送到缓冲块时:
- 为使用者提供相同的BufferBlock
- 将使用者作为单独的任务启动
- wait Task。WhenAll(…)等待所有任务完成
每个消费者一旦听到不再需要输入,就会立即停止。所有任务完成后,您的主功能可以读取结果并返回