在 IEnumerable 上创建一个可变宽度滚动窗口

本文关键字:窗口 滚动 一个 IEnumerable 创建 | 更新日期: 2023-09-27 18:33:42

>我在 IEnumerable 中有时间序列数据,采样不均匀(即我可能在前 10 秒内有 5 个样本,然后在接下来的 10 秒内有 10 个样本等(

我想在 30 秒滚动窗口中创建一个滚动平均值、最大值和最小值

我相信跳过每次都从头开始枚举。

是否可以从 skip 中获取结果并在不调用它的情况下再次使用它?

在 c# 中是否可以复制迭代器,因为我希望有一个开始窗口和结束窗口迭代器,然后在它们之间枚举,这意味着我不必每次都从头开始迭代,

我的代码目前有效,看起来很行

class Data
{
    float Value;
    DateTime Time;
};

BufferData = new IEnumerable<...>
int index = 0;
TimeSpan windowWidth = new TimeSpan(0,0,30);
DateTime currentStart;
while (index < BufferData.Count)
{
    currentStart = BufferData.Skip(index).First().Time;
    var window = BufferData.Skip(index).TakeWhile(x => x.Time<= currentStart + windowWidth);
    DateTime centre = currentStart + new TimeSpan((window.Last().Time- currentStart).Ticks / 2);
    float min = window.Min(x => x.Value);
    float max = window.Max(x => x.Value);
    ++index;
}

在 IEnumerable 上创建一个可变宽度滚动窗口

如果你

很乐意使用Microsoft的反应式框架团队的"交互式扩展"(NuGet "Ix-Main"(,那么这是一个相当简单的解决方案:

var windows =
    BufferData
        .Scan(new List<Data>(), (accumulator, item) =>
            accumulator
                .Where(x => x.Time.AddSeconds(30.0) >= item.Time)
                .Concat(new[] { item })
                .ToList())
        .Select(xs => new
        {
            Centre = xs.First().Time.AddSeconds(
                xs.Last().Time.Subtract(xs.First().Time).TotalSeconds / 2.0),
            Max = xs.Max(x => x.Value),
            Mix = xs.Min(x => x.Value),
        });

扫描运算符与标准.Aggregate运算符一样使用 和累加器,但为每个输入生成一个值。

这应该与当前代码的结果相同。

它也只迭代原始源一次(尽管在 30 秒的窗口中有多次迭代(。

编辑 - 正如@PeterDuniho指出的那样,这不是真正的"滚动平均值"。它不会随着添加的每个新项目而重新计算。它只是每 30 秒(或任何WindowWidth设置为(提供统计信息快照。我将暂时保留这个答案,以防它可能有用,但这并不是真正要求的。


我相信以下解决方案对于大型数据集应该可以很快执行(它应该是 O(n((。作为概念证明,我在 100 万个项目列表上运行了此测试,它在 LINQPad 4 中在 0.782 秒内完成(在绝对不是最先进的笔记本电脑上(。

public IEnumerable<Stats> CalculateStats(
    List<Data> bufferData,
    DateTime startTime,
    TimeSpan windowWidth)
{
    var finishTime = bufferData.Last().Time;
    return bufferData
        .Select(x => new
        { 
            x.Value,
            WindowIndex = GetWindowIndex(x.Time, startTime, windowWidth)
        })
        .GroupBy(
            x => x.WindowIndex,
            (i, items) => new Stats
            { 
                StartTime = GetWindowTime(startTime, windowWidth, i),
                FinishTime = GetWindowTime(startTime, windowWidth, i + 1),
                Mean = (float)items.Average(x => x.Value),
                Max = (float)items.Max(x => x.Value),
                Min = (float)items.Min(x => x.Value)
            });
}
private int GetWindowIndex(DateTime time, DateTime startTime, TimeSpan windowWidth)
{
    var timeSinceStart = time - startTime;
    var secondsSinceStart = timeSinceStart.TotalSeconds;
    return (int)Math.Ceiling(secondsSinceStart / windowWidth.TotalSeconds);
}
private DateTime GetWindowTime(DateTime startTime, TimeSpan windowWidth, int windowIndex)
{
    return startTime + TimeSpan.FromSeconds(windowWidth.TotalSeconds * windowIndex);
}
public class Stats
{
    public DateTime StartTime { get; set; }
    public DateTime FinishTime { get; set; }
    public float Mean { get; set; }
    public float Max { get; set; }
    public float Min { get; set; }
}
public class Data
{
    public float Value { get; set; }
    public DateTime Time { get; set; }
}

使用队列,您可以最大限度地减少内存分配的数量:

static IEnumerable<TimeSample> TimeRollingWindow (IEnumerable<Data> data, TimeSpan interval)
{

    Queue<Data> buffer = new Queue<Data>();
    foreach(var item in data) 
    {
        buffer.Enqueue(item);
        // Remove the old data
        while (buffer.Count > 0 && (item.Time - buffer.Peek().Time > interval))
        {
            buffer.Dequeue();
        }
        float max = float.MinValue;
        float min = float.MaxValue;
        double sum = 0;
        foreach(var h in buffer)
        {
            sum += h.Value;
            max = Math.Max(max, h.Value);
            min = Math.Min(min, h.Value);
        }
        // spit it out
        yield return new TimeSample(buffer.Peek().Time, item.Time, min, max, (float)(sum / buffer.Count));
    }
}

class TimeSample 
{
    public TimeSample(DateTime startTime, DateTime endTime,  float min, float max, float mean)
    {
        StartTime = startTime;
        EndTime = endTime;
        Min = min;
        Max = max;
        Mean = mean;
    }
    public readonly DateTime StartTime;
    public readonly DateTime EndTime;
    public readonly float Min;
    public readonly float Max;
    public readonly float Mean;
}
class Data
{
    public Data(DateTime time, float value)
    {
        Time = time;
        Value = value;
    }
    public readonly DateTime Time;
    public readonly float Value;
}

不确定我是否完全理解所需的输出,但这是我的尝试。

// Some mock data...
var data = new List<Sample>
{
    new Sample { Time = new DateTime(2016, 1, 1, 0,  1, 00), Value = 10 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  2, 00), Value = 11 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  2, 20), Value = 17 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  2, 30), Value = 13 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  3, 00), Value = 18 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  3, 10), Value = 12 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  4, 00), Value = 19 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  4, 25), Value = 12 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  4, 55), Value = 11 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  5, 00), Value = 12 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  6, 00), Value = 14 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  8, 03), Value = 13 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  8, 44), Value = 17 },
    new Sample { Time = new DateTime(2016, 1, 1, 0,  9, 01), Value = 18 },
    new Sample { Time = new DateTime(2016, 1, 1, 0, 10, 32), Value = 19 },
    new Sample { Time = new DateTime(2016, 1, 1, 0, 10, 54), Value = 15 },
    new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 00), Value = 10 },
    new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 05), Value = 16 },
    new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 10), Value = 14 },
    new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 13), Value = 16 },
    new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 32), Value = 15 },
};
// The code...
var range = new TimeSpan(0, 0, 0, 30);
var results = data
    .Select(sample => new
    {
        Time = sample.Time,
        Set = data.Where(relatedSample => relatedSample.Time >= (sample.Time - range) && relatedSample.Time <= (sample.Time + range))
                    .Select(relatedSample => relatedSample.Value)
    })
    .Select(stat => new
    {
        Time = stat.Time,
        Avg = stat.Set.Average(),
        Min = stat.Set.Min(),
        Max = stat.Set.Max(),
        Count = stat.Set.Count()
    });

