如何处理在前一个事件处理完成之前触发的实时事件(c#)

本文关键字:事件 实时 一个 处理 何处理 事件处理 | 更新日期: 2023-09-27 18:18:47

假设我们有一个实时事件的侦听器,该侦听器在事件被触发时执行一些代码块。

在我们的讨论中,假设我们有一个MyTime类,它有一个成员currentTime。

我们已经设置了它,以便每当计算机时钟改变时,currentTime被设置为当前时间的值。我们已经为currentTime对象实现了属性改变INotifyPropertyChanged接口:

public event PropertyChangedEventHandler PropertyChanged;

       public string currentTime
            {
                get { return _currentTime; }
                set { _currentTime= value; this.NotifyPropertyChanged("currentTime"); } 
             }

    public void NotifyPropertyChanged(object sender, PropertyChangedEventArgs e) {
    if (PropertyChanged != null)
                PropertyChanged(this, new PropertyChangedEventArgs(name));
    }

其他类,比如ProcessTime正在监听这个事件:

TimeChanged += new PropertyChangedEventHandler(PropertyChanged};

,它有一个函数将执行一些东西:

public void TimeChanged(object sender, PropertyChangedEventArgs e)
{
// Process lots of calculations
}

由于我们的计算机时钟一直在变化,它将继续触发事件。在我的理解中,一旦发生第一次更改,我们将执行TimeChanged块。当我们在执行的时候,我们会不断收到越来越多的通知,并尽可能快地处理它们,创建一个长长的等待处理的事件队列。

问题是,在我们处理了第一次时间变化并继续进行下一次时间变化之后,"实时"已经遥遥领先了,无论我们计算什么,我们都是在计算过去发生的事情。

我们想做的是忽略所有新事件,直到我们完成原始处理,然后才开始再次侦听事件。

设置多个线程不是一个选择,因为它不能解决问题,我们不想处理每次更改,只处理那些当我们的资源被释放时。

显然,我使用了时间更改和上面的代码作为演示示例,但它简明而充分地展示了我们在这里试图完成的内容。

我想使用某种缓冲器,但我在这方面的知识非常有限。由于

感谢到目前为止所有的答案。将开始实施它。将尝试记录成功/失败。

如何处理在前一个事件处理完成之前触发的实时事件(c#)

首先,所讨论的事件没有被异步调用。所以除非你在不断变化的线程上设置时间,否则设置时间的调用不会回来,直到所有的事件都处理了它,你才会再次设置它。如果你想避免这个问题,你需要将事件处理移到另一个线程。

最终,情况的复杂性和你想要的实时程度决定了这个问题的最终答案。但是,假设您想要的东西对于相对较少的线程(假设有十几个)来说是相当健壮的,那么我大概会这样做。

private var _Callbacks = new List<PropertyChangedEventHandler>();
public event PropertyChangedEventHandler PropertyChanged
{
    add
    {
        lock(_Callbacks)
            _Callbacks.Add(value);
        Thread Worker = new Thread(PollTime);
        Worker.Background = true;
        Worker.Start(value);
    }
    remove
    {
        lock(_Callbacks)
            _Callbacks.Remove(value);
    }
}
private void PollTime(object callback)
{
    PropertyChangedEventHandler c = (PropertyChangedEventHandler)callback;
    string LastReported = null;
    while(true)
    {
        lock(_Callbacks)
            if (!_Callbacks.Contains(c))
                return;
        if (LastReported != _currentTime)
        {
            LastReported = _currentTime;
            c(this, new PropertyChangedEventArgs(name));
        }
        else
            Thread.Sleep(10);
    }
}
public string currentTime
{
    get { return _currentTime; }
    set { _currentTime= value; } 
}

这样你就获得了事件的线程安全性(以防有人试图在不合适的时间订阅/取消订阅),并且每个订阅者都有自己的线程来处理回调。订阅者不会得到所有相同的事件,但是当时间改变时,他们都会得到通知。慢点的不会得到那么多事件因为它们会失去一些中间值。这不会通知如果时间重置没有变化,但我不认为这是一个很大的损失。如果值在有限的集合内交替,可能会出现问题,但随着时间的推移,这不是问题。

关于委托、事件及其工作方式的更多信息,请访问http://www.sellsbrothers.com/writing/delegates.htm

这将是我的方法。

  1. 不要让消费者阻塞生产者的事件线程。
  2. 创建一个轻量级的"临界区"(基本上是一个原子条件变量),以便在给定的时间内只能激活一次消费者处理程序的调用。

下面是实现该逻辑的完整示例。有一个EventProducer和一个EventConsumer。可以根据需要将它们配置为比彼此更快或更慢。事件生成器创建一个后台线程来引发事件。EventConsumer使用自定义CriticalSectionSlim类和简单的TryEnter/Exit模式来避免同时调用处理代码。它还使用。net 4.0 Task类的默认行为将处理代码发布到线程池。如果发生异常,则在下一次调用时从主处理程序线程重新抛出异常。

using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
internal sealed class Program
{
    private static void Main(string[] args)
    {
        using (EventProducer producer = new EventProducer(TimeSpan.FromMilliseconds(250.0d)))
        using (EventConsumer consumer = new EventConsumer(producer, TimeSpan.FromSeconds(1.0d)))
        {
            Console.WriteLine("Press ENTER to stop.");
            Console.ReadLine();
        }
        Console.WriteLine("Done.");
    }
    private static class ConsoleLogger
    {
        public static void WriteLine(string message)
        {
            Console.WriteLine(
                "[{0}]({1}) {2}",
                DateTime.Now.ToString("hh:mm:ss.fff", CultureInfo.InvariantCulture),
                Thread.CurrentThread.ManagedThreadId,
                message);
        }
    }
    private sealed class EventConsumer : IDisposable
    {
        private readonly CriticalSectionSlim criticalSection;
        private readonly EventProducer producer;
        private readonly TimeSpan processingTime;
        private Task currentTask;
        public EventConsumer(EventProducer producer, TimeSpan processingTime)
        {
            if (producer == null)
            {
                throw new ArgumentNullException("producer");
            }
            if (processingTime < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("processingTime");
            }
            this.processingTime = processingTime;
            this.criticalSection = new CriticalSectionSlim();
            this.producer = producer;
            this.producer.SomethingHappened += this.OnSomethingHappened;
        }
        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }
        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                this.producer.SomethingHappened -= this.OnSomethingHappened;
            }
        }
        private void OnSomethingHappened(object sender, EventArgs e)
        {
            if (this.criticalSection.TryEnter())
            {
                try
                {
                    this.StartTask();
                }
                catch (Exception)
                {
                    this.criticalSection.Exit();
                    throw;
                }
            }
        }
        private void StartTask()
        {
            if (this.currentTask != null)
            {
                this.currentTask.Wait();
            }
            this.currentTask = Task.Factory.StartNew(this.OnSomethingHappenedTask);
        }
        private void OnSomethingHappenedTask()
        {
            try
            {
                this.OnSomethingHappenedImpl();
            }
            finally
            {
                this.criticalSection.Exit();
            }
        }
        private void OnSomethingHappenedImpl()
        {
            ConsoleLogger.WriteLine("BEGIN: Consumer processing.");
            Thread.CurrentThread.Join(this.processingTime);
            ConsoleLogger.WriteLine("END:   Consumer processing.");
        }
    }
    private sealed class EventProducer : IDisposable
    {
        private readonly TimeSpan timeBetweenEvents;
        private readonly Thread thread;
        private volatile bool shouldStop;
        public EventProducer(TimeSpan timeBetweenEvents)
        {
            if (timeBetweenEvents < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("timeBetweenEvents");
            }
            this.timeBetweenEvents = timeBetweenEvents;
            this.thread = new Thread(this.Run);
            this.thread.Start();
        }
        public event EventHandler SomethingHappened;
        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }
        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                this.shouldStop = true;
                this.thread.Join();
            }
        }
        private void Run()
        {
            while (!shouldStop)
            {
                this.RaiseEvent();
                Thread.CurrentThread.Join(this.timeBetweenEvents);
            }
        }
        private void RaiseEvent()
        {
            EventHandler handler = this.SomethingHappened;
            if (handler != null)
            {
                ConsoleLogger.WriteLine("Producer is raising event.");
                handler(this, EventArgs.Empty);
            }
        }
    }
    private sealed class CriticalSectionSlim
    {
        private int active;
        public CriticalSectionSlim()
        {
        }
        public bool TryEnter()
        {
            return Interlocked.CompareExchange(ref this.active, 1, 0) == 0;
        }
        public void Exit()
        {
            Interlocked.Exchange(ref this.active, 0);
        }
    }
}

