消费流的方法,转换,然后交给其他消费者(无状态)

本文关键字:消费者 其他 状态 然后 方法 转换 | 更新日期: 2023-09-27 18:28:54

Rx新增;试图弄清楚。

我通过事件从加速计中获取数据,将数据调整为我自己的格式,然后通过Reactive Extensions向其他消费者提供数据流。

我的方法部分是通过调用一个重要的构造函数来实现的。有人能看到这一点并帮助我理解我应该做什么不同的事情吗?

希望这个片段就足够了:

public class accel_raw_producer : IObservable<AccelerometerFrame_raw>
{
   private static Spatial spatial = null;
   private IObservable<EventPattern<SpatialDataEventArgs>> spatialEvents;
   public accel_raw_producer ()
   {
      spatial = new Spatial();
      spatial.close();
      spatialEvents = System.Reactive.Linq.Observable.FromEventPattern
         <SpatialDataEventHandler, SpatialDataEventArgs>(
         handler => handler.Invoke,
         h => spatial.SpatialData += h,
         h => spatial.SpatialData -= h);
      subscription = spatialEvents.Subscribe();
      spatial.open(-1);
   }

   public IDisposable Subscribe(IObserver<AccelerometerFrame_raw> observer)
   {
      return
         (from evt in spatialEvents
         let e = evt.EventArgs
         select new AccelerometerFrame_raw
         (
            e.spatialData[0].Acceleration[0],
            e.spatialData[0].Acceleration[1],
            e.spatialData[0].Acceleration[2],
            e.spatialData[0].AngularRate[0],
            e.spatialData[0].AngularRate[1],
            e.spatialData[0].AngularRate[2]
         )).Subscribe();
   }
}
public consumerClass :IObserver<AccelerometerFrame_raw>
{
   accel_raw_producer accelStream;
   IDisposable Unsubscriber;
   public consumerClass()
   {
      accelStream = new accel_raw_producer();
      Unsubscriber = accelStream.Subscribe(this);
   }
   public void OnCompleted()
   {
      throw new NotImplementedException();
   }
   public void OnError(Exception error)
   {
      throw new NotImplementedException();
   }
   public void OnNext(AccelerometerFrame_raw accelFrame)
   {
      if (null == accelFrame) return;
      AccelX = accelFrame.Acceleration.X;
      AccelY = accelFrame.Acceleration.Y;
      AccelZ = accelFrame.Acceleration.Z;
      GyroX = accelFrame.Rotation.X;
      GyroY = accelFrame.Rotation.Y;
      GyroZ = accelFrame.Rotation.Z;
   }    
}

当我在AcceleratometerFrame_raw中设置断点时,它会被触发,但值不会传播到使用者。因此,这种方法需要有所不同。

消费流的方法,转换,然后交给其他消费者(无状态)

我不确定这里是否遗漏了一些细节,但您似乎没有使用observer输入参数,订阅也没有真正对onNext值执行任何操作。就我个人而言,我会考虑将其转换为IOobservable,并让调用者自己订阅:

public IObservable<AccelerometerFrame_raw> AccelerometerFrames()
{
     return
        from evt in spatialEvents
        let e = evt.EventArgs
        select new AccelerometerFrame_raw
        (
           e.spatialData[0].Acceleration[0],
           e.spatialData[0].Acceleration[1],
           e.spatialData[0].Acceleration[2],
           e.spatialData[0].AngularRate[0],
           e.spatialData[0].AngularRate[1],
           e.spatialData[0].AngularRate[2]
        );
}