这将返回一个枚举对象,其中包含每个样本,以及前后 30 秒的最小值、最大值、平均值和样本计数。这可能不是最有效的方法,但它非常简单。它将样本的"窗口"检索到临时列表中,然后对其执行统计信息。因此,至少它不会针对每个样本对整个列表运行超过一次。不过,它绝对可以加起来,每个窗口中都有很多示例。

看看 NuGet 上的 MoreLINQ 库。它将允许您通过编写累加器函数来计算运行总计和平均值。

任何"滚动窗口"聚合类型的诀窍都是简单地编写一个累加器函数,该函数将序列中的值保留在队列缓冲区中,只要它们保持在所需的窗口范围内。当序列元素不再满足范围条件时,将从缓冲区中取消排队,并且其值将从任何聚合或总计中删除(取消累积(。

在我进入任何代码之前,我需要发布一个免责声明,说明下面的所有内容都直接输入到回复窗口中,这意味着它甚至可能无法编译。总体概念是合理的,但这是我所能保证的。

这样,使用您的Data类和BufferData种子以及 MoreLINQ 中的 .Scan() 函数:

//First we need a type to hold the results:
class Result
{
   double min;
   double max;
   DateTime first; //needed for centre
   DateTime centre;
  //Important because this is what really defines the window range:
  //   this sample and everything 30s prior (or as determined by the InWindow predicate)
   DateTime last;  
   //for fun, because once we have the others, these are easy and fast to do at the same time
   double sum;
   int count;
   double avg;
}
// we also want to define our window range
// For this example, the head of the queue is still part of the range if it's within 30 seconds of the current sample
Func<Data, Data, bool> InWindow = (head, cur) => (head.Time.AddSeconds(30) >= cur.Time);
// and a place to accumulate our buffer (hurray for closures!)
var accBuffer = new Queue<Data>();
// now get the data
IEnumerable<Data> BufferData = ...;
// let's get to it!
var results = BufferData.Scan(new Result() {min = double.MaxValue, max = double.MinValue}, 
(acc, data) => {
    //Use flags to avoid iterating the queue if possible
    bool minmaxValid = true;
    while (accBuffer.Count > 0 && !InWindow(accBuffer.Peek(), data)
    {
       var old = accBuffer.Dequeue();
       acc.sum -= old.Value;
       acc.count--;
       //once an old min or max falls out of the window, we'll have to re-check the entire window :(
       if (old.Value == acc.min) minmaxValid = false;
       if (old.Value == acc.max) minmaxValid = false;
    }
    accBuffer.Enqueue(data);
    acc.count++;
    acc.sum += data.Value;
    acc.first = accBuffer.Peek().Time;
    acc.last = data.Time;
    acc.centre = acc.First.AddTicks( (new TimeSpan(data.Time- acc.first)).Ticks / 2);
    if (minmaxValid && data.Value < acc.min) acc.min = data.Value;
    if (minmaxValid && data.Value > acc.max) acc.max = data.Value;      
    // have to check the whole queue :(
    if (!minValid || !maxValid)
    {
        acc.min = double.MaxValue;
        acc.max = double.MinValue;
        //could use accBuffer.Max() and accBuffer.Min, but this avoids iterating the queue twice
        foreach (var d in accBuffer)
        {
           if (d.Value < acc.min) acc.min = d.Value;
           if (d.Value > acc.max) acc.max = d.Value;
        }
    }
    acc.avg = acc.sum / acc.count; 
});

该解决方案的特别之处在于它非常高效。它不完全是O(n( - 但它非常接近!

达到 O(n( 目标的其余障碍是,当(并且仅当(最大值或最小值落出窗口时,需要迭代窗口队列。我认为不可能完全消除这种情况,但我觉得这里仍有改进的余地,如果你能找到一种方法来避免这种情况。根据您需要保留的元素数量和每个元素的相对大小,您实际上可以通过使用某种排序算法做得更好......但我对此表示怀疑。与总和、计数和平均值不同,在这种情况下,最小值和最大值很难有效。


最后,起初我不知道,但是由于@Enigmativity的回答,我现在看到我正在使用的Scan((运算符现在已合并到MS维护的库中。它几乎是一个直接的替代品,可以通过NuGet使用它而不是MoreLINQ。我在这里发布的代码根本没有真正改变,只需将正确的using指令放入文件中即可。

嗯......当我检查这个时,无论如何,MoreLINQ 是最近更新的,所以也许没关系。

我找不到完全线性执行此操作的方法,但至少无论什么重申都只在窗口内,而不是在整个数据集上。此方法提供数据样本在 30 秒内的所有滚动(重叠(窗口。

您可以将其作为扩展方法,也可以仅作为常规方法。为了简单起见,我使用了扩展方法。

static IEnumerable<IEnumerable<T>> Windows<T>(this IEnumerable<T> self, Func<T, DateTime> selector, TimeSpan span) {
    var enumerator = self.GetEnumerator();
    var samples = new List<T>();
    var start = DateTime.MinValue;
    while (enumerator.MoveNext()) {
        var end = selector(enumerator.Current);
        if (end > start + span) {
            start = end - span;
        }
        samples = samples.SkipWhile(i => selector(i) < start).ToList();
        samples.Add(enumerator.Current);
        yield return samples;
    }
}

使用示例:在 30 秒内滚动平均值1

var rollingAverages = BufferData
    .Windows(d => d.Time, new TimeSpan(0, 0, 30))
    .Select(win => win.Average());

这通过将当前元素视为窗口的结束来工作,因此窗口开始时小于给定的范围,但随着时间的推移增长到该最大值。

  • 初始化空示例容器
  • 对于数据集的每个项:
    • 将窗口结束设置为当前项目的时间。
    • 如果可能,请扩大窗口的时间范围。
    • 从样品容器中删除过期的物品
    • 将当前项添加到示例
    • 产量样品容器

1:我的英语统计术语有点生疏,也许这是一个滚动平均值?