ReactiveExtensions:阻止一个可观察对象在它剥离的任务完成之前返回

本文关键字:剥离 任务 返回 对象 观察 一个 ReactiveExtensions | 更新日期: 2023-09-27 18:01:59

我是Rx . net的新手,但我有一个我认为值得使用的业务场景。然而,我仍然无法理解最初的设计。

  • 我有一个很大的项目集,比如600k。
  • 我有一种方法从数据库中批量提取这些(让我们说1000次)
  • 我会在这些项目上并行运行一个进程,一次最多x个数量(假设一次50个)
  • 当我们完成时,我需要知道,因为我需要吐出一些额外的统计数据,并确保长时间运行的进程返回。

这似乎是响应式扩展的理想选择——我有:

  • 随着时间的推移提供列表的东西
  • 当这些条目进入时,我想对它们做的一系列操作
  • 需要处理错误
  • 需要处理完成。

从哪里开始

这看起来就像我有一个项目列表作为一个可观察对象,我的"循环"进程从数据库中获取这些项目,将它们"推"到这个可观察对象中,然后对该可观察对象的订阅将接管。

我被卡住的地方

  • 我对语法有点不确定
  • 我有点不确定如何处理x度的并行限制
  • 我不确定我怎么知道我什么时候点击完成。我的循环是从数据库调用"OnComplete()"而不是"OnNext"在那一点?

我希望有人能帮我从概念上打破我正在寻找的东西,这样我就能更好地理解它。谢谢!

代码v3——好多了,但是方法仍然退出得太快。

我开始感觉好多了,但我知道还不够。e

public override async Task ProcessAsync(DataLoadRequest dataLoadRequest, Func<string, Task> createTrackingPayload)
{
    _requestParameters = Deserialize<SchoolETLRequestParameters>(dataLoadRequest.DataExtractorParams);
    WireUpDependencies();
    //This is the new retriever which allows records to be "paged" (e.g. returns empty list for pageNum > 0 on the ones that don't have paging.)
    _recordsToProcessRetriever = new SettingBasedRecordsRetriever(_propertyRepository, _requestParameters.RunType, _requestParameters.ResidentialProfileIDOverrides, _processorSettings.MaxBatchesToProcess, _etlLogger);
    var query = Observable.Range(0, int.MaxValue)
        .Select(pageNum => _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize))
        .TakeWhile(resProfList => resProfList.Any())
        .SelectMany(records => records)
        .Select(resProf => Observable.Start(() => Task.Run(()=> _schoolDataProcessor.ProcessSchoolsAsync(resProf)).Result))
        .Merge(maxConcurrent: _processorSettings.ParallelProperties);
    var subscription = query.Subscribe(async trackingRequests =>
        {
            await CreateRequests(trackingRequests, createTrackingPayload);
            var numberOfAttachments = SumOfRequestType(trackingRequests, TrackingRecordRequestType.AttachSchool);
            var numberOfDetachments = SumOfRequestType(trackingRequests, TrackingRecordRequestType.DetachSchool);
            var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests, TrackingRecordRequestType.UpdateAssignmentType);
            _etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
                numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
        },
        () =>
        {
            _etlLogger.Info("Finished! Woohoo!");
        });
}

v3的问题

    ProcessAsync方法仍然在所有项目在后台处理之前完成。通常我会很好,但在我们的情况下,我使用的框架需要等到所有的跟踪请求已经创建(例如,直到CreateTrackingRequests已被调用的每一批结果)。

是否可以在此范围内等待所有操作完成?

更新:关于问题的附加信息

在这种情况下,我们不知道什么将产生可观察对象,直到运行时。应用程序通过命令传递,该命令相当于:

  • "New Records":点击返回特定进程结果的方法
  • "特定记录":用于检测;
  • "All Records":点击进入连续分页循环的方法,循环x页中的600k条记录(由设置定义)。

前两种情况听起来像是我可以很容易地将它们直接传递给一个可观察对象而没有问题。然而,在这种情况下,最后一个似乎必须循环遍历一堆可观察对象集,这不是我想要的行为(我希望所有600,000个项目最终在一个大队列中结束,一次处理50个)。

我的希望是我可以有一个方法"把东西扔到队列上",并让处理任务连续地从50个批次中提取。

注意:所有调用进程的方法返回完全相同的东西——IThing列表(必要时混淆)。

我已经将所有这些存储库函数等连接到我的处理器AS依赖项中,因此调用ProcessStuffForMyThing(List<IThing>)可以处理整个过程,并且可以使用相同的对象并行工作(不需要每次都新建)。

ReactiveExtensions:阻止一个可观察对象在它剥离的任务完成之前返回

您的代码有许多需要修复的问题。你犯的错误我已经见过很多次了——每个人似乎都在走同样的路。这实际上可以归结为将你的思维从过程性转变为功能性。

首先,Rx有许多操作符,旨在使您的工作更轻松。其中之一是Observable.Using。它的工作是启动一个可丢弃的资源,构建一个可观察对象,并在可观察对象完成时处置该资源。非常适合从数据库中读取记录。

你的代码似乎有一个已经打开的数据库连接,你正在通过一个主题泵出记录。您应该避免使用外部状态(数据处理器),并且应该避免使用主题。几乎总有一个可观察操作符可以使用。

另一件你可能不应该做的事情是混合你的单子——或者更具体地说,是可观察对象和任务。在Rx中有一些操作符用于将任务转换为可观察对象,但它们是用于与现有代码进行接口的,不应该在可观察对象中用作工具。规则是尝试进入一个可观察对象,并呆在那里,直到你准备好订阅你的数据。

我觉得你的代码有点碎片化,无法准确理解在哪里调用了什么,所以我写了一段通用的代码,我认为它涵盖了你需要的东西。下面是查询:

