用大量的“;锁”;到更多的无锁代码
本文关键字:代码 | 更新日期: 2023-09-27 17:59:39
Upd感谢Matthew Watson注意到并注意到我计划将代码移植到c++-linux,所以我更喜欢"独立于平台"的代码
我的交易应用程序几乎没有锁定。下面的代码是我使用锁的唯一地方。让我从代码开始,它很长,但不要担心有很多重复的部分,所以它很简单。我更喜欢添加所有"重复"部分,以更好地展示我的东西是如何工作的:
Task.Factory.StartNew(() =>
{
while (true)
{
Iterate();
}
}, TaskCreationOptions.LongRunning);
private void Iterate()
{
bool marketDataUpdated = false;
lock (ordersToRegisterLock)
{
if (ordersToRegister.Count > 0)
{
marketDataUpdated = true;
while (ordersToRegister.Count > 0)
{
Order order = ordersToRegister.Dequeue();
// Stage1, Process
}
}
}
lock (aggrUpdatesLock)
{
if (aggrUpdates.Count > 0)
{
marketDataUpdated = true;
while (!aggrUpdates.IsNullOrEmpty())
{
var entry = aggrUpdates.Dequeue();
// Stage1, Process
}
}
}
lock (commonUpdatesLock)
{
if (commonUpdates.Count > 0)
{
marketDataUpdated = true;
while (!commonUpdates.IsNullOrEmpty())
{
var entry = commonUpdates.Dequeue();
// Stage1, Process
}
}
}
lock (infoUpdatesLock)
{
if (infoUpdates.Count > 0)
{
marketDataUpdated = true;
while (!infoUpdates.IsNullOrEmpty())
{
var entry = infoUpdates.Dequeue();
// Stage1, Process
}
}
}
lock (tradeUpdatesLock)
{
if (tradeUpdates.Count > 0)
{
marketDataUpdated = true;
while (!tradeUpdates.IsNullOrEmpty())
{
var entry = tradeUpdates.Dequeue();
// Stage1, Process
}
}
}
if (marketDataUpdated)
{
// Stage2 !
// make a lot of work. expensive operation. recalculate strategies, place orders etc.
}
}
private readonly Queue<Order> ordersToRegister = new Queue<Order>();
private readonly object ordersToRegisterLock = new object();
private readonly Queue<AggrEntry> aggrUpdates = new Queue<AggrEntry>();
private readonly object aggrUpdatesLock = new object();
private readonly Queue<CommonEntry> commonUpdates = new Queue<CommonEntry>();
private readonly object commonUpdatesLock = new object();
private readonly Queue<InfoEntry> infoUpdates = new Queue<InfoEntry>();
private readonly object infoUpdatesLock = new object();
private readonly Queue<TradeEntry> tradeUpdates = new Queue<TradeEntry>();
private readonly object tradeUpdatesLock = new object();
public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
{
lock (ordersToRegisterLock)
{
ordersToRegister.Enqueue(e.order);
}
}
public void TradeUpdated(object sender, Gate.TradeArgs e)
{
lock (tradeUpdatesLock)
{
foreach (var entry in e.entries)
{
tradeUpdates.Enqueue(entry);
}
}
}
public void InfoUpdated(object sender, Gate.InfoArgs e)
{
lock (infoUpdatesLock)
{
foreach (var entry in e.entries)
{
infoUpdates.Enqueue(entry);
}
}
}
public void CommonUpdated(object sender, Gate.CommonArgs e)
{
lock (commonUpdatesLock)
{
foreach (var entry in e.entries)
{
commonUpdates.Enqueue(entry);
}
}
}
public void AggrUpdated(object sender, Gate.AggrArgs e)
{
lock (aggrUpdatesLock)
{
foreach (var entry in e.entries)
{
aggrUpdates.Enqueue(entry);
}
}
}
在我的代码中,我有两个阶段。Stage1
是更新阶段,Stage2
是工作阶段。我需要尽快在这两个阶段之间切换,比如:
- 有什么更新吗?否
- 有什么更新吗?否
- 有更新吗?是的,订单已更新!应用更新,执行
Stage2
- 有什么更新吗?否
- 有什么更新吗?是的,订单需要注册!应用更新,执行
Stage2
- 有什么更新吗?是,发生交易,应用更新,执行
Stage2
在Stage2
中,我不应该更新,但应该不断"收集"更新,以便以后应用它们。
重要的是,这是非常延迟关键的代码,所以我同意"花掉"一个核心,因为它具有最小延迟!因此,当发生任何更新时,我需要尽快处理并执行Stage2
。
所以我希望现在清楚我需要实现什么,也清楚我是如何实现的。现在是时候讨论我的代码有多好了。我确实看到了几个潜在的问题:
- 很多锁!它能被一些"免锁"代码取代吗?带CAS的旋转锁还是什么
- 占用100%的CPU核心,是否可以节省一些CPU资源而不影响延迟
- 我能告诉你吗。NET使用"专用"核心(设置任务相关性?)以避免额外的"切换"
- 我从一个线程向队列添加,并从另一个线程读取队列。这会是个问题吗?如果向队列添加和读取是不稳定的?我的读取线程是否可能因为缓存更新问题而看不到来自队列的更新
欢迎任何关于如何改进我所写内容的建议,谢谢!
upd部分解决-据我所知,我最好将查询替换为无锁(可能是基于环形缓冲区的?)查询。。我想我以后会使用c++版本的distributor。我也用过这篇文章http://www.umbraworks.net/bl0g/rebuildall/2010/03/08/Running_NET_threads_on_selected_processor_cores并将Task替换为在"固定"内核上运行的Thread,然而我仍然在使用"繁忙旋转",也许我应该使用更智能的东西?
使用下面的代码,您在"阶段1"处理过程中不再被锁定:
Task.Factory.StartNew(() =>
{
while (true)
{
Iterate();
}
}, TaskCreationOptions.LongRunning);
private void Iterate()
{
bool marketDataUpdated = false;
foreach (Order order in ordersToRegister)
{
marketDataUpdated = true;
// Stage1, Process
}
foreach (var entry in aggrUpdates)
{
marketDataUpdated = true;
// Stage1, Process
}
foreach (var entry in commonUpdates)
{
marketDataUpdated = true;
// Stage1, Process
}
foreach (var entry in infoUpdates)
{
marketDataUpdated = true;
// Stage1, Process
}
foreach (var entry in tradeUpdates)
{
marketDataUpdated = true;
// Stage1, Process
}
if (marketDataUpdated)
{
// Stage2 !
// make a lot of work. expensive operation. recalculate strategies, place orders etc.
}
}
private readonly ConcurrentQueue<Order> ordersToRegister = new ConcurrentQueue<Order>();
private readonly ConcurrentQueue<AggrEntry> aggrUpdates = new ConcurrentQueue<AggrEntry>();
private readonly ConcurrentQueue<CommonEntry> commonUpdates = new ConcurrentQueue<CommonEntry>();
private readonly ConcurrentQueue<InfoEntry> infoUpdates = new ConcurrentQueue<InfoEntry>();
private readonly ConcurrentQueue<TradeEntry> tradeUpdates = new ConcurrentQueue<TradeEntry>();
public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
{
ordersToRegister.Enqueue(e.order);
}
public void TradeUpdated(object sender, Gate.TradeArgs e)
{
foreach (var entry in e.entries)
{
tradeUpdates.Enqueue(entry);
}
}
public void InfoUpdated(object sender, Gate.InfoArgs e)
{
foreach (var entry in e.entries)
{
infoUpdates.Enqueue(entry);
}
}
public void CommonUpdated(object sender, Gate.CommonArgs e)
{
foreach (var entry in e.entries)
{
commonUpdates.Enqueue(entry);
}
}
public void AggrUpdated(object sender, Gate.AggrArgs e)
{
foreach (var entry in e.entries)
{
aggrUpdates.Enqueue(entry);
}
}
这里有一种可能更便携的方法。希望能有所帮助。
public class SafeQueue<T> : Queue<T>
{
public T SafeDequeue()
{
lock (this)
{
return (Count > 0) ? Dequeue() : null;
}
}
public void SafeEnqueue(T entry)
{
lock (this)
{
Enqueue(entry);
}
}
}
Task.Factory.StartNew(() =>
{
while (true)
{
Iterate();
}
}, TaskCreationOptions.LongRunning);
private void Iterate()
{
bool marketDataUpdated = false;
while ((Order order = ordersToRegister.SafeDequeue()) != null)
{
marketDataUpdated = true;
// Stage1, Process
}
while ((var entry = aggrUpdates.SafeDequeue()) != null)
{
marketDataUpdated = true;
// Stage1, Process
}
while ((var entry = commonUpdates.SafeDequeue()) != null)
{
marketDataUpdated = true;
// Stage1, Process
}
while ((var entry = infoUpdates.SafeDequeue()) != null)
{
marketDataUpdated = true;
// Stage1, Process
}
while ((var entry = tradeUpdates.SafeDequeue()) != null)
{
marketDataUpdated = true;
// Stage1, Process
}
if (marketDataUpdated)
{
// Stage2 !
// make a lot of work. expensive operation. recalculate strategies, place orders etc.
}
}
private readonly SafeQueue<Order> ordersToRegister = new SafeQueue<Order>();
private readonly SafeQueue<AggrEntry> aggrUpdates = new SafeQueue<AggrEntry>();
private readonly SafeQueue<CommonEntry> commonUpdates = new SafeQueue<CommonEntry>();
private readonly SafeQueue<InfoEntry> infoUpdates = new SafeQueue<InfoEntry>();
private readonly SafeQueue<TradeEntry> tradeUpdates = new SafeQueue<TradeEntry>();
public void RegistorOrder(object sender, Gate.RegisterOrderArgs e)
{
ordersToRegister.SafeEnqueue(e.order);
}
public void TradeUpdated(object sender, Gate.TradeArgs e)
{
foreach (var entry in e.entries)
{
tradeUpdates.SafeEnqueue(entry);
}
}
public void InfoUpdated(object sender, Gate.InfoArgs e)
{
foreach (var entry in e.entries)
{
infoUpdates.SafeEnqueue(entry);
}
}
public void CommonUpdated(object sender, Gate.CommonArgs e)
{
foreach (var entry in e.entries)
{
commonUpdates.SafeEnqueue(entry);
}
}
public void AggrUpdated(object sender, Gate.AggrArgs e)
{
foreach (var entry in e.entries)
{
aggrUpdates.SafeEnqueue(entry);
}
}