你可以这样做

一旦你收到事件,注销事件,或者换句话说,停止监听事件。

完成事件处理后,通过重新注册到event

开始再次侦听事件
public void TimeChanged(object sender, PropertyChangedEventArgs e)
{
//un register
TimeChanged -= new PropertyChangedEventHandler(PropertyChanged};
// Process lots of calculations
//re-register
TimeChanged += new PropertyChangedEventHandler(PropertyChanged};
}

我建议将所有计划任务按进程时间(DateTime)排序放在队列中。time事件(tick)只需要检查队列头部的任务是否处于"pending"状态。这是如果它的处理时间已经达到或通过。然后,给定当前时间,将该任务从队列中删除并执行。

任务在完成时通过execute方法中给出的回调通知任务队列(这可能也占用了当前时间)。当任务正在执行时,任务队列将不会执行任何其他任务。当任务通知完成时,任务队列将立即检查队列头部的任务(如果有的话)是否挂起,等等。

现在,当你有时间顺序的任务队列时,可以在这里做一个很好的改进,当你在队列的头部执行任务时,你可以设置一个定时器来触发(或一次性改变的监听器),而不是周期性的滴答声,因为你总是知道下一个事件的时间。不需要多个监听器和一个控制器来决定有多少正在执行,等等。

interface ITask
{
    void Execute(ITaskCallBack callBack, DateTime currentTime);
}
interface ITaskCallBack
{
    void OnCompleted(ITask task); // The task parameter is needed for concurrency
}

每当一个任务被添加或删除时,下一个事件的时间将被更新。

重要:如果您添加的新任务碰巧想要与现有任务同时执行,则应将其添加在所有任务之后。这避免了子任务占用调度程序。

定时队列是您的任务调度程序/控制器。您可以使它单线程或多线程,因为你喜欢。尽管我不认为多线程有多大意义,除非你使用多个处理器。

interface ITaskScheduler
{
    void Add(ITask task, DateTime executeTime);
    void Remove(ITask);
}

这里的另一个好处是调度程序知道调度的时间和实际开始的时间。因此,您可以对延迟的任务或由于加载导致的延迟进行有价值的诊断。如果您的系统需要性能确定性,这一点很重要。

希望这是有意义和有用的。