如何获取 N 个热可观察<十进制>实例的“最后”项的总和
本文关键字:实例 十进制 最后 观察 获取 何获取 | 更新日期: 2023-09-27 18:31:13
>编辑: 在 09/15/2013 - 我正在描述我的场景,进一步分为几个步骤,以帮助每个人更好地了解我的情况。也添加了整个应用程序下载的源代码。如果要跳转到原始问题,请向下滚动到最后一个标题。请让我知道问题。谢谢
总结
阿拉斯加州首府朱诺有一个AST(阿拉斯加州警察)总部大楼,他们希望在那里显示一个大屏幕,显示和自动更新一个数字。这个数字被称为(犯罪商数指数)或CQI
CQI基本上是一个计算的数字,以显示该州当前的犯罪情况......
它是如何计算的?
运行屏幕的程序是一个 .NET WPF 应用程序,该应用程序通过热 IObservable 流不断接收 CrimeReport 对象。
CQI 是按城市计算的,然后取所有城市的 Sum(),称为状态 CQI以下是计算状态 CQI 的步骤
步骤 1 - 接收犯罪数据
每次报告犯罪时,犯罪报告都会发送到 .NET 应用程序。它具有以下组件
日期犯罪时间
城市 - 辖区城市/县
严重性级别 - 严重/非严重
估计解决时间 - AST 确定解决犯罪所需的估计天数。
因此,在此步骤中,我们订阅 IObservable 并创建 MainViewModel 的实例
IObservable<CrimeReport> reportSource = mainSource.Publish();
MainVM = new MainViewModel(reportSource);
reportSource.Connect();
第 2 步 - 按城市分组并按城市进行数学运算
收到报告后,请按城市对其进行分组,以便
var cities = reportSource.GroupBy(k => k.City)
.Select(g => new CityDto(g.Key, g);
CityDto 是一个 DTO 类,它获取当前城市的所有报告并计算城市的 CQI。
城市 CQI 的计算由以下公式完成
如果严重犯罪总数与非严重犯罪总数的比率小于1
然后
城市的 CQI = 比率 x 估计求解时间的最小值
还
城市的 CQI = 比率 x 最大估计求解时间
这是CityDto的类定义
internal class CityDto
{
public string CityName { get; set; }
public IObservable<decimal> CityCqi {get; set;}
public CityDto(string cityName, IObservable<CrimeReport> cityReports)
{
CityName = cityName;
// Get all serious and non serious crimes
//
var totalSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.Serious)
.Scan(0, (p, _) => p++);
var totalnonSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.NonSerious)
.Scan(0, (p, _) => p++);
// Get the ratio
//
var ratio = Observable.CombineLatest(totalSeriousCrimes, totalnonSeriousCrimes,
(s, n) => n == 0? s : s/n); // Avoding DivideByZero here
// Get the minimum and maximum estimated solve time
//
var minEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime)
.Scan(5000, (p, n) => n < p? n : p);
var maxEstimatedSolveTime = cityReports.Select(c=>c.EstimatedSolveTime)
.Scan(0, (p, n) => n > p? n : p);
//Time for the City's CQI
//
CityCqi = Observable.CombineLatest(ratio, minEstimatedSolveTime, maxEstimatedSolveTime, (r, n, x) => r < 1.0? r * n : r * m);
}
}
现在我们有城市 DTO 对象维护城市的 CQI 值并通过 IObservable 公开该实时 CQI,阿拉斯加州首府希望汇总() 所有城市的 CQI,以将其显示为阿拉斯加的 CQI 并在屏幕上实时显示,并且参与 CQI 计划的城市/县任何地方报告的每起犯罪都应立即对该州的 CQI 产生影响
步骤 3 - 汇总州的城市数据
现在我们必须计算整个州的CQI,它在大屏幕上实时更新,我们有状态的视图模型,称为MainViewModel
internal class MainViewModel
{
public MainViewModel(IObservable<CrimeReport> mainReport)
{
/// Here is the snippet also mentioned in Step 2
//
var cities = mainReport.GroupBy(k => k.City)
.Select(g => new CityDto(g.Key, g));
///// T h i s ///// Is //// Where //// I /// am /// Stuck
//
var allCqis = cities.Select(c => c.CityCqi); // gives you IObservable<IObservable<decimal>> ,
/// Need to use latest of each observable in allCqi and sum them up
//// How do I do it ?
}
}
约束
- 目前并非阿拉斯加的所有城市都参加该州的 CQI 计划,但城市每天都在注册,所以我不能列出并且无论注册人数如何添加所有城市也不切实际。因此,IObservable仅维护那些不仅参与而且至少发送了一个犯罪报告对象的城市。
完整源代码
源可以点击这里下载
最初提出的问题
我有一个多个热可观察量的单个热可观察量...
IObservable<IObservable<decimal>>
我想要一个可观察量,当订阅时,将使其观察者了解内部所有可观察量中所有"最新"十进制数的总和。
我怎样才能做到这一点?我尝试了CombineLatest(...),但无法正确处理。
谢谢
Rxx 库的重载为 CombineLatest()
,需要IObservable<IObservable<T>>
。 如果您使用此重载,则解决方案很简单:
var runningSum = allCqis
.Select(cqi => cqi.StartWith(0)) // start each inner sequence off with 0
.CombineLatest() // produces an IObservable<IList<decimal>>
.Select(cqis => cqis.Sum()); // LINQ operator Sum(IEnumerable<decimal>)
查看Rxx.CombineLatest
源代码可能有助于了解如何"在引擎盖下"解决问题
很多问题!也许是时候复习一下你的 Rx 技能了?您最近几乎所有的问题都包含在我的网站 IntroToRx.com 中。深入了解 Rx,将使您能够比在论坛上提问更快地回答这些相当简单的问题。您应该能够在不到 3 天的时间内阅读这本书。
无论如何。。。。
1 你想要一个运行的总和还是最后只有一个总和?
2 那么,您想要所有流的所有值的总和,还是每个流的总和?
若要获取序列的单个总和值,请使用 .Sum()
运算符。 http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#MaxAndMin
若要获取汇总,请使用 Scan
运算符。 http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#Scan
所以答案可能是这样的(未经测试):
sources.Select(source=>source.Scan(0m, (acc, value)=>acc+=value)).Merge();