var pageSize = 4;
Func<Record, Result> process = r =>
{
    Thread.Sleep(100); // Only here to demonstrate parallelism
    return new Result(r.ID);
};
var query =
    Observable
        .Using(
            () => new DataProcessor(),
            dc =>
                Observable
                    .Range(0, int.MaxValue)
                    .Select(n => dc.GetRecords(n, pageSize))
                    .TakeWhile(rs => rs.Any())
                    .SelectMany(rs => rs)
                    .Select(r => Observable.Start(() => process(r)))
                    .Merge(maxConcurrent: 4));
var subscription =
    query
        .Subscribe(
            r => Console.WriteLine(r.ID),
            () => Console.WriteLine("Done."));

我显然对你的代码采取了一些捷径,但本质上是一样的(我希望)。

如果添加以下类,此代码可运行:

public class DataProcessor : IDisposable
{
    public DataProcessor() { Console.WriteLine("Opened."); }
    public void Dispose() { Console.WriteLine("Closed."); }
    public IEnumerable<Record> GetRecords(int page, int count)
    {
        Console.WriteLine("Reading.");
        Thread.Sleep(100);
        var records = page <= 5
            ? Enumerable
                .Range(0, count < 5 ? count : count / 2)
                .Select(x => new Record())
                .ToArray()
            : new Record[] { };
        Console.WriteLine("Read.");
        return records;
    }
}
public class Record
{
    private static int __counter = 0;
    public Record() { this.ID = __counter++; }
    public int ID { get; private set; }
}
public class Result
{
    public Result(int id) { this.ID = id; }
    public int ID { get; private set; } 
}

当我运行它时,我得到这样的结果:

Opened.
Reading.
Read.
Reading.
0
2
3
1
Read.
Reading.
7
Read.
5
6
4
Reading.
10
11
9
8
Read.
Reading.
15
12
Read.
14
Reading.
13
17
19
18
16
Read.
Reading.
21
Read.
20
22
23
Done.
Closed.

您可以看到它正在并行处理。你可以看到可观察对象正在完成。你还可以看到数据库正在打开,然后在可观察对象完成后关闭。

让我知道这是否有帮助。

首先,我不建议滚动您自己的枚举转换。如果您有IEnumerable<T>,您可以使用.ToObservable()扩展,它将为您处理枚举。

其次,您应该在Subscribe方法中处理Observable的结果,现在您的方法将在枚举之后立即返回,因为您似乎没有在async方法中实际等待任何东西。如果你必须使用当前方法签名,那么你可以利用可观察对象也是可等待的。

所以这是我建议的代码结构(warning untesting):
public override async Task ProcessAsync(Request theRequest,
                             Func<string,Task> createTrackingPayload) // not my design^TM
    {
        // ...do stuff with the request, wire up some dependencies, etc.
        //End goal is to call createTrackingPayload with some things.
            await items.ToObservable()
            .Select(thing => Observable.FromAsync(async () =>
            {
                var requests = await _dataProcessor.DoSomethingAsync(thing);
                if (requests != null && requests.Any())
                {
                    var numberOfType1 = SumOfRequestType(requests, TrackingRecordRequestType.Type1);
                    var numberOfType2 = SumOfRequestType(requests, TrackingRecordRequestType.DetachSchool);
                    var numberOfType3 = SumOfRequestType(requests, TrackingRecordRequestType.UpdateAssignmentType);

                    await CreateRequests(requests, createTrackingPayload); // something that will iterate over the list and call the function we need to call.
                    return requests.Count();
                }
                return 0;
            }
            }))
        .Merge(maxConcurrent: _processorSettings.DegreeofParallelism)
        .Do(x => _logger.Info("processed {0} items.", x))
        .Aggregate(0, (acc, x) => acc + x);
    }

基本上,这里的想法是等待Observable的完成,它实际上会给你Observable完成之前的最后一个值。通过添加DoAggregate,您可以将日志记录逻辑移出处理逻辑。

我在这里把功劳归功于Enigmativity,因为他们的答案引导我找到了(大部分)正确的地方。

我需要的代码如下,除了序列多次求值的一个小问题。

var query = Observable.Range(0, int.MaxValue)
    .Select(pageNum =>
        {
            _etlLogger.Info("Calling GetResProfIDsToProcess with pageNum of {0}", pageNum);
            return _recordsToProcessRetriever.GetResProfIDsToProcess(pageNum, _processorSettings.BatchSize);
        })
    .TakeWhile(resProfList => resProfList.Any())
    .SelectMany(records => records.Where(x=> _determiner.ShouldProcess(x)))
    .Select(resProf => Observable.Start(async () => await _schoolDataProcessor.ProcessSchoolsAsync(resProf)))
    .Merge(maxConcurrent: _processorSettings.ParallelProperties)
    .Do(async trackingRequests =>
    {
        await CreateRequests(trackingRequests.Result, createTrackingPayload);
        var numberOfAttachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.AttachSchool);
        var numberOfDetachments = SumOfRequestType(trackingRequests.Result, TrackingRecordRequestType.DetachSchool);
        var numberOfAssignmentTypeUpdates = SumOfRequestType(trackingRequests.Result,
            TrackingRecordRequestType.UpdateAssignmentType);
        _etlLogger.Info("Extractor generated {0} attachments, {1} detachments, and {2} assignment type changes.",
            numberOfAttachments, numberOfDetachments, numberOfAssignmentTypeUpdates);
    });
var subscription = query.Subscribe(
trackingRequests =>
{
    //Nothing really needs to happen here. Technically we're just doing something when it's done.
}, 
() =>
{
    _etlLogger.Info("Finished! Woohoo!");
});
await query.Wait();