如何获取 N 个热可观察<十进制>实例的“最后”项的总和

本文关键字:实例 十进制 最后 观察 获取 何获取 | 更新日期: 2023-09-27 18:31:13

>编辑: 在 09/15/2013 - 我正在描述我的场景,进一步分为几个步骤,以帮助每个人更好地了解我的情况。也添加了整个应用程序下载的源代码。如果要跳转到原始问题,请向下滚动到最后一个标题。请让我知道问题。谢谢

总结

阿拉斯加州首府朱诺有一个AST(阿拉斯加州警察)总部大楼,他们希望在那里显示一个大屏幕,显示和自动更新一个数字。这个数字被称为(犯罪商数指数)或CQI

CQI基本上是一个计算的数字,以显示该州当前的犯罪情况......

它是如何计算的?

运行屏幕的程序是一个 .NET WPF 应用程序,该应用程序通过热 IObservable 流不断接收 CrimeReport 对象。

CQI 是按城市计算的,然后取所有城市的 Sum(),称为状态 CQI以下是计算状态 CQI 的步骤

步骤 1 - 接收犯罪数据

每次报告犯罪

时,犯罪报告都会发送到 .NET 应用程序。它具有以下组件

日期犯罪时间

城市 - 辖区城市/县

严重性级别 - 严重/非严重

估计解决时间 - AST 确定解决犯罪所需的估计天数。

因此,在此步骤中,我们订阅 IObservable 并创建 MainViewModel 的实例

IObservable<CrimeReport> reportSource = mainSource.Publish();
  MainVM = new MainViewModel(reportSource);
reportSource.Connect();

第 2 步 - 按城市分组并按城市进行数学运算

收到报告后,请按城市对其进行分组,以便

var cities = reportSource.GroupBy(k => k.City)
                  .Select(g => new CityDto(g.Key, g);

CityDto 是一个 DTO 类,它获取当前城市的所有报告并计算城市的 CQI。

城市 CQI 的计算由以下公式完成

如果严重犯罪总数与非严重犯罪总数的比率小于1

然后

城市的 CQI = 比率 x 估计求解时间的最小值

城市的 CQI = 比率 x 最大估计求解时间

这是CityDto的类定义

internal class CityDto 
{
 public string CityName { get; set; }
 public IObservable<decimal> CityCqi {get; set;}
 public CityDto(string cityName, IObservable<CrimeReport> cityReports)
 {
   CityName = cityName;
   // Get all serious and non serious crimes
   //
     var totalSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.Serious)
     .Scan(0, (p, _) => p++);
     var totalnonSeriousCrimes = cityReports.Where(c => c.Severity == CrimeSeverity.NonSerious)
     .Scan(0, (p, _) => p++);
     // Get the ratio
     //
     var ratio = Observable.CombineLatest(totalSeriousCrimes, totalnonSeriousCrimes, 
                   (s, n) => n ==  0? s : s/n); // Avoding DivideByZero here
     // Get the minimum and maximum estimated solve time
     //
       var minEstimatedSolveTime = cityReports.Select(c => c.EstimatedSolveTime) 
                     .Scan(5000, (p, n) => n < p? n : p);
       var maxEstimatedSolveTime = cityReports.Select(c=>c.EstimatedSolveTime)
                     .Scan(0, (p, n) => n > p? n : p);
    //Time for the City's CQI
    // 
      CityCqi = Observable.CombineLatest(ratio, minEstimatedSolveTime, maxEstimatedSolveTime, (r, n, x) => r < 1.0? r * n : r * m);
 }
}

现在我们有城市 DTO 对象维护城市的 CQI 值并通过 IObservable 公开该实时 CQI,阿拉斯加州首府希望汇总() 所有城市的 CQI,以将其显示为阿拉斯加的 CQI 并在屏幕上实时显示,并且参与 CQI 计划的城市/县任何地方报告的每起犯罪都应立即对该州的 CQI 产生影响

步骤 3 - 汇总州的城市数据

现在我们必须计算整个州的CQI,它在大屏幕上实时更新,我们有状态的视图模型,称为MainViewModel

internal class MainViewModel
{
    public MainViewModel(IObservable<CrimeReport> mainReport)
    {
         /// Here is the snippet also mentioned in Step 2
         //
           var cities = mainReport.GroupBy(k => k.City)
                  .Select(g => new CityDto(g.Key, g));
         ///// T h i s ///// Is //// Where //// I /// am /// Stuck
         //
          var allCqis = cities.Select(c => c.CityCqi); // gives you IObservable<IObservable<decimal>> ,
         /// Need to use latest of each observable in allCqi and sum them up
           //// How do I do it ?
     }
}

约束

  • 目前并非阿拉斯加的所有城市都参加该州的 CQI 计划,但城市每天都在注册,所以我不能列出并且无论注册人数如何添加所有城市也不切实际。因此,IObservable仅维护那些不仅参与而且至少发送了一个犯罪报告对象的城市。

完整源代码

源可以点击这里下载

最初提出的问题

我有一个多个热可观察量的单个热可观察量...

IObservable<IObservable<decimal>>

我想要一个可观察量,当订阅时,将使其观察者了解内部所有可观察量中所有"最新"十进制数的总和。

我怎样才能做到这一点?我尝试了CombineLatest(...),但无法正确处理。

谢谢

如何获取 N 个热可观察<十进制>实例的“最后”项的总和

Rxx 库的重载为 CombineLatest(),需要IObservable<IObservable<T>>。 如果您使用此重载,则解决方案很简单:

var runningSum = allCqis
    .Select(cqi => cqi.StartWith(0)) // start each inner sequence off with 0
    .CombineLatest() // produces an IObservable<IList<decimal>>
    .Select(cqis => cqis.Sum()); // LINQ operator Sum(IEnumerable<decimal>)

查看Rxx.CombineLatest源代码可能有助于了解如何"在引擎盖下"解决问题

很多问题!也许是时候复习一下你的 Rx 技能了?您最近几乎所有的问题都包含在我的网站 IntroToRx.com 中。深入了解 Rx,将使您能够比在论坛上提问更快地回答这些相当简单的问题。您应该能够在不到 3 天的时间内阅读这本书。

无论如何。。。。

1 你想要一个运行的总和还是最后只有一个总和?

2 那么,您想要所有流的所有值的总和,还是每个流的总和?

若要获取序列的单个总和值,请使用 .Sum() 运算符。 http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#MaxAndMin

若要获取汇总,请使用 Scan 运算符。 http://introtorx.com/Content/v1.0.10621.0/07_Aggregation.html#Scan

所以答案可能是这样的(未经测试):

sources.Select(source=>source.Scan(0m, (acc, value)=>acc+=value)).Merge();