带有计时器的观察者模式

本文关键字:观察者模式 计时器 | 更新日期: 2023-09-27 17:50:09

我在我的应用程序中使用了Observer Pattern

我有一个主题,它有一个System.Timers。定时器对象在其中命名为'tmr'。这个计时器的tick事件在每60秒后触发。在这个tick事件中,我将通知所有附属于我的主题的观察者。我使用了for循环来迭代我的观察员列表&然后触发观察者更新方法。

假设我有10个观察者附在我的主题上。

每个观察者需要10秒来完成它的处理。

现在在for循环中执行通知会导致最后一个Observer's Update方法在90秒后被调用。也就是说,下一个观察者更新方法只有在前一个观察者完成它的处理之后才会被调用。

但这不是我想在我的应用程序。我需要所有观察者的Update方法在计时器发生时立即被触发。这样观察者就不用等待了。我希望这可以通过Threading来完成。

因此,我将代码修改为
// Fires the updates instantly
    public void Notify()
    {
      foreach (Observer o in _observers)
      {
        Threading.Thread oThread = new Threading.Thread(o.Update);
        oThread.Name = o.GetType().Name;
        oThread.Start();
      }
    }

但我心里有两个疑问,

  1. 如果有10个观察者我的计时器间隔是60秒然后,语句new Thread()将触发600次。

  2. 是否建议在每个计时器上创建新线程?
  3. 如果我的观察者花了太多时间来完成他们的更新逻辑,即超过60秒。意味着计时器在观察者被更新之前发生。

我可以张贴示例代码。如果需要……

我使用的代码…

using System;
using System.Collections.Generic;
using System.Timers;
using System.Text;
using Threading = System.Threading;
using System.ComponentModel;
namespace singletimers
{
  class Program
  {

    static void Main(string[] args)
    {
      DataPullerSubject.Instance.Attach(Observer1.Instance);
      DataPullerSubject.Instance.Attach(Observer2.Instance);
      Console.ReadKey();
    }
  }
  public sealed class DataPullerSubject
  {
    private static volatile DataPullerSubject instance;
    private static object syncRoot = new Object();
    public static DataPullerSubject Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new DataPullerSubject();
          }
        }
        return instance;
      }
    }
    int interval = 10 * 1000;
    Timer tmr;
    private List<Observer> _observers = new List<Observer>();
    DataPullerSubject()
    {
      tmr = new Timer();
      tmr.Interval = 1; // first time to call instantly
      tmr.Elapsed += new ElapsedEventHandler(tmr_Elapsed);
      tmr.Start();
    }
    public void Attach(Observer observer)
    {
      _observers.Add(observer);
    }
    public void Detach(Observer observer)
    {
      _observers.Remove(observer);
    }
    // Fires the updates instantly
    public void Notify()
    {
      foreach (Observer o in _observers)
      {
        Threading.Thread oThread = new Threading.Thread(o.Update);
        oThread.Name = o.GetType().Name;
        oThread.Start();
      }
    }
    private void tmr_Elapsed(object source, ElapsedEventArgs e)
    {
      tmr.Interval = interval;
      tmr.Stop(); // stop the timer until all notification triggered
      this.Notify();
      tmr.Start();//start again
    }
  }

  public abstract class Observer
  {
    string data;
    public abstract void Update();
    public virtual void GetDataFromDBAndSetToDataSet(string param)
    {
      Console.WriteLine("Processing for: " + param);
      data = param + new Random().Next(1, 2000);
      Threading.Thread.Sleep(10 * 1000);//long work
      Console.WriteLine("Data set for: " + param);
    }
  }

  public sealed class Observer1 : Observer
  {
    private static volatile Observer1 instance;
    private static object syncRoot = new Object();
    public static Observer1 Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new Observer1();
          }
        }
        return instance;
      }
    }
    Observer1()
    {
    }
    public override void Update()
    {
      base.GetDataFromDBAndSetToDataSet("Observer1");
    }
  }
  public sealed class Observer2 : Observer
  {
    private static volatile Observer2 instance;
    private static object syncRoot = new Object();
    public static Observer2 Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new Observer2();
          }
        }
        return instance;
      }
    }
    Observer2()
    {
    }
    public override void Update()
    {
      base.GetDataFromDBAndSetToDataSet("Observer2");
    }
  }
}

谢谢,亲切的问候。

带有计时器的观察者模式

  • 不鼓励使用new Thread。使用TaskTask<T>
  • 你创建一个可观察模式框架的最好尝试可能只会接近Rx。使用能解决你提到的问题的方法(例如,如果处理需要花费太多时间)。Rx将给你很大的灵活性来定义你的可观察场景

1)您可以通过ThreadPool使用ThreadPool中的线程。QueueUserWorkItem或者你可以使用Tasks

2)你必须同步你的方法

或者,观察者可以以非阻塞的方式实现Update。也就是说,Update总是立即返回。如果有必要,Observer对象有责任在一个新线程中执行它们的工作。

我不确定这在你的场景中是否有帮助——我不知道你的"观察者"是什么,但也许你也不知道?