在 IEnumerable 上创建一个可变宽度滚动窗口
本文关键字:窗口 滚动 一个 IEnumerable 创建 | 更新日期: 2023-09-27 18:33:42
>我在 IEnumerable 中有时间序列数据,采样不均匀(即我可能在前 10 秒内有 5 个样本,然后在接下来的 10 秒内有 10 个样本等(
我想在 30 秒滚动窗口中创建一个滚动平均值、最大值和最小值
我相信跳过每次都从头开始枚举。
是否可以从 skip 中获取结果并在不调用它的情况下再次使用它?
在 c# 中是否可以复制迭代器,因为我希望有一个开始窗口和结束窗口迭代器,然后在它们之间枚举,这意味着我不必每次都从头开始迭代,
我的代码目前有效,看起来很行
class Data
{
float Value;
DateTime Time;
};
BufferData = new IEnumerable<...>
int index = 0;
TimeSpan windowWidth = new TimeSpan(0,0,30);
DateTime currentStart;
while (index < BufferData.Count)
{
currentStart = BufferData.Skip(index).First().Time;
var window = BufferData.Skip(index).TakeWhile(x => x.Time<= currentStart + windowWidth);
DateTime centre = currentStart + new TimeSpan((window.Last().Time- currentStart).Ticks / 2);
float min = window.Min(x => x.Value);
float max = window.Max(x => x.Value);
++index;
}
很乐意使用Microsoft的反应式框架团队的"交互式扩展"(NuGet "Ix-Main"(,那么这是一个相当简单的解决方案:
var windows =
BufferData
.Scan(new List<Data>(), (accumulator, item) =>
accumulator
.Where(x => x.Time.AddSeconds(30.0) >= item.Time)
.Concat(new[] { item })
.ToList())
.Select(xs => new
{
Centre = xs.First().Time.AddSeconds(
xs.Last().Time.Subtract(xs.First().Time).TotalSeconds / 2.0),
Max = xs.Max(x => x.Value),
Mix = xs.Min(x => x.Value),
});
扫描运算符与标准.Aggregate
运算符一样使用 和累加器,但为每个输入生成一个值。
这应该与当前代码的结果相同。
它也只迭代原始源一次(尽管在 30 秒的窗口中有多次迭代(。
编辑 - 正如@PeterDuniho指出的那样,这不是真正的"滚动平均值"。它不会随着添加的每个新项目而重新计算。它只是每 30 秒(或任何WindowWidth
设置为(提供统计信息快照。我将暂时保留这个答案,以防它可能有用,但这并不是真正要求的。
我相信以下解决方案对于大型数据集应该可以很快执行(它应该是 O(n((。作为概念证明,我在 100 万个项目列表上运行了此测试,它在 LINQPad 4 中在 0.782 秒内完成(在绝对不是最先进的笔记本电脑上(。
public IEnumerable<Stats> CalculateStats(
List<Data> bufferData,
DateTime startTime,
TimeSpan windowWidth)
{
var finishTime = bufferData.Last().Time;
return bufferData
.Select(x => new
{
x.Value,
WindowIndex = GetWindowIndex(x.Time, startTime, windowWidth)
})
.GroupBy(
x => x.WindowIndex,
(i, items) => new Stats
{
StartTime = GetWindowTime(startTime, windowWidth, i),
FinishTime = GetWindowTime(startTime, windowWidth, i + 1),
Mean = (float)items.Average(x => x.Value),
Max = (float)items.Max(x => x.Value),
Min = (float)items.Min(x => x.Value)
});
}
private int GetWindowIndex(DateTime time, DateTime startTime, TimeSpan windowWidth)
{
var timeSinceStart = time - startTime;
var secondsSinceStart = timeSinceStart.TotalSeconds;
return (int)Math.Ceiling(secondsSinceStart / windowWidth.TotalSeconds);
}
private DateTime GetWindowTime(DateTime startTime, TimeSpan windowWidth, int windowIndex)
{
return startTime + TimeSpan.FromSeconds(windowWidth.TotalSeconds * windowIndex);
}
public class Stats
{
public DateTime StartTime { get; set; }
public DateTime FinishTime { get; set; }
public float Mean { get; set; }
public float Max { get; set; }
public float Min { get; set; }
}
public class Data
{
public float Value { get; set; }
public DateTime Time { get; set; }
}
使用队列,您可以最大限度地减少内存分配的数量:
static IEnumerable<TimeSample> TimeRollingWindow (IEnumerable<Data> data, TimeSpan interval)
{
Queue<Data> buffer = new Queue<Data>();
foreach(var item in data)
{
buffer.Enqueue(item);
// Remove the old data
while (buffer.Count > 0 && (item.Time - buffer.Peek().Time > interval))
{
buffer.Dequeue();
}
float max = float.MinValue;
float min = float.MaxValue;
double sum = 0;
foreach(var h in buffer)
{
sum += h.Value;
max = Math.Max(max, h.Value);
min = Math.Min(min, h.Value);
}
// spit it out
yield return new TimeSample(buffer.Peek().Time, item.Time, min, max, (float)(sum / buffer.Count));
}
}
class TimeSample
{
public TimeSample(DateTime startTime, DateTime endTime, float min, float max, float mean)
{
StartTime = startTime;
EndTime = endTime;
Min = min;
Max = max;
Mean = mean;
}
public readonly DateTime StartTime;
public readonly DateTime EndTime;
public readonly float Min;
public readonly float Max;
public readonly float Mean;
}
class Data
{
public Data(DateTime time, float value)
{
Time = time;
Value = value;
}
public readonly DateTime Time;
public readonly float Value;
}
不确定我是否完全理解所需的输出,但这是我的尝试。
// Some mock data...
var data = new List<Sample>
{
new Sample { Time = new DateTime(2016, 1, 1, 0, 1, 00), Value = 10 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 2, 00), Value = 11 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 2, 20), Value = 17 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 2, 30), Value = 13 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 3, 00), Value = 18 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 3, 10), Value = 12 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 4, 00), Value = 19 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 4, 25), Value = 12 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 4, 55), Value = 11 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 5, 00), Value = 12 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 6, 00), Value = 14 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 8, 03), Value = 13 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 8, 44), Value = 17 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 9, 01), Value = 18 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 10, 32), Value = 19 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 10, 54), Value = 15 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 00), Value = 10 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 05), Value = 16 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 10), Value = 14 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 13), Value = 16 },
new Sample { Time = new DateTime(2016, 1, 1, 0, 11, 32), Value = 15 },
};
// The code...
var range = new TimeSpan(0, 0, 0, 30);
var results = data
.Select(sample => new
{
Time = sample.Time,
Set = data.Where(relatedSample => relatedSample.Time >= (sample.Time - range) && relatedSample.Time <= (sample.Time + range))
.Select(relatedSample => relatedSample.Value)
})
.Select(stat => new
{
Time = stat.Time,
Avg = stat.Set.Average(),
Min = stat.Set.Min(),
Max = stat.Set.Max(),
Count = stat.Set.Count()
});
这将返回一个枚举对象,其中包含每个样本,以及前后 30 秒的最小值、最大值、平均值和样本计数。这可能不是最有效的方法,但它非常简单。它将样本的"窗口"检索到临时列表中,然后对其执行统计信息。因此,至少它不会针对每个样本对整个列表运行超过一次。不过,它绝对可以加起来,每个窗口中都有很多示例。
看看 NuGet 上的 MoreLINQ 库。它将允许您通过编写累加器函数来计算运行总计和平均值。
任何"滚动窗口"聚合类型的诀窍都是简单地编写一个累加器函数,该函数将序列中的值保留在队列缓冲区中,只要它们保持在所需的窗口范围内。当序列元素不再满足范围条件时,将从缓冲区中取消排队,并且其值将从任何聚合或总计中删除(取消累积(。
在我进入任何代码之前,我需要发布一个免责声明,说明下面的所有内容都直接输入到回复窗口中,这意味着它甚至可能无法编译。总体概念是合理的,但这是我所能保证的。
这样,使用您的Data
类和BufferData
种子以及 MoreLINQ 中的 .Scan()
函数:
//First we need a type to hold the results:
class Result
{
double min;
double max;
DateTime first; //needed for centre
DateTime centre;
//Important because this is what really defines the window range:
// this sample and everything 30s prior (or as determined by the InWindow predicate)
DateTime last;
//for fun, because once we have the others, these are easy and fast to do at the same time
double sum;
int count;
double avg;
}
// we also want to define our window range
// For this example, the head of the queue is still part of the range if it's within 30 seconds of the current sample
Func<Data, Data, bool> InWindow = (head, cur) => (head.Time.AddSeconds(30) >= cur.Time);
// and a place to accumulate our buffer (hurray for closures!)
var accBuffer = new Queue<Data>();
// now get the data
IEnumerable<Data> BufferData = ...;
// let's get to it!
var results = BufferData.Scan(new Result() {min = double.MaxValue, max = double.MinValue},
(acc, data) => {
//Use flags to avoid iterating the queue if possible
bool minmaxValid = true;
while (accBuffer.Count > 0 && !InWindow(accBuffer.Peek(), data)
{
var old = accBuffer.Dequeue();
acc.sum -= old.Value;
acc.count--;
//once an old min or max falls out of the window, we'll have to re-check the entire window :(
if (old.Value == acc.min) minmaxValid = false;
if (old.Value == acc.max) minmaxValid = false;
}
accBuffer.Enqueue(data);
acc.count++;
acc.sum += data.Value;
acc.first = accBuffer.Peek().Time;
acc.last = data.Time;
acc.centre = acc.First.AddTicks( (new TimeSpan(data.Time- acc.first)).Ticks / 2);
if (minmaxValid && data.Value < acc.min) acc.min = data.Value;
if (minmaxValid && data.Value > acc.max) acc.max = data.Value;
// have to check the whole queue :(
if (!minValid || !maxValid)
{
acc.min = double.MaxValue;
acc.max = double.MinValue;
//could use accBuffer.Max() and accBuffer.Min, but this avoids iterating the queue twice
foreach (var d in accBuffer)
{
if (d.Value < acc.min) acc.min = d.Value;
if (d.Value > acc.max) acc.max = d.Value;
}
}
acc.avg = acc.sum / acc.count;
});
该解决方案的特别之处在于它非常高效。它不完全是O(n( - 但它非常接近!
达到 O(n( 目标的其余障碍是,当(并且仅当(最大值或最小值落出窗口时,需要迭代窗口队列。我认为不可能完全消除这种情况,但我觉得这里仍有改进的余地,如果你能找到一种方法来避免这种情况。根据您需要保留的元素数量和每个元素的相对大小,您实际上可以通过使用某种排序算法做得更好......但我对此表示怀疑。与总和、计数和平均值不同,在这种情况下,最小值和最大值很难有效。
最后,起初我不知道,但是由于@Enigmativity的回答,我现在看到我正在使用的Scan((运算符现在已合并到MS维护的库中。它几乎是一个直接的替代品,可以通过NuGet使用它而不是MoreLINQ。我在这里发布的代码根本没有真正改变,只需将正确的using
指令放入文件中即可。
嗯......当我检查这个时,无论如何,MoreLINQ 是最近更新的,所以也许没关系。
我找不到完全线性执行此操作的方法,但至少无论什么重申都只在窗口内,而不是在整个数据集上。此方法提供数据样本在 30 秒内的所有滚动(重叠(窗口。
您可以将其作为扩展方法,也可以仅作为常规方法。为了简单起见,我使用了扩展方法。
static IEnumerable<IEnumerable<T>> Windows<T>(this IEnumerable<T> self, Func<T, DateTime> selector, TimeSpan span) {
var enumerator = self.GetEnumerator();
var samples = new List<T>();
var start = DateTime.MinValue;
while (enumerator.MoveNext()) {
var end = selector(enumerator.Current);
if (end > start + span) {
start = end - span;
}
samples = samples.SkipWhile(i => selector(i) < start).ToList();
samples.Add(enumerator.Current);
yield return samples;
}
}
使用示例:在 30 秒内滚动平均值1。
var rollingAverages = BufferData
.Windows(d => d.Time, new TimeSpan(0, 0, 30))
.Select(win => win.Average());
这通过将当前元素视为窗口的结束来工作,因此窗口开始时小于给定的范围,但随着时间的推移增长到该最大值。
- 初始化空示例容器
- 对于数据集的每个项:
- 将窗口结束设置为当前项目的时间。
- 如果可能,请扩大窗口的时间范围。
- 从样品容器中删除过期的物品
- 将当前项添加到示例
- 产量样品容器
1:我的英语统计术语有点生疏,也许这是一个滚动平均值?