RavenDB 流提供无限结果 - 连接弹性

本文关键字:连接 结果 无限 RavenDB | 更新日期: 2023-09-27 18:35:42

我们使用 RavenDB 中的 Stream 功能在 2 个数据库之间加载、转换和迁移数据,如下所示:

var query = originSession.Query<T>(IndexForQuery);
using (var stream = originSession.Advanced.Stream(query))
{
    while (stream.MoveNext())
    {
        var streamedDocument = stream.Current.Document;
        OpenSessionAndMigrateSingleDocument(streamedDocument);
    }
}

问题是其中一个集合有数百万行,并且我们不断收到以下格式的IOException

Application: MigrateToNewSchema.exe
Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.IO.IOException
Stack:
   at System.Net.ConnectStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32)
   at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef)
   at System.IO.StreamReader.Read(Char[], Int32, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read()
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext()
   at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext()
   at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection()
   at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore)
   at MigrateToNewSchema.Program.Main(System.String[])

这在流媒体中发生了很长一段路,当然在这种时间段内会出现暂时性连接问题(需要几个小时才能完成)。

但是,当我们重试时,由于我们正在使用Query我们必须从头开始。 因此,最终如果在整个Stream期间出现连接故障,那么我们必须再试一次,直到它端到端地工作。

我知道您可以将 ETag 与 stream 一起使用以在某个点有效地重新启动,但是使用我们需要过滤正在迁移的结果并指定正确集合的Query执行此操作不会过载。

那么,在RavenDB中,有没有办法提高连接的内部弹性(连接字符串属性,内部设置等)或在出现错误时有效地"恢复"流?

RavenDB 流提供无限结果 - 连接弹性

根据@StriplingWarrior的建议,我使用数据订阅重新创建了解决方案。

使用这种方法,我能够遍历所有 200 万行(尽管不可否认,每个项目的处理要少得多); 当我们尝试使用 Streams 实现相同的逻辑时,这里有 2 点会有所帮助:

  1. 批处理仅在确认后才会从订阅"队列"中删除(与大多数标准队列一样)
    1. 订阅的IObserver<T>必须成功完成才能设置此确认。
    2. 此信息由服务器而不是客户端处理,因此允许客户端重新启动,而不会影响订阅中处理的最后一个成功位置
    3. 有关更多详细信息,请参阅此处
  2. 如@StriplingWarrior所示,由于您可以使用筛选器创建订阅,直到属性级别,因此在订阅本身发生异常时,可以使用较小的结果集重播。
    1. 第一点确实取代了这一点;但它为我们提供了Stream API中看不到的额外灵活性。

测试环境是一个 RavenDB 3.0 数据库(本地计算机,作为 Windows 服务运行),对 200 万条记录的集合具有默认设置。

生成虚拟记录的代码:

using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();
    using (var bulkInsert = store.BulkInsert())
    {
        for (var i = 0; i != recordsToCreate; i++)
        {
            var person = new Person
            {
                Id = Guid.NewGuid(),
                Firstname = NameGenerator.GenerateFirstName(),
                Lastname = NameGenerator.GenerateLastName()
            };
            bulkInsert.Store(person);
        }
    }
}

订阅此集合的情况如下:

using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();
    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>());
    var personSubscription = store.Subscriptions.Open<Person>(
        subscriptionId, new SubscriptionConnectionOptions()
    {
        BatchOptions = new SubscriptionBatchOptions()
        {
            // Max number of docs that can be sent in a single batch
            MaxDocCount = 16 * 1024,  
            // Max total batch size in bytes
            MaxSize = 4 * 1024 * 1024,
            // Max time the subscription needs to confirm that the batch
            // has been successfully processed
            AcknowledgmentTimeout = TimeSpan.FromMinutes(3)
        },
        IgnoreSubscribersErrors = false,
        ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
    });
    personSubscription.Subscribe(new PersonObserver());
    while (true)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(500));
    }
}

请注意PersonObserver;这只是IObserver的基本实现,如下所示:

public class PersonObserver : IObserver<Person>
{
    public void OnCompleted()
    {
        Console.WriteLine("Completed");
    }
    public void OnError(Exception error)
    {
        Console.WriteLine("Error occurred: " + error.ToString());
    }
    public void OnNext(Person person)
    {
        Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'");
    }
}