将工作推送给观察者

本文关键字:观察者 工作 | 更新日期: 2023-09-27 18:34:37

我有一个侦听器,它以IPayload的形式接收工作。侦听器应该将这项工作推送给实际完成这项工作的观察者。这是我第一次粗略地尝试实现这一目标:

public interface IObserver
{
    void DoWork(IPayload payload);
}
public interface IObservable
{
    void RegisterObserver(IObserver observer);
    void RemoveObserver(IObserver observer);
    void NotifyObservers(IPayload payload);
}
public class Observer : IObserver
{
    public void DoWork(IPayload payload)
    {
        // do some heavy lifting
    }
}
public class Listener : IObservable
{
    private readonly List<IObserver> _observers = new List<IObserver>();
    public void PushIncomingPayLoad(IPayload payload)
    {
        NotifyObservers(payload);
    }
    public void RegisterObserver(IObserver observer)
    {
        _observers.Add(observer);
    }
    public void RemoveObserver(IObserver observer)
    {
        _observers.Remove(observer);
    }
    public void NotifyObservers(IPayload payload)
    {
        Parallel.ForEach(_observers, observer =>
        {
        observer.DoWork(payload);
        });
    }
}

这是遵循观察者/可观察模式的有效方法吗(即pub sub?我的理解是,NotifyObservers也会为每个有效载荷设置一个威胁。这是对的吗?非常欢迎任何改进建议。

请注意,所有观察者都必须在将有效载荷形式的新工作传递给他们之前完成他们的工作 - "观察"的顺序无关紧要。基本上,侦听器必须像主人一样行事,同时尽可能多地利用 TPL 利用主机的核心。恕我直言,这需要向侦听器/可观察者显式注册观察者。

附注:

我认为 Parallel.ForEach

不会为每个观察者创建一个线程:为什么 Parallel.ForEach 不运行多个线程?如果这是真的,我如何确保为每个观察者创建一个线程?

我想到的另一种选择是:

public async void NotifyObservers(IPayload payload)
{
    foreach (var observer in _observers)
    {
    var observer1 = observer;
    await Task.Run(() => observer1.DoWork(payload));
    }
    await Task.WhenAll();
}

将工作推送给观察者

当然你可以这样做,但在 .net 中,如果你不想重新发明轮子,那就不需要了:-(在 c# 中,这可以使用事件来完成。

一个简短的例子:

  //Your Listener who has a public eventhandler where others can add them as listeners
  public class Listener{
      //your eventhandler where others "add" them as listeners
      public event EventHandler<PayLoadEventsArgs> IncomingPayload;
      //a method where you process new data and want to notify the others
      public void PushIncomingPayLoad(IPayload payload)
      {
          //check if there are any listeners
          if(IncomingPayload != null)
              //if so, then notify them with the data in the PayloadEventArgs
              IncomingPayload(this, new PayloadEventArgs(payload));
      }
  }  
  //Your EventArgs class to hold the data
  public class PayloadEventArgs : EventArgs{
      Payload payload { get; private set; }  
      public PayloadEventArgs(Payload payload){
          this.payload = payload;
      }
  }
  public class Worker{
      //add this instance as a observer
      YourListenerInstance.IncomingPayload += DoWork;
      //remove this instance 
      YourListenerInstance.IncomingPayload -= DoWork;
      //This method gets called when the Listener notifies the  IncomingPayload listeners
      void DoWork(Object sender, PayloadEventArgs e){
          Console.WriteLine(e.payload);
      }
   }

编辑:当问题要求并行执行时,在订阅者端执行新线程怎么样?我认为这是实现这一目标的最简单方法。

//Inside the DoWork method of the subscriber start a new thread
Task.Factory.StartNew( () => 
{
      //Do your work here
});
//If you want to make sure that a new thread is used for the task, then add the TaskCreationOptions.LongRunning parameter
Task.Factory.StartNew( () => 
{
      //Do your work here
}, TaskCreationOptions.LongRunning);

希望这能回答你的问题?如果没有,请发表评论。