处理后流式传输一个IConnectableObservable
本文关键字:一个 IConnectableObservable 传输 处理 | 更新日期: 2023-09-27 18:16:17
我试图写一个方法,它需要一个IConnectableObservable,对它做一些处理,并返回一个新的IConnectableObservable,流处理的数据加上一些额外的项目。被流式传输的序列是有限的,但它有副作用,所以它只需要运行一次。然而,我正试图用它做两件事:
- 使用Select查询转换流中的每个元素。
- 将流中的每个元素收集到一个数组中,并对数组进行一些处理并将结果流式传输。
下面是我最好的尝试,但我觉得可能有一种更好的方法,我还没有想到。
protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
var obsResults = output.Select(o =>
{
var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
{
Component = "Variable Gain Module Input",
Description = "Measurement Accuracy",
Limits = limits,
Output = o,
Passed = _validationService.Validate(o.Result, limits)
};
});
var observable = Observable.Create<ITestResult<ITestOutput<double, double>, ITestLimit<double>>>(obs =>
{
var resultTask = obsResults.ForEachAsync(obs.OnNext);
var fitTask = output.ToArray().ForEachAsync(arr =>
{
resultTask.Wait();
var fit = ComputeErrorFit(arr, testCase);
obs.OnNext(GetGainErrorResult(fit.Item2, testCase));
});
output.Connect();
Task.WaitAll(resultTask, fitTask);
obs.OnCompleted();
return Disposable.Empty;
});
return observable.Publish();
}
编辑10/7/2015:
下面是剩下的代码:private ITestResult<ITestOutput<double, double>, ITestLimit<double>> GetGainErrorResult(double gainError, InputVerificationTestCase testCase)
{
var gainErrorLimit = GenerateDcAccuracyLimits.CalculateGainErrorLimits(testCase.FullScaleRange, testCase.Limits);
return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
{
Component = "Variable Gain Module Input",
Description = "Gain Error",
Passed = _validationService.Validate(gainError, gainErrorLimit),
Output = new TestOutput<double, double> { Input = 0, Result = gainError },
Limits = gainErrorLimit
};
}
private Tuple<double, double> ComputeErrorFit(ITestOutput<double, double>[] outputs, InputChannelTestCase testCase)
{
var input = outputs.Select(o => o.Input);
var vErr = outputs.Select(o => o.Result - o.Input);
return Fit.Line(input.ToArray(), vErr.ToArray());
}
同样在抽象基类中,我有以下内容:
public IConnectableObservable<TOutput> RunSingleChannel(TCase testCase)
{
dut.Acquisition.SampleRate.Value = SampleRate;
dut.AnalogChannels[testCase.Channel].InputRange.Value = testCase.FullScaleRange;
var testOutput = CreateTestProcedure(testCase.Channel).RunAsync(testCase.InputVoltages);
return ProcessOutput(testOutput.Replay(), testCase);
}
protected abstract IConnectableObservable<TOutput> ProcessOutput(IConnectableObservable<ITestOutput<double, TAcq>> output, TCase testCase);
看来你要用Rx来做一些困难的事情。你真的需要避免把任务和可观察对象混在一起。它们使你的代码难以推理,并经常导致死锁和其他并发问题。
你应该试试这样做:
protected override IConnectableObservable<ITestResult<ITestOutput<double, double>, ITestLimit<double>>> ProcessOutput(
IConnectableObservable<ITestOutput<double, double>> output, InputVerificationTestCase testCase)
{
var source = output.RefCount();
return
source
.Select(o =>
{
var limits = GenerateDcAccuracyLimits.CalculateAbsoluteLimits(o.Input, testCase.FullScaleRange, testCase.Limits);
return new TestResult<ITestOutput<double, double>, ITestLimit<double>>
{
Component = "Variable Gain Module Input",
Description = "Measurement Accuracy",
Limits = limits,
Output = o,
Passed = _validationService.Validate(o.Result, limits)
};
})
.Merge(
source
.ToArray()
.Select(arr => GetGainErrorResult(ComputeErrorFit(arr, testCase).Item2, testCase)))
.Publish();
}
您正在使用可连接的可观察对象,这有点奇怪,但上面应该大致完成了您需要的工作。
我使用下面的示例测试了代码:
public IConnectableObservable<int> ProcessOutput(IConnectableObservable<int> output)
{
var source = output.RefCount();
return
source
.Merge(source.ToArray().Select(arr => arr.Sum()))
.Publish();
}
void Main()
{
var output = Observable.Range(1, 10).Publish();
var processed = ProcessOutput(output);
processed.Subscribe(x => Console.WriteLine(x));
processed.Connect();
}
输出:1
2
3
4
5
6
7
8
9
10
55
我还检查了原始的可观察值只产生一次