在合并临时关闭事件时提高效率和公平性
本文关键字:提高效率 公平性 事件 合并 | 更新日期: 2023-09-27 18:00:13
我有一堆线程可以生成类型为 A
和
B
的事件。我的程序接收这些事件,将它们包装在消息中,然后通过网络发送它们。消息 可以保存一个A
事件、一个B
事件或一个A
事件和一个B
事件:
SendMessage(new Message(a: 1, b: null));
SendMessage(new Message(a: null, b: 2 ));
SendMessage(new Message(a: 3, b: 4 ));
A
类型的事件发生得非常频繁,而B
类型的事件发生的频率要低得多。因此,当一个线程生成一个B
事件时,我的程序会稍等片刻,看看另一个线程是否生成了一个A
事件,并在可能的情况下组合A
事件和B
事件。
这是我的代码:
object gate = new object();
int? pendingB;
Message WrapA(int a, int millisecondsTimeout)
{
int? b;
lock (gate)
{
b = pendingB;
pendingB = null;
Monitor.Pulse(gate);
}
return new Message(a, b);
}
Message WrapB(int b, int millisecondsTimeout)
{
lock (gate)
{
if (pendingB == null)
{
pendingB = b;
Monitor.Wait(gate, millisecondsTimeout);
if (pendingB != b) return null;
pendingB = null;
}
}
return new Message(null, b);
}
到目前为止,这有效。但是,存在两个问题:
如果有很多
A
事件和大量的B
事件,则算法不是很有效:即使有足够的A
事件,也只有一定比例的B
事件附加到A
事件。如果一段时间内没有生成
A
事件(不常见,但并非不可能(,则该算法是完全不公平的:生成B
事件的线程每次都必须等待,而所有其他线程可以立即发送其B
事件。
如何提高算法的效率和公平性?
约束:
&项目符号; WrapA
和WrapB
必须在较短的确定性时间内终止。
&项目符号; 必须在任何锁外部调用SendMessage
。
• 除了 gate
之外,没有可用的同步机制。
• 没有其他线程、任务、计时器等可用。
• 由于 A
类型的事件在正常情况下发生得如此频繁,因此在WrapB
中忙于等待是可以的。
这是一个可以用作基准的测试程序:
public static class Program
{
static int counter0 = 0;
static int counterA = 0;
static int counterB = 0;
static int counterAB = 0;
static void SendMessage(Message m)
{
if (m != null)
if (m.a != null)
if (m.b != null)
Interlocked.Increment(ref counterAB);
else
Interlocked.Increment(ref counterA);
else
if (m.b != null)
Interlocked.Increment(ref counterB);
else
Interlocked.Increment(ref counter0);
}
static Thread[] Start(int threadCount, int eventCount,
int eventInterval, int wrapTimeout, Func<int, int, Message> wrap)
{
Thread[] threads = new Thread[threadCount * eventCount];
for (int i = 0; i < threadCount; i++)
{
for (int j = 0; j < eventCount; j++)
{
int k = i * 1000 + j;
int l = j * eventInterval + i;
threads[i * eventCount + j] = new Thread(() =>
{
Thread.Sleep(l);
SendMessage(wrap(k, wrapTimeout));
});
threads[i * eventCount + j].Start();
}
}
return threads;
}
static void Join(params Thread[] threads)
{
for (int i = 0; i < threads.Length; i++)
{
threads[i].Join();
}
}
public static void Main(string[] args)
{
var wrapper = new MessageWrapper();
var sw = Stopwatch.StartNew();
// Only A events
var t0 = Start(10, 40, 7, 1000, wrapper.WrapA);
Join(t0);
// A and B events
var t1 = Start(10, 40, 7, 1000, wrapper.WrapA);
var t2 = Start(10, 10, 19, 1000, wrapper.WrapB);
Join(t1);
Join(t2);
// Only B events
var t3 = Start(10, 20, 7, 1000, wrapper.WrapB);
Join(t3);
Console.WriteLine(sw.Elapsed);
Console.WriteLine("0: {0}", counter0);
Console.WriteLine("A: {0}", counterA);
Console.WriteLine("B: {0}", counterB);
Console.WriteLine("AB: {0}", counterAB);
Console.WriteLine("Generated A: {0}, Sent A: {1}",
10 * 40 + 10 * 40, counterA + counterAB);
Console.WriteLine("Generated B: {0}, Sent B: {1}",
10 * 10 + 10 * 20, counterB + counterAB);
}
}
为了好玩,这里有一个无锁实现:
public sealed class MessageWrapper
{
private int pendingB;
public Message WrapA(int a, int millisecondsTimeout)
{
int b = Interlocked.Exchange(ref pendingB, -1);
return new Message(a, b == -1 ? null : b);
}
public Message WrapB(int b, int millisecondsTimeout)
{
var sw = new SpinWait();
while (Interlocked.CompareExchange(ref pendingB, b, -1) != -1)
{
// Spin
sw.SpinOnce();
if (sw.NextSpinWillYield)
{
// Let us make progress instead of yielding the processor
// (avoid context switch)
return new Message(null, b);
}
}
return null;
}
}
结果
原始实现:
00:00:02.0433298
0: 0
A: 733
B: 233
AB: 67
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
无锁实现:
00:00:01.2546310
0: 0
A: 717
B: 217
AB: 83
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
更新
不幸的是,上面的实现有一个错误和一些缺点。这是一个改进的版本:
public class MessageWrapper
{
private int pendingB = EMPTY;
private const int EMPTY = -1;
public Message WrapA(int a, int millisecondsTimeout)
{
int? b;
int count = 0;
while ((b = Interlocked.Exchange(ref pendingB, EMPTY)) == EMPTY)
{
if (count % 7 == 0)
{
Thread.Sleep(0);
}
else if (count % 23 == 0)
{
Thread.Sleep(1);
}
else
{
Thread.Yield();
}
if (++count == 480)
{
return new Message(a, null);
}
}
return new Message(a, b);
}
public Message WrapB(int b, int millisecondsTimeout)
{
int count = 0;
while (Interlocked.CompareExchange(ref pendingB, b, EMPTY) != EMPTY)
{
// Spin
Thread.SpinWait((4 << count++));
if (count > 10)
{
// We didn't manage to place our payload.
// Let's send it ourselves:
return new Message(null, b);
}
}
// We placed our payload.
// Wait some more to see if some WrapA snatches it.
while (Interlocked.CompareExchange(ref pendingB, EMPTY, EMPTY) == b)
{
Thread.SpinWait((4 << count++));
if (count > 20)
{
// No WrapA came along. Pity, we will have to send it ourselves
int payload = Interlocked.CompareExchange(ref pendingB, EMPTY, b);
return payload == b ? new Message(null, b) : null;
}
}
return null;
}
}
结果:
OP的实现
00:00:02.1389474
0: 0
A: 722
B: 222
AB: 78
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
第二次实现:
00:00:01.2752425
0: 0
A: 700
B: 200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
为了多样化,我尝试了一种基于并发集合的方法。对我来说,从发布的约束中不清楚这是否可以,但无论如何我都会拍摄我的答案:
这是我机器上原始代码的典型输出:
00:00:01.7835426
0: 0
A: 723
B: 223
AB: 77
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
这是我建议的典型输出,比原始代码慢约 20%,但它捕获了更多的"AB"消息:
00:00:02.1322512
0: 0
A: 701
B: 201
AB: 99
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
消息包装器实现:
public class MessageWrapper
{
private BlockingCollection<int?> messageA = new BlockingCollection<int?>();
private BlockingCollection<int?> messageB = new BlockingCollection<int?>();
public Message WrapA(int a, int millisecondsTimeout)
{
messageA.Add(a);
return CreateMessage(0);
}
public Message WrapB(int b, int millisecondsTimeout)
{
messageB.Add(b);
return CreateMessage(millisecondsTimeout);
}
private Message CreateMessage(int timeout)
{
int? a, b;
if (messageB.TryTake(out b) | messageA.TryTake(out a, timeout))
{
return new Message(a, b);
}
else
{
return null;
}
}
}
似乎是 Reactive Extesions 的完美候选者。可以使用 Buffer 方法对事件进行分组,或使用其他类似的扩展来筛选和合并事件。
也许这个解决方案不符合你的约束之一,但在我看来,这是最好的解决方案。反应式扩展非常强大。
我将给出另一个建议,该建议更严格地遵循给定的约束;在我的机器上,此实现在运行测试程序时始终捕获 97 条或更多"AB"消息,与原始代码相比性能下降约 5%:
class MessageWrapper
{
object gate = new object();
int? pendingB;
public Message WrapA(int a, int millisecondsTimeout)
{
Message returnMessage = null;
bool lockTaken = false;
Monitor.TryEnter(gate, 100, ref lockTaken);
if (lockTaken)
{
returnMessage = new Message(a, pendingB);
pendingB = null;
Monitor.Pulse(gate);
Monitor.Exit(gate);
}
else
{
returnMessage = new Message(a, null);
}
return returnMessage;
}
public Message WrapB(int b, int millisecondsTimeout)
{
Message returnMessage = null;
bool lockTaken = false;
Monitor.TryEnter(gate, 100, ref lockTaken);
if (lockTaken)
{
if (pendingB != null)
{
Monitor.Wait(gate, 100);
}
if (pendingB != null)
{
returnMessage = new Message(null, b);
}
else
{
pendingB = b;
if (!Monitor.Wait(gate, millisecondsTimeout))
{
pendingB = null;
Monitor.Pulse(gate);
returnMessage = new Message(null, b);
}
}
Monitor.Exit(gate);
}
else
{
returnMessage = new Message(null, b);
}
return returnMessage;
}
}
这里发生的事情与原始代码中基本相同,但我们也在等待已经有一个 pendingB 对象,而不仅仅是返回"B"消息。这提高了我们可以找到的"AB"消息的数量,而性能成本很小。
它看起来有点混乱,但这主要是因为我选择使用更实时友好的构造Monitor.TryTake而不是原始锁。此外,使用单个返回语句是一个巧妙的技巧,可以避免在调用 Monitor.Exit 之前意外返回死锁。
摆弄各种超时可以以牺牲准确性为代价来提高性能,反之亦然。 100 毫秒是我最初的猜测,至少在我的机器上看起来不错。
最后一点,在这个 WrapB 的实现中,我们可以更改行
if (pendingB != null)
{
Monitor.Wait(gate, 100);
}
自
while (pendingB != null)
{
Monitor.Wait(gate, 100);
}
以获得 100% 的准确性,但它严重弄乱了测试程序中的指标,因为它同步了"B"事件,当只有"B"消息流时,这些事件显然表现非常差。
如果我删除 t3 测试,它的运行速度比原始代码快约 5%,同时始终从 100 条"AB"消息中找到 100 条。但是运行时当然不再是确定性的,因为我们无法知道我们将绕循环旋转多少次。
编辑:
好吧,除非我们做类似的事情
int spinCount = 0;
while (pendingB != null && spinCount < 5)
{
spinCount++;
Monitor.Wait(gate, 100);
}
这将为我们提供等待时间的上限。当我们只有"B"消息流时,它确实解决了性能问题,并且运行时间与原始代码大致相同,同时始终从 100 条"AB"消息中找到 100 条。
好的,所以我尝试创建一个快速 A 和 AB,然后创建一个慢速 B。这意味着我的总体时间较慢(主要是因为仅 b 流(,但合并时间和仅 a 时间更快。以下是结果:
A's only: 00:00:00.3975499
Combine: 00:00:00.4234934
B's only: 00:00:02.0079422
Total: 00:00:02.8314751
0: 0
A: 700
B: 200
AB: 100
Generated A: 800, Sent A: 800
Generated B: 300, Sent B: 300
代码如下:
class MessageWrapper
{
object bMessageLock = new object();
object pendingBLock = new object();
int? pendingB;
ManualResetEvent gateOpen = new ManualResetEvent(true); // Gate is open initially.
private bool IsGateOpen()
{
return gateOpen.WaitOne(0);
}
private void OpenGate()
{
gateOpen.Set();
}
private void CloseGate()
{
gateOpen.Reset();
}
public Message WrapA(int a, int millisecondsTimeout)
{
// check if the gate is open. Use WaitOne(0) to return immediately.
if (IsGateOpen())
{
return new Message(a, null);
}
else
{
// This extra lock is to make sure that we don't get stale b's.
lock (pendingBLock)
{
// and reopen the gate.
OpenGate();
// there is a waiting b
// Send combined message
var message = new Message(a, pendingB);
pendingB = null;
return message;
}
}
}
public Message WrapB(int b, int millisecondsTimeout)
{
// Remove this if you don't have overlapping B's
var timespentInLock = Stopwatch.StartNew();
lock (bMessageLock) // Only one B message can be sent at a time.... may need to fix this.
{
pendingB = b;
// Close gate
CloseGate();
// Wait for the gate to be opened again (meaning that the message has been sent)
if (timespentInLock.ElapsedMilliseconds < millisecondsTimeout &&
gateOpen.WaitOne(millisecondsTimeout - (int)timespentInLock.ElapsedMilliseconds))
// If you don't have overlapping b's use this clause instead.
//if (gateOpen.WaitOne(millisecondsTimeout))
{
lock (pendingBLock)
{
// Gate was opened, so combined message was sent.
return null;
}
}
else
{
// Timeout expired, so send b-only message.
lock (pendingBLock)
{
// reopen gate.
OpenGate();
pendingB = null;
return new Message(null, b);
}
}
}
}
}
主要工作是使用手动重置事件完成的。这个想法是,如果门是打开的,那么你可以自由发送A。当"b"到达时,你关闭门并强制A组合它。我必须说,具有单个pendingB
字段会在一定程度上限制此操作。只有一个变量意味着只有一个线程可以将它的 b 存储在挂起的 B 中。这就是为什么我有额外的bMessageLock
.
此外,需要控制对此变量的访问,因此pendingBLock
.
这段代码中可能仍然存在错误,但尽管我对其进行了测试,但我仍然会得到所有 100 条消息的组合。
最后,我包括了对 WrapB 等待时间的检查。最初,WrapB 的排队总共需要 200 秒。如果您有重叠的调用,则可以添加检查。如果您不介意他们排队,请改用更简单的代码。
好吧,我的第一个想法是拥有一个也处理优先级的信号灯,但也许这篇文章会给你更多的见解.Net 互斥问题
基本上,ideea 是有某种方法来确定 2 种类型的事件的优先级,以便在没有收到 A
类型的事件时,B
类型的事件可以尽可能快地运行。
我意识到这可能不是适合您的解决方案,因为您的第三个约束是除了 Gate 之外没有可用的同步机制,但我希望我可以为您指出正确的方向。
这是一种提高公平性的方法的草图 - 这意味着所有B
发送都会受到长达 100 毫秒的延迟。但是我不知道它是否符合您的约束。
- 在全局上下文中,具有类型为
IMessageSender
的单个MessageSender
对象 IMessageSender
有两种实现,即DefaultMessageSender
和BWrappingMessageSender
(存储b
值(
消息发送者的行为如下所示:
- 被要求发送
A
DefaultMessageSender
:只需发送即可 -
DefaultMessageSender
被要求发送B
:将全局MessageSender
切换为知道刚刚传递的值的新BWrappingMessageSender
b
-
BWrappingMessageSender
被要求发送A
:发送一个带有传递a
及其自己的b
的 AB,并将全局MessageSender
切换为DefaultMessageSender
-
BWrappingMessageSender
被要求发送B
:发送一个带有自己b
的B
,并将全局MessageSender
切换为知道刚刚传递的值的新BWrappingMessageSender
b
我还没有确定的是,新创建的BWrappingMessageSender
知道在创建后 100 毫秒内发送普通B
,如果当时没有被告知要做任何其他事情。
这是我经过一些实验后的解决方案:
- 如果单元素队列为空,我们占据位置。
- 如果位置已经被占用,我们会礼貌地推动乘员继续前进,稍等片刻,然后再试一次。
- 如果有人粗鲁并在我们等待时劫持了现场,我们会跳过队列并继续前进。
法典:
Message WrapA(int a, int millisecondsTimeout)
{
bool lockTaken = false;
int? b = null;
try
{
Monitor.TryEnter(gate, millisecondsTimeout, ref lockTaken);
if (lockTaken)
{
if (pendingB != null)
{
b = pendingB;
pendingB = null;
Monitor.Pulse(gate);
}
}
}
finally
{
if (lockTaken)
{
Monitor.Exit(gate);
}
}
return new Message(a, b);
}
Message WrapB(int b, int millisecondsTimeout)
{
bool lockTaken = false;
try
{
TimeoutHelper timeout = new TimeoutHelper(millisecondsTimeout);
Monitor.TryEnter(gate, timeout.RemainingTime(), ref lockTaken);
if (lockTaken)
{
if (pendingB == null)
{
pendingB = b;
Monitor.Wait(gate, timeout.RemainingTime());
if (pendingB == null) return null;
pendingB = null;
}
else
{
Monitor.Pulse(gate);
try { }
finally { lockTaken = false; Monitor.Exit(gate); }
Thread.Sleep(1);
Monitor.TryEnter(gate, timeout.RemainingTime(), ref lockTaken);
if (lockTaken)
{
if (pendingB == null)
{
pendingB = b;
Monitor.Wait(gate, timeout.RemainingTime());
if (pendingB == null) return null;
pendingB = null;
}
}
}
}
}
finally
{
if (lockTaken)
{
Monitor.Exit(gate);
}
}
return new Message(null, b);
}
不确定它能满足你的要求,但这是我的主张。它基本上尽可能将任何 B 消息交给 A,并检查消息是否已发送:
class MessageWrapper
{
object gate = new object();
int? pendingB;
public Message WrapA(int a, int millisecondsTimeout)
{
int? b;
lock (gate)
{
b = pendingB;
pendingB = null;
Thread.Sleep(1); // yield. 1 seems the best value after some testing
}
return new Message(a, b);
}
public Message WrapB(int b, int millisecondsTimeout)
{
int? bb = b;
lock (gate)
{
if (pendingB == null)
{
pendingB = b;
bb = null;
}
}
Thread.Sleep(3);
if (bb == null)
{
lock (gate)
{
if (pendingB != null)
{
bb = pendingB;
pendingB = null;
}
}
}
return new Message(null, bb);
}
}
这是另一个尝试。方法是等待生成A
事件以附加到B
事件,而不是等待B
事件附加到A
事件。
object gate = new object();
int? pendingA;
public Message WrapA(int a, int millisecondsTimeout)
{
bool queued = false;
lock (gate)
{
if (pendingA == null)
{
queued = true;
pendingA = a;
Monitor.Pulse(gate);
}
}
if (queued)
{
Thread.Sleep(3);
lock (gate)
{
if (pendingA == null)
return null;
a = pendingA.Value;
pendingA = null;
}
}
return new Message(a, null);
}
public Message WrapB(int b, int millisecondsTimeout)
{
int? a;
lock (gate)
{
if (pendingA == null)
Monitor.Wait(gate, millisecondsTimeout);
a = pendingA;
pendingA = null;
}
return new Message(a, b);
}
经过三个小时的尝试,我设法得到了以下结果:
00:00:01.85773040: 0答:741乙: 241阿瑟: 59生成 A:800,发送 A:800生成 B: 300, 发送 B: 300共:1100条
我的方法:
(1(每当有消息B(从现在称为B(并且还没有B等待时,它就会将其放入"队列"。如果在给定的超时内没有其他数据包,它将发送消息。(2(当队列中实际上有一个B时,它会撞掉队列中的第一个B,并发送此消息。这是为了确保公平。正在发送的新 B 将遵循与情况 1 相同的情况(它将排队,并在给定的时间内发送(。(3(当有消息A(现在称为A(并且没有待处理的B时,将立即发送A。不执行实际等待。(4(当发送A并且队列中有B时,它会从队列中"窃取"它。两条消息都换行在一起发送。因为 B 正在等待在另一个线程上发送,而 A 窃取了它,所以我们需要一个空检查。A 会通知 B,但 B 会通知它没有什么要发送的。B 将返回空值。
要在代码中完成此操作,请执行以下操作:
public class MessageWrapper
{
readonly object _gate = new object();
int? _pendingB;
public Message WrapA(int a, int millisecondsTimeout)
{
int? currentB;
lock (_gate)
{
currentB = _pendingB;
_pendingB = null;
Monitor.Pulse(_gate); // B stolen, get rid of waiting threads
}
return new Message(a, currentB);
}
public Message WrapB(int b, int millisecondsTimeout)
{
lock (_gate)
{
if (_pendingB != null)
{
var currentB = _pendingB;
_pendingB = b;
Monitor.Pulse(_gate); // release for fairness
Monitor.Wait(_gate, millisecondsTimeout); // wait for fairness
return new Message(null, currentB);
}
else
{
_pendingB = b;
Monitor.Pulse(_gate); // release for fairness
Monitor.Wait(_gate, millisecondsTimeout); // wait for A
if (_pendingB == null) return null;
var currentB = _pendingB;
_pendingB = null;
return new Message(null, currentB);
}
}
}
}
大问题。我真的很喜欢花一些时间在这上面。我使用的解决方案的匹配数量是原始问题在我的计算机硬件上导致的匹配量的 4 倍。
也许比我更了解显示器和锁的人可以改善这一点。
-
在进行匹配时释放另一个线程,而不是让该线程执行完全睡眠只是为了最终返回 null。也许这真的没有那么昂贵。为了解决这个问题,我引入了自动重置事件,但由于我不明白的原因,自动重置事件没有按照我的预期运行,并将匹配项从 100 减少到 70。
-
线程的最终超时可以改进,因为一旦超时,它仍然需要通过有争议的锁。
它确实完全符合要求:
- 所有进程都将在指定的时间段内终止(最后一个锁定可能会增加几个周期,具体取决于锁定的争议程度(。
- 发送在锁之外。
- 使用门同步
- 无需额外的计时器
- 优先级和线程被平等对待
原始问题结果:
- 时间:4.5秒
- 答:773
- 乙: 273
- 阿瑟: 27
此类结果为:
- 时间:5.4秒
- 答:700
- 乙: 300
-
阿瑟: 100
class MessageWrapper { object gate = new object(); int EmptyThreadsToReleaseA = 0; int EmptyThreadsToReleaseB = 0; Queue<int> queueA = new Queue<int>(); Queue<int> queueB = new Queue<int>(); AutoResetEvent EmptyThreadEventA = new AutoResetEvent(false); AutoResetEvent EmptyThreadEventB = new AutoResetEvent(false); public Message WrapA(int a, int millisecondsTimeout) { lock (gate) { if (queueB.Count > 0) { Interlocked.Increment(ref EmptyThreadsToReleaseB); EmptyThreadEventB.Set(); return new Message(a, queueB.Dequeue()); } else { queueA.Enqueue(a); } } System.Threading.Thread.Sleep(millisecondsTimeout); //EmptyThreadEventA.WaitOne(millisecondsTimeout); lock (gate) { if (EmptyThreadsToReleaseA > 0) { Interlocked.Decrement(ref EmptyThreadsToReleaseA); return null; } return new Message(queueA.Dequeue(), null); } } public Message WrapB(int b, int millisecondsTimeout) { lock (gate) { if (queueA.Count > 0) { Interlocked.Increment(ref EmptyThreadsToReleaseA); EmptyThreadEventA.Set(); return new Message(queueA.Dequeue(), b); } else { queueB.Enqueue(b); } } System.Threading.Thread.Sleep(millisecondsTimeout); //EmptyThreadEventB.WaitOne(millisecondsTimeout); lock (gate) { if (EmptyThreadsToReleaseB > 0) { Interlocked.Decrement(ref EmptyThreadsToReleaseB); return null; } return new Message(null, queueB.Dequeue()); } } }
我试图避免不必要的锁定,特别是对于 A 类型的事件。此外,我对包装类的逻辑进行了一些更改。我发现直接从此类发送消息比只返回消息更方便,这是因为在我的实现中,对 SendB
的单个调用可能会发送两条 B 消息。我在代码中放了一些解释性注释
public class MessageWrapper
{
private readonly object _gate = new object();
private object _pendingB;
public void SendA(int a, int millisecondsTimeout, Action<Message> send)
{
var b = Interlocked.Exchange<object>(ref _pendingB, null);
send(new Message(a, (int?)b));
// this code will just release any pending "assure that B was sent" threads.
// but everything works fine even without it
lock (_gate)
{
Monitor.PulseAll(_gate);
}
}
public void SendB(int b, int millisecondsTimeout, Action<Message> send)
{
// needed for Interlocked to function properly and to be able to chack that exatly this b event was sent.
var boxedB = (object)(int?)b;
// excange currently pending B event with newly arrived one
var enqueuedB = Interlocked.Exchange(ref _pendingB, boxedB);
if (enqueuedB != null)
{
// if there was some pending B event then just send it.
send(new Message(null, (int?)enqueuedB));
}
// now we have to wait up to millisecondsTimeout to ensure that our message B was sent
lock (_gate)
{
// release any currently waiting threads.
Monitor.PulseAll(_gate);
if (Monitor.Wait(_gate, millisecondsTimeout))
{
// if we there pulsed, then we have nothing to do, as our event was already sent
return;
}
}
// check whether our event is still pending
enqueuedB = Interlocked.CompareExchange(ref _pendingB, null, boxedB);
if (ReferenceEquals(enqueuedB, boxedB))
{
// if so, then just send it.
send(new Message(null, (int?)enqueuedB));
}
}
}
此外,我还在您的测试类中进行了一些更改,这是我在评论中提到的一个原因 - 我在测试 AB 子句时向所有测试线程添加了同步事件。此外,我已将同时运行的线程数从您版本中的 500 个减少到 20 个(全部用于 AB 子句(。所有这些线程中的调用仍然按线程数移动(在线程 Start 方法中作为参数传递(,所以我希望测试仍然非常相关。
public static class Program
{
private static int _counter0 = 0;
private static int _counterA = 0;
private static int _counterB = 0;
private static int _counterAb = 0;
private static object _lastA;
private static object _lastB;
private static object _firstA;
private static object _firstB;
public static void Main(string[] args)
{
var wrapper = new MessageWrapper();
var sw = Stopwatch.StartNew();
var threadsCount = 10;
var a0called = 40;
// Only A events
var t0 = Start(threadsCount, a0called, 7, 1000, wrapper.SendA);
Join(t0);
var aJointCalled = 40;
var bJointCalled = 10;
var syncEvent = new CountdownEvent(threadsCount + threadsCount);
_firstA = null;
_firstB = null;
// A and B events
var t1 = Start(threadsCount, aJointCalled, 7, 1000, wrapper.SendA, syncEvent);
var t2 = Start(threadsCount, bJointCalled, 19, 1000, wrapper.SendB, syncEvent);
Join(t1);
Join(t2);
var lastA = _lastA;
var lastB = _lastB;
var b0called = 20;
// Only B events
var t3 = Start(threadsCount, b0called, 7, 1000, wrapper.SendB);
Join(t3);
Console.WriteLine(sw.Elapsed);
Console.WriteLine("0: {0}", _counter0);
Console.WriteLine("A: {0}", _counterA);
Console.WriteLine("B: {0}", _counterB);
Console.WriteLine("AB: {0}", _counterAb);
Console.WriteLine(
"Generated A: {0}, Sent A: {1}",
(threadsCount * a0called) + (threadsCount * aJointCalled),
_counterA + _counterAb);
Console.WriteLine(
"Generated B: {0}, Sent B: {1}",
(threadsCount * bJointCalled) + (threadsCount * b0called),
_counterB + _counterAb);
Console.WriteLine("First A was sent on {0: MM:hh:ss ffff}", _firstA);
Console.WriteLine("Last A was sent on {0: MM:hh:ss ffff}", lastA);
Console.WriteLine("First B was sent on {0: MM:hh:ss ffff}", _firstB);
Console.WriteLine("Last B was sent on {0: MM:hh:ss ffff}", lastB);
Console.ReadLine();
}
private static void SendMessage(Message m)
{
if (m != null)
{
if (m.A != null)
{
if (m.B != null)
{
Interlocked.Increment(ref _counterAb);
}
else
{
Interlocked.Increment(ref _counterA);
Interlocked.Exchange(ref _lastA, DateTime.Now);
Interlocked.CompareExchange(ref _firstA, DateTime.Now, null);
}
}
else if (m.B != null)
{
Interlocked.Increment(ref _counterB);
Interlocked.Exchange(ref _lastB, DateTime.Now);
Interlocked.CompareExchange(ref _firstB, DateTime.Now, null);
}
else
{
Interlocked.Increment(ref _counter0);
}
}
}
private static Thread[] Start(
int threadCount,
int eventCount,
int eventInterval,
int wrapTimeout,
Action<int, int, Action<Message>> wrap,
CountdownEvent syncEvent = null)
{
var threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++)
{
threads[i] = new Thread(
(p) =>
{
if (syncEvent != null)
{
syncEvent.Signal();
syncEvent.Wait();
}
Thread.Sleep((int)p);
for (int j = 0; j < eventCount; j++)
{
int k = (((int)p) * 1000) + j;
Thread.Sleep(eventInterval);
wrap(k, wrapTimeout, SendMessage);
}
});
threads[i].Start(i);
}
return threads;
}
private static void Join(params Thread[] threads)
{
foreach (Thread t in threads)
{
t.Join();
}
}
}
附言此外,感谢您提出非常有趣的问题。
这方面的限制因素实际上是约束,特别是仅使用 gate
进行同步的要求以及无法生成任何其他计时器/线程/任务等。这最终将编程解决方案与使用Monitor
对象联系起来。例如,Christoffer的解决方案虽然优雅,但在技术上使用gate
以外的同步,因为它被包裹在BlockingCollection
的内部。afrischke之前列出的另一个非常创新的解决方案也使用gate
以外的同步。
经过大量的实验、阅读和研究,我不得不说,我认为这个问题没有一个更好(更快?(的解决方案来完全满足约束。我使用以下机制设法获得了边际性能增益。它不漂亮,但它符合要求,至少在我的机器上平均快 1-5%;
object gate = new object();
ConcurrentDictionary<Guid, int> _bBag = new ConcurrentDictionary<Guid, int>();
public Message WrapA(int a, int millisecondsTimeout)
{
Message message = null;
int? b = null;
lock (gate)
{
if (!_bBag.IsEmpty)
{
Guid key = _bBag.Keys.FirstOrDefault();
int gotB = 0;
if (_bBag.TryRemove(key, out gotB))
{
b = gotB;
Monitor.PulseAll(gate);
}
}
}
message = new Message(a, b);
return message;
}
public Message WrapB(int b, int millisecondsTimeout)
{
Guid key = Guid.NewGuid();
_bBag.TryAdd(key, b);
lock (gate) { Monitor.Wait(gate, millisecondsTimeout); }
int storedB = 0;
if (_bBag.TryRemove(key, out storedB))
{
return new Message(null, b);
}
return null;
}
放宽gate
要求可将速度再提高一点点,尤其是在不在调试模式下运行时;
object gate = new object();
ManualResetEvent mre = new ManualResetEvent(false /*initialState*/);
ConcurrentDictionary<Guid, int> _bBag = new ConcurrentDictionary<Guid, int>();
public Message WrapA(int a, int millisecondsTimeout)
{
Message message = null;
int? b = null;
lock (gate)
{
if (!_bBag.IsEmpty)
{
Guid key = _bBag.Keys.FirstOrDefault();
int gotB = 0;
if (_bBag.TryRemove(key, out gotB))
{
b = gotB;
Monitor.PulseAll(gate);
}
}
}
message = new Message(a, b);
return message;
}
public Message WrapB(int b, int millisecondsTimeout)
{
Guid key = Guid.NewGuid();
_bBag.TryAdd(key, b);
mre.WaitOne(millisecondsTimeout); // use a manual reset instead of Monitor
int storedB = 0;
if (_bBag.TryRemove(key, out storedB))
{
return new Message(null, b);
}
return null;
}
总而言之,我想说的是,鉴于严格的要求,您已经有一个非常微调的解决方案。我真的希望我错了,有人找到更好的解决方案 - 它会非常有用!