正在查询写入模型中是否存在重复的聚合根属性

本文关键字:属性 存在 是否 查询 模型 | 更新日期: 2023-09-27 17:59:29

我正在使用事件源实现CQRS模式,我正在使用NServiceBus、NEventStore和NES(NSB和NEventStore之间的连接)。

我的应用程序将定期检查web服务中是否有任何要下载和处理的文件。当找到一个文件时,一个命令(DownloadFile)被发送到总线,并由FileCommandHandler接收,FileCommandHandler创建一个新的聚合根(file)并处理消息。

现在,在(File aggregate root)中,我必须检查文件的内容是否与任何其他文件内容不匹配(因为web服务保证只有文件名是唯一的,并且内容可能会用不同的名称重复),方法是对其进行哈希并与哈希内容列表进行比较。

问题是我必须将哈希代码列表保存在哪里?是否允许查询读取模型?

public class File : AggregateBase
{
    public File(DownloadFile cmd, IFileService fileDownloadService, IClaimSerializerService serializerService, IBus bus)
            : this()
        {
        // code to download the file content, deserialize it, and publish an event.
        }
}
public class FileCommandHandler : IHandleMessages<DownloadFile>, IHandleMessages<ExtractFile>
{
        public void Handle(DownloadFile command)
        {
             //for example, is it possible to do this (honestly, I feel it is not, since read model should always considered stale !)
            var file = readModelContext.GetFileByHashCode (Hash(command.FileContent));
            if (file != null)
                throw new Exception ("File content matched with another already downloaded file");
            // Since there is no way to query the event source for file content like:
            // eventSourceRepository.Find<File>(c=>c.HashCode == Hash(command.FileContent));
        }
}

正在查询写入模型中是否存在重复的聚合根属性

您似乎正在寻找重复数据消除。

你的指挥端是你希望事情保持一致的地方。查询会让您随时了解比赛情况。因此,我不运行查询,而是颠倒逻辑,将散列实际写入数据库表(任何具有ACID保证的数据库)。如果写入成功,请处理该文件。如果写入哈希失败,请跳过处理。

将这种逻辑放入处理程序是没有意义的,因为在失败的情况下重试消息(即多次存储哈希)不会使其成功。您还会在错误q中收到重复文件的消息。

重复数据消除逻辑的一个好位置可能位于web服务客户端中。一些伪逻辑

  1. 获取文件
  2. 未结交易
  3. 将哈希插入数据库&catch失败(不是任何失败,只是插入失败)
  4. 如果在步骤3中插入的记录数不为零,则向进程文件发送消息
  5. 提交事务

NServiceBus网关中的一些重复数据消除代码示例

编辑:
从他们的代码来看,我实际上认为session.Get<DeduplicationMessage>是不必要的。CCD_ 2应该足够并且是一致性边界。

只有当失败率很高时,执行查询才有意义,这意味着您有很多重复的内容文件。如果99%以上的插入成功,则重复项确实可以被视为异常。

这取决于很多事情。。。吞吐量就是其中之一。但是,由于您无论如何都是以"基于拉的"方式来处理这个问题(您正在查询Web服务以轮询工作(下载和分析文件)),因此您可以使整个过程串行化,而不必担心冲突。现在,这可能不会给出你想要处理"工作"的期望速度,但更重要的是。。。你量好了吗?让我们暂时回避一下,假设这部连续剧不会奏效。我们在谈论多少文件?100、1000、。。。数百万?根据这一点,散列可能适合内存,并且可以在进程停止时重建。也可能有机会沿着时间轴或上下文来划分你的问题。从黎明开始或今天开始的每一份文件,或者可能是本月的文件?真的,我认为你应该更深入地挖掘你的问题空间。除此之外,这感觉像是一个使用事件来源解决的尴尬问题,但YMMV。

当您的域中有一个真正的唯一性约束时,您可以将唯一性测试仪设置为域服务,其实现是基础设施的一部分——类似于存储库,其接口是域的一部分,其实现也是基础设施的一部分。对于实现,您可以使用内存中的散列或根据需要更新/查询的数据库。