对流进行自定义筛选

本文关键字:筛选 自定义 对流 | 更新日期: 2023-09-27 17:59:09

我需要在流上配置一个过滤器列表。

流最初没有任何过滤器,但在一段时间内,它有一个活动过滤器列表。

每个过滤器都有一个严格的有效期。

测试用例如下:

var scheduler = new TestScheduler();
var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'),
    ReactiveTest.OnNext(0200.Ms(), 'A'),
    ReactiveTest.OnNext(0300.Ms(), '2'),
    ReactiveTest.OnNext(0400.Ms(), 'B'),
    ReactiveTest.OnNext(0500.Ms(), 'A'),
    ReactiveTest.OnNext(0600.Ms(), 'B'),
    ReactiveTest.OnNext(0700.Ms(), '5'),
    ReactiveTest.OnNext(0800.Ms(), 'A'),
    ReactiveTest.OnNext(0900.Ms(), 'C') );
// filters
// A between 70ms -> 550ms
// B between 330ms -> 400ms 
// modeled as a string observable where:
// first word is the char to filter
// second word are the msecs duration of the filter
var filters = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(0070.Ms(), "A 480"),
    ReactiveTest.OnNext(0330.Ms(), "B 70")
);
var expected = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'),
    ReactiveTest.OnNext(0300.Ms(), '2'),
    ReactiveTest.OnNext(0600.Ms(), 'B'),
    ReactiveTest.OnNext(0700.Ms(), '5'),
    ReactiveTest.OnNext(0800.Ms(), 'A'),
    ReactiveTest.OnNext(0900.Ms(), 'C') );

你能建议我哪种是最好的处方解决方案吗?

p.s.:我使用以下扩展方法

public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

对流进行自定义筛选

首先,您的输入应该如下所示:

var input = scheduler.CreateColdObservable<char>(
    ReactiveTest.OnNext(0100.Ms(), '1'),
    ReactiveTest.OnNext(0200.Ms(), 'A'),
    ReactiveTest.OnNext(0300.Ms(), '2'),
    ReactiveTest.OnNext(0400.Ms(), 'B'), //You forgot this line
    ReactiveTest.OnNext(0500.Ms(), 'A'),
    ReactiveTest.OnNext(0600.Ms(), 'B'),
    ReactiveTest.OnNext(0700.Ms(), '5'),
    ReactiveTest.OnNext(0800.Ms(), 'A'),
    ReactiveTest.OnNext(0900.Ms(), 'C'));

此外,正如@LeeCampbell提到的,Ms扩展方法应该返回类型long,而不是TimeSpan

这是我想出的解决方案:

 var activeFilters = filters
    .Select(s => s.Split(' '))
    .Select(s => Tuple.Create(s[0][0], TimeSpan.FromMilliseconds(int.Parse(s[1]))))
    .Select(t => Observable.Timer(t.Item2, scheduler).Select(_ => t.Item1).StartWith(t.Item1))
    .MergeCombineLatest(true)
    .StartWith(new List<char>());
var output = activeFilters.Publish(_activeFilters => 
        input.Join(_activeFilters,
            _ => Observable.Return(1),
            t => _activeFilters,
            (c, filterList) => Tuple.Create(c, filterList)
        )
    )
    .Where(t => !t.Item2.Contains(t.Item1))
    .Select(t => t.Item1);
var observer = scheduler.CreateObserver<char>();
output.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(
    expected.Messages,
    observer.Messages);

activeFilters是一个可观察的,它发出当前处于筛选之下的字符列表。当主动过滤的列表发生变化时,activeFilters会发出一个新的列表。请注意,由于同一个字符可以有多个筛选器,因此列表不一定是唯一的。

一旦你能在任何给定的时间弄清楚哪些过滤器是活动的,你就可以将该列表加入到输入中。

该代码需要此扩展方法,该方法使用System.Collections.Immutable Nuget包:

public static IObservable<IList<T>> MergeCombineLatest<T>(this IObservable<IObservable<T>> outer, bool removeCompleted)
{
    return outer
        .SelectMany((inner, i) => inner
            .Materialize()
            .SelectMany(nt => nt.Kind == NotificationKind.OnNext
                ? Observable.Return(Tuple.Create(i, nt.Value, true))
                : nt.Kind == NotificationKind.OnCompleted
                    ? removeCompleted
                        ? Observable.Return(Tuple.Create(i, default(T), false))
                        : Observable.Empty<Tuple<int, T, bool>>()
                    : Observable.Throw<Tuple<int, T, bool>>(nt.Exception)
            )
        )
        .Scan(ImmutableDictionary<int, T>.Empty, (dict, t) => t.Item3 ? dict.SetItem(t.Item1, t.Item2) : dict.Remove(t.Item1))
        .Select(dict => dict.Values.ToList());
}

MergeCombineLatest获取可观测的可观测值,并从每个子可观测值中发出最新值的列表。如果removeCompleted为真,那么当一个子可观察对象完成时,列表将缩小一。如果removeCompleted为false,则最后一个值将永远保留在列表和连续列表中。

如果有更友好的方式,我将不胜感激。