同步消息队列
本文关键字:队列 消息 同步 | 更新日期: 2023-09-27 18:16:52
我有一个需要完成一组操作的系统,每个操作都有一个类型和一个实体。每个实体的操作可以独立完成,但在实体内部,类型的顺序很重要。只有在前一个类型的所有操作完成后,才能开始对该类型的操作。操作通常是从数据库中取出一些数据,对其进行处理,然后将结果再次存储在数据库中。
假设大量的操作需要并行使用许多机器,我如何才能达到这个效果(最好是在。net世界中)?
注:现在我使用的是服务总线,每条消息都是操作-实体类型的组合。这根本不能满足我的需求,因为这个顺序没有被强制执行,并且不清楚与消息连接的操作何时实际完成。
编辑:我需要找到一种合乎逻辑的方法,以正确的顺序处理这些操作,保持所有(可能的)并行。
的例子:我需要为2个实体(E1, E2)做3种操作(T1-T3)。可以有许多具有相同类型和实体的操作。
-- Entity 1
[T1, E1][T1, E1][T1, E1][T1, E1] - 4 operations Type 1, Entity 1
[T2, E1][T2, E1] - 2 ops, Type 2, Entity 1
[T3, E1] - 1 operation, Type 3, Entity 1
-- Entity 2
[T2, E2][T2, E2]
[T3, E2]
通常我可以并行处理实体1和实体2。我也可以并行处理每个特定的[Tx, Ex](例如所有[T2, E1])。我需要保持类型的顺序,T1->T2->T3。T2操作需要等待,直到所有T1操作在实体内完成。
也许您应该为每种类型的操作(消息)使用单独的队列。仅当前一个队列为空时,才按顺序处理这些队列。如。有3个队列A, B和c。消费者检查队列中是否有消息。如果是——消费者得到它并处理。如果没有,则检查B队列。等等......但是来自a队列的最后一条消息仍然可能有问题——当来自B队列的消息开始时,它可能仍然在处理。
类似的解决方案-使用消息优先级。但是最后一条消息的问题仍然存在。
在实体中保留一个状态如何,该状态诱导允许执行哪些类型的操作。
您将有一些worker,它将操作分发到不同的线程上,并且当没有更多特定类型的操作要完成时,它将更改状态。
我认为可以检查启动的线程是否完成,但你甚至可能想要为操作添加状态(例如Queued, InProgress, Done)。
你可以只查询一个实体是否还有某种类型的操作没有"完成",当它们都完成后,更新实体的状态,允许下一种状态为Queued的操作被取消队列并处理。
注意
只有当不变量保持不可能从一个状态转到前一个状态时才有效,这意味着只有当实体在完成T2操作后不可能处理T1操作时才有效。在实践中,这意味着不应该允许实体在开始处理之后获得新的操作分配。