在Rx和LINQ之间共享查询(或者一般来说,流和列表)

本文关键字:一般来说 列表 或者 查询 LINQ Rx 之间 共享 | 更新日期: 2023-09-27 18:17:25

如何在LINQ和Rx之间共享查询逻辑?例如,如果我有时需要查询IObservable流,有时需要查询IEnumerable流,但每个流中都有完全相同的逻辑,是否有任何方法可以共享该逻辑?

也许一个例子会有帮助。在下面的Queries类中,我想将人员和购买的序列组合起来以产生"通知"字符串。注意,我必须复制完全相同的逻辑;唯一不同的是一个是IEnumerable,一个是IObservable。这两个功能有没有办法合并?我试过使用ToObservableToEnumerable的各种组合,但我所尝试的一切似乎要么挂起要么产生没有结果。

(高等类型的问题:这就是创造高等类型的想法要解决的问题吗?也就是说,这在Haskell或Scala中不会是一个问题吗?)

static class Queries {
    static IObservable<string> GetPurchaseNotices(IObservable<Person> people, IObservable<Purchase> purchases) {
        return from person in people
               from purchase in purchases
               where person.Id == purchase.PurchaserId
               select person.Name + " purchased a " + purchase.ItemName;
    }
    static IEnumerable<string> GetPurchaseNotices(IEnumerable<Person> people, IEnumerable<Purchase> purchases) {
        return from person in people
               from purchase in purchases
               where person.Id == purchase.PurchaserId
               select person.Name + " purchased a " + purchase.ItemName;
    }
}
class Person {
    public Person(int id, string name) {
        Id = id;
        Name = name;
    }
    public string Name;
    public int Id;
}
class Purchase {
    public Purchase(int purchaserId, string itemName) {
        PurchaserId = purchaserId;
        ItemName = itemName;
    }
    public int PurchaserId;
    public string ItemName;
}

在Rx和LINQ之间共享查询(或者一般来说,流和列表)

据我所知,这在c#(或f#)中是不可能的。问题是,虽然您可以在IEnumerable<T>IObservable<T>中抽象泛型参数T,但您不能抽象"类型"IEnumerableIObservable

对于更高级的类型,如Haskell或Scala中的类型,您可以为IEnumerbleIObservable提供合适的接口。在Haskell中,它看起来像:

getPurchases :: MonadPlus m => m Person -> m Purchaser -> m String
getPurchases people purchases = do
  person <- people
  purchase <- purchases
  if (personId person) == (purchaserId purchase)
    then return $ (name person) ++ "purchased a " ++ (itemName purchase)
    else mzero

可以同时用于IEnumerableIObservable

我不知道IObservable是否真的满足MonadPlus的要求,所以这只是一个例子。

从可观察对象切换到可枚举对象并不是一个好主意,因为可枚举对象会阻塞,所以反过来问题会少一些。您应该能够使用单个函数来过滤IObservable<Person>

下面是演示这个想法的简单程序。注意,它使用一个方法来过滤IObservable<int>。在IEnumerable<int>的情况下,它在调用方法之前切换ToObservable,然后在获得结果后切换回ToEnumerable
static void Main(string[] args)
{
    var observableNums = Observable.Interval(TimeSpan.FromSeconds(1))
                                   .Select(x => (int)x);
    var observableOdds = FilterOdds(observableNums);
    observableOdds.Subscribe(Console.WriteLine);
    var enumerableNums = new[] { 1, 2, 3, 4, 5, 6 };
    var enumerableOdds = FilterOdds(enumerableNums.ToObservable());
    foreach (var i in enumerableOdds.ToEnumerable())
        Console.WriteLine(i);
    Console.ReadKey();
}
static IObservable<int> FilterOdds(IObservable<int> nums)
{
    return nums.Where(i => i % 2 == 1);
}