如何处理在前一个事件处理完成之前触发的实时事件(c#)
本文关键字:事件 实时 一个 处理 何处理 事件处理 | 更新日期: 2023-09-27 18:18:47
假设我们有一个实时事件的侦听器,该侦听器在事件被触发时执行一些代码块。
在我们的讨论中,假设我们有一个MyTime类,它有一个成员currentTime。
我们已经设置了它,以便每当计算机时钟改变时,currentTime被设置为当前时间的值。我们已经为currentTime对象实现了属性改变INotifyPropertyChanged接口:
public event PropertyChangedEventHandler PropertyChanged;
public string currentTime
{
get { return _currentTime; }
set { _currentTime= value; this.NotifyPropertyChanged("currentTime"); }
}
public void NotifyPropertyChanged(object sender, PropertyChangedEventArgs e) {
if (PropertyChanged != null)
PropertyChanged(this, new PropertyChangedEventArgs(name));
}
其他类,比如ProcessTime正在监听这个事件:
TimeChanged += new PropertyChangedEventHandler(PropertyChanged};
,它有一个函数将执行一些东西:
public void TimeChanged(object sender, PropertyChangedEventArgs e)
{
// Process lots of calculations
}
由于我们的计算机时钟一直在变化,它将继续触发事件。在我的理解中,一旦发生第一次更改,我们将执行TimeChanged块。当我们在执行的时候,我们会不断收到越来越多的通知,并尽可能快地处理它们,创建一个长长的等待处理的事件队列。
问题是,在我们处理了第一次时间变化并继续进行下一次时间变化之后,"实时"已经遥遥领先了,无论我们计算什么,我们都是在计算过去发生的事情。
我们想做的是忽略所有新事件,直到我们完成原始处理,然后才开始再次侦听事件。
设置多个线程不是一个选择,因为它不能解决问题,我们不想处理每次更改,只处理那些当我们的资源被释放时。
显然,我使用了时间更改和上面的代码作为演示示例,但它简明而充分地展示了我们在这里试图完成的内容。
我想使用某种缓冲器,但我在这方面的知识非常有限。由于
感谢到目前为止所有的答案。将开始实施它。将尝试记录成功/失败。
首先,所讨论的事件没有被异步调用。所以除非你在不断变化的线程上设置时间,否则设置时间的调用不会回来,直到所有的事件都处理了它,你才会再次设置它。如果你想避免这个问题,你需要将事件处理移到另一个线程。
最终,情况的复杂性和你想要的实时程度决定了这个问题的最终答案。但是,假设您想要的东西对于相对较少的线程(假设有十几个)来说是相当健壮的,那么我大概会这样做。
private var _Callbacks = new List<PropertyChangedEventHandler>();
public event PropertyChangedEventHandler PropertyChanged
{
add
{
lock(_Callbacks)
_Callbacks.Add(value);
Thread Worker = new Thread(PollTime);
Worker.Background = true;
Worker.Start(value);
}
remove
{
lock(_Callbacks)
_Callbacks.Remove(value);
}
}
private void PollTime(object callback)
{
PropertyChangedEventHandler c = (PropertyChangedEventHandler)callback;
string LastReported = null;
while(true)
{
lock(_Callbacks)
if (!_Callbacks.Contains(c))
return;
if (LastReported != _currentTime)
{
LastReported = _currentTime;
c(this, new PropertyChangedEventArgs(name));
}
else
Thread.Sleep(10);
}
}
public string currentTime
{
get { return _currentTime; }
set { _currentTime= value; }
}
这样你就获得了事件的线程安全性(以防有人试图在不合适的时间订阅/取消订阅),并且每个订阅者都有自己的线程来处理回调。订阅者不会得到所有相同的事件,但是当时间改变时,他们都会得到通知。慢点的不会得到那么多事件因为它们会失去一些中间值。这不会通知如果时间重置没有变化,但我不认为这是一个很大的损失。如果值在有限的集合内交替,可能会出现问题,但随着时间的推移,这不是问题。
关于委托、事件及其工作方式的更多信息,请访问http://www.sellsbrothers.com/writing/delegates.htm
这将是我的方法。
- 不要让消费者阻塞生产者的事件线程。
- 创建一个轻量级的"临界区"(基本上是一个原子条件变量),以便在给定的时间内只能激活一次消费者处理程序的调用。
下面是实现该逻辑的完整示例。有一个EventProducer
和一个EventConsumer
。可以根据需要将它们配置为比彼此更快或更慢。事件生成器创建一个后台线程来引发事件。EventConsumer
使用自定义CriticalSectionSlim
类和简单的TryEnter
/Exit
模式来避免同时调用处理代码。它还使用。net 4.0 Task
类的默认行为将处理代码发布到线程池。如果发生异常,则在下一次调用时从主处理程序线程重新抛出异常。
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
internal sealed class Program
{
private static void Main(string[] args)
{
using (EventProducer producer = new EventProducer(TimeSpan.FromMilliseconds(250.0d)))
using (EventConsumer consumer = new EventConsumer(producer, TimeSpan.FromSeconds(1.0d)))
{
Console.WriteLine("Press ENTER to stop.");
Console.ReadLine();
}
Console.WriteLine("Done.");
}
private static class ConsoleLogger
{
public static void WriteLine(string message)
{
Console.WriteLine(
"[{0}]({1}) {2}",
DateTime.Now.ToString("hh:mm:ss.fff", CultureInfo.InvariantCulture),
Thread.CurrentThread.ManagedThreadId,
message);
}
}
private sealed class EventConsumer : IDisposable
{
private readonly CriticalSectionSlim criticalSection;
private readonly EventProducer producer;
private readonly TimeSpan processingTime;
private Task currentTask;
public EventConsumer(EventProducer producer, TimeSpan processingTime)
{
if (producer == null)
{
throw new ArgumentNullException("producer");
}
if (processingTime < TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException("processingTime");
}
this.processingTime = processingTime;
this.criticalSection = new CriticalSectionSlim();
this.producer = producer;
this.producer.SomethingHappened += this.OnSomethingHappened;
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (disposing)
{
this.producer.SomethingHappened -= this.OnSomethingHappened;
}
}
private void OnSomethingHappened(object sender, EventArgs e)
{
if (this.criticalSection.TryEnter())
{
try
{
this.StartTask();
}
catch (Exception)
{
this.criticalSection.Exit();
throw;
}
}
}
private void StartTask()
{
if (this.currentTask != null)
{
this.currentTask.Wait();
}
this.currentTask = Task.Factory.StartNew(this.OnSomethingHappenedTask);
}
private void OnSomethingHappenedTask()
{
try
{
this.OnSomethingHappenedImpl();
}
finally
{
this.criticalSection.Exit();
}
}
private void OnSomethingHappenedImpl()
{
ConsoleLogger.WriteLine("BEGIN: Consumer processing.");
Thread.CurrentThread.Join(this.processingTime);
ConsoleLogger.WriteLine("END: Consumer processing.");
}
}
private sealed class EventProducer : IDisposable
{
private readonly TimeSpan timeBetweenEvents;
private readonly Thread thread;
private volatile bool shouldStop;
public EventProducer(TimeSpan timeBetweenEvents)
{
if (timeBetweenEvents < TimeSpan.Zero)
{
throw new ArgumentOutOfRangeException("timeBetweenEvents");
}
this.timeBetweenEvents = timeBetweenEvents;
this.thread = new Thread(this.Run);
this.thread.Start();
}
public event EventHandler SomethingHappened;
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
private void Dispose(bool disposing)
{
if (disposing)
{
this.shouldStop = true;
this.thread.Join();
}
}
private void Run()
{
while (!shouldStop)
{
this.RaiseEvent();
Thread.CurrentThread.Join(this.timeBetweenEvents);
}
}
private void RaiseEvent()
{
EventHandler handler = this.SomethingHappened;
if (handler != null)
{
ConsoleLogger.WriteLine("Producer is raising event.");
handler(this, EventArgs.Empty);
}
}
}
private sealed class CriticalSectionSlim
{
private int active;
public CriticalSectionSlim()
{
}
public bool TryEnter()
{
return Interlocked.CompareExchange(ref this.active, 1, 0) == 0;
}
public void Exit()
{
Interlocked.Exchange(ref this.active, 0);
}
}
}
你可以这样做
一旦你收到事件,注销事件,或者换句话说,停止监听事件。
完成事件处理后,通过重新注册到event
开始再次侦听事件public void TimeChanged(object sender, PropertyChangedEventArgs e)
{
//un register
TimeChanged -= new PropertyChangedEventHandler(PropertyChanged};
// Process lots of calculations
//re-register
TimeChanged += new PropertyChangedEventHandler(PropertyChanged};
}
我建议将所有计划任务按进程时间(DateTime)排序放在队列中。time事件(tick)只需要检查队列头部的任务是否处于"pending"状态。这是如果它的处理时间已经达到或通过。然后,给定当前时间,将该任务从队列中删除并执行。
任务在完成时通过execute方法中给出的回调通知任务队列(这可能也占用了当前时间)。当任务正在执行时,任务队列将不会执行任何其他任务。当任务通知完成时,任务队列将立即检查队列头部的任务(如果有的话)是否挂起,等等。
现在,当你有时间顺序的任务队列时,可以在这里做一个很好的改进,当你在队列的头部执行任务时,你可以设置一个定时器来触发(或一次性改变的监听器),而不是周期性的滴答声,因为你总是知道下一个事件的时间。不需要多个监听器和一个控制器来决定有多少正在执行,等等。
interface ITask
{
void Execute(ITaskCallBack callBack, DateTime currentTime);
}
interface ITaskCallBack
{
void OnCompleted(ITask task); // The task parameter is needed for concurrency
}
每当一个任务被添加或删除时,下一个事件的时间将被更新。
重要:如果您添加的新任务碰巧想要与现有任务同时执行,则应将其添加在所有任务之后。这避免了子任务占用调度程序。
定时队列是您的任务调度程序/控制器。您可以使它单线程或多线程,因为你喜欢。尽管我不认为多线程有多大意义,除非你使用多个处理器。
interface ITaskScheduler
{
void Add(ITask task, DateTime executeTime);
void Remove(ITask);
}
这里的另一个好处是调度程序知道调度的时间和实际开始的时间。因此,您可以对延迟的任务或由于加载导致的延迟进行有价值的诊断。如果您的系统需要性能确定性,这一点很重要。
希望这是有意义和有用的。