产生多个IEnumerables

本文关键字:IEnumerables | 更新日期: 2023-09-27 18:04:50

我有一段计算资产的代码。有数百万个这样的数据流,所以我想计算流中的所有数据。我当前的"管道"是这样的:

我有一个作为数据阅读器执行的查询。

然后我的Asset类有一个接受IDataReader的构造函数;

Public Asset(IdataReader rdr){
  // logic that initiates fields
}

和一个将IDataReader转换为IEnumerable

的方法
public static IEnumerable<Asset> ToAssets(IDataReader rdr) {
    // make sure the reader is in the right formt
    CheckReaderFormat(rdr);
    // project reader into IEnumeable<Asset>
    while (rdr.Read()) yield return new Asset(rdr);
}

然后传递给一个函数进行实际计算然后将其投影到IEnumerable

然后获得一个包装器,将答案暴露为IDataReader,然后将其传递给OracleBulkCopy并将流写入DB。

到目前为止,它像一个魅力。由于设置,我可以将DataReader交换为从文件读取的IEnumerable,或者将结果写入文件等。这一切都取决于我如何将类/函数串在一起。

现在:有几件事我可以计算,例如,除了正常的答案,我可以有一个DebugAnswer类,也输出一些中间数字调试。所以我想做的是将IEnumerable投射到几个输出流中,这样我就可以在这些流上放置"监听器"。这样我就不用反复查看数据了。我该怎么做呢?有点像有几个事件,然后只触发特定的代码,如果有一个附加的监听器。

有时我也写入DB,但也写入zipfile只是为了保留结果的备份。然后我想在IEnumerable上有两个'listener '。一个项目是作为IDataReader,另一个直接写入文件。

如何输出多个输出流以及如何在一个输出流上放置多个侦听器?是什么让我写出这样的数据流?

编辑

一些我想要做的伪代码:

foreach(Asset in Assets){
   if(DebugListener != null){
     // compute 
     DebugAnswer da = new DebugAnswer {result = 100};
     yield da to DebugListener;  // so instead of yield return yield to that stream
   }
   if(AnswerListener != null){
     // compute basic stuff 
     Answer a = new Answer { bla = 200 };
     yield a to AnswerListener;
   }
}

提前感谢,

Gert-Jan

产生多个IEnumerables

你所描述的听起来有点像响应式框架通过IObservable接口提供的东西,但我不确定它是否允许多个订阅者到单个订阅流。

更新

如果你看一下IObservable的文档,它有一个很好的例子,如何做你正在做的事情,多个订阅者到一个对象。

您的示例使用Rx重写:

// The stream of assets
IObservable<Asset> assets = ...
// The stream of each asset projected to a DebugAnswer
IObservable<DebugAnswer> debugAnswers = from asset in assets
                                        select new DebugAnswer { result = 100 };
// Subscribe the DebugListener to receive the debugAnswers
debugAnswers.Subscribe(DebugListener);
// The stream of each asset projected to an Anwer
IObservable<Answer> answers = from asset in assets
                              select new Answer { bla = 200 };
// Subscribe the AnswerListener to receive the answers
answers.Subscribe(AnswerListener);

这正是响应式扩展的工作(从4.0开始成为。net的一部分,在3.5中作为库提供)。

你不需要多个"监听器",你只需要不具有破坏性甚至不一定可转换的管道组件。

IEnumerable<T> PassThroughEnumerable<T>(IEnumerable<T> source, Action<T> action) {
    foreach (T t in source) {
       Action(t);
       yield return t;
    }    
}

或者,当您在管道中进行处理时,只引发一些要使用的事件。如果你愿意,你可以同步它们:

static IEnumerable<Asset> ToAssets(IDataReader rdr) {
   CheckReaderFormat(rdr);
   var h = this.DebugAsset;
   while (rdr.Read()) {
      var a = new Asset(rdr);
      if (h != null) h(a);
      yield return a;
   }
}
public event EventHandler<Asset> DebugAsset;

如果我没看错的话,应该可以替换或装饰包装器。WrapperDecorator可能会将呼叫转发给正常的OracleBulkCopy(或任何您正在使用的)并添加一些自定义调试代码。

这对你有帮助吗?

马提亚