处理后流式传输一个IConnectableObservable

本文关键字:一个 IConnectableObservable 传输 处理 | 更新日期: 2023-09-27 18:16:17

我试图写一个方法,它需要一个IConnectableObservable,对它做一些处理,并返回一个新的IConnectableObservable,流处理的数据加上一些额外的项目。被流式传输的序列是有限的,但它有副作用,所以它只需要运行一次。然而,我正试图用它做两件事:

  1. 使用Select查询转换流中的每个元素。
  2. 将流中的每个元素收集到一个数组中,并对数组进行一些处理并将结果流式传输。

下面是我最好的尝试,但我觉得可能有一种更好的方法,我还没有想到。

protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
    IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
    var obsResults = output.Select(o =>
    {
        var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
        return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
        {
            Component = "Variable Gain Module Input",
            Description = "Measurement Accuracy",
            Limits = limits,
            Output = o,
            Passed = _validationService.Validate(o.Result, limits)
        };
    });
    var observable = Observable.Create<ITestResult<ITestOutput<double, double>, ITestLimit<double>>>(obs =>
    {
        var resultTask = obsResults.ForEachAsync(obs.OnNext);
        var fitTask = output.ToArray().ForEachAsync(arr =>
        {
            resultTask.Wait();
            var fit = ComputeErrorFit(arr, testCase);
            obs.OnNext(GetGainErrorResult(fit.Item2, testCase));
        });
        output.Connect();
        Task.WaitAll(resultTask, fitTask);
        obs.OnCompleted();
        return Disposable.Empty;
    });
    return observable.Publish();
}

编辑10/7/2015:

下面是剩下的代码:
private ITestResult<ITestOutput<double, double>, ITestLimit<double>> GetGainErrorResult(double gainError, InputVerificationTestCase testCase)
{
    var gainErrorLimit = GenerateDcAccuracyLimits.CalculateGainErrorLimits(testCase.FullScaleRange, testCase.Limits);
    return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
    {
        Component = "Variable Gain Module Input",
        Description = "Gain Error",
        Passed = _validationService.Validate(gainError, gainErrorLimit),
        Output = new TestOutput<double, double> { Input = 0, Result = gainError },
        Limits = gainErrorLimit
    };
}
private Tuple<double, double> ComputeErrorFit(ITestOutput<double, double>[] outputs, InputChannelTestCase testCase)
{
    var input = outputs.Select(o => o.Input);
    var vErr = outputs.Select(o => o.Result - o.Input);
    return Fit.Line(input.ToArray(), vErr.ToArray());
}

同样在抽象基类中,我有以下内容:

public IConnectableObservable<TOutput> RunSingleChannel(TCase testCase)
{
    dut.Acquisition.SampleRate.Value = SampleRate;
    dut.AnalogChannels[testCase.Channel].InputRange.Value = testCase.FullScaleRange;
    var testOutput = CreateTestProcedure(testCase.Channel).RunAsync(testCase.InputVoltages);
    return ProcessOutput(testOutput.Replay(), testCase);
}
protected abstract IConnectableObservable<TOutput> ProcessOutput(IConnectableObservable<ITestOutput<double, TAcq>> output, TCase testCase);

处理后流式传输一个IConnectableObservable

看来你要用Rx来做一些困难的事情。你真的需要避免把任务和可观察对象混在一起。它们使你的代码难以推理,并经常导致死锁和其他并发问题。

你应该试试这样做:

protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
    IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
    var source = output.RefCount();
    return
        source
            .Select(o =>
            {
                var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
                return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
                {
                    Component = "Variable Gain Module Input",
                    Description = "Measurement Accuracy",
                    Limits = limits,
                    Output = o,
                    Passed = _validationService.Validate(o.Result, limits)
                };
            })
            .Merge(
                source
                    .ToArray()
                    .Select(arr => GetGainErrorResult(ComputeErrorFit(arr, testCase).Item2, testCase)))
            .Publish();
}

您正在使用可连接的可观察对象,这有点奇怪,但上面应该大致完成了您需要的工作。

我使用下面的示例测试了代码:

public IConnectableObservable<int> ProcessOutput(IConnectableObservable<int> output)
{
    var source = output.RefCount();
    return
        source
            .Merge(source.ToArray().Select(arr => arr.Sum()))
            .Publish();
}
void Main()
{
    var output = Observable.Range(1, 10).Publish();
    var processed = ProcessOutput(output);
    processed.Subscribe(x => Console.WriteLine(x));
    processed.Connect();
}
输出:

1
2
3
4
5
6
7
8
9
10
55

我还检查了原始的可观察值只产生一次

相关文章: