c# - Lock不能工作,但是Volatile AND Lock可以

本文关键字:Lock Volatile AND 但是 可以 工作 不能 | 更新日期: 2023-09-27 18:07:00

我正试图以最快的速度和尽可能有效地轮询API以获取市场数据。该API允许您根据每个请求从batchSize markets获取市场数据。该API允许您有3个并发请求,但不能超过(或抛出错误)。

我可能从许多比batchSize不同的市场请求数据。

我不断地循环遍历所有的市场,分批请求数据,每个线程一个批次,任何时候都有3个线程。

市场总数(以及批次)可以随时变化。

我使用下面的代码:

private static object lockObj = new object();
private void PollMarkets()
{
    const int NumberOfConcurrentRequests = 3;
    for (int i = 0; i < NumberOfConcurrentRequests; i++)
    {
        int batch = 0;
        Task.Factory.StartNew(async () =>
        {    
            while (true)
            {
                if (markets.Count > 0)
                {
                    List<string> batchMarketIds;
                    lock (lockObj)
                    {
                        var numBatches = (int)Math.Ceiling((double)markets.Count / batchSize);
                        batchMarketIds = markets.Keys.Skip(batch*batchSize).Take(batchSize).ToList();
                        batch = (batch + 1) % numBatches;
                    }
                    var marketData = await GetMarketData(batchMarketIds);
                    // Do something with marketData
                    }
                    else
                    {
                        await Task.Delay(1000); // wait for some markets to be added.
                    }
                }
            }
       });
    }
}

即使在临界区有一个锁,每个线程都以batch = 0开始(每个线程经常轮询重复的数据)。

如果我将batch更改为私有volatile字段,上面的代码将按照我希望的那样工作(volatile和lock)。

所以出于某种原因我的锁不工作?我觉得这是很明显的,但我错过了它。

我认为最好在这里使用锁而不是volatile字段,这也是正确的吗?

谢谢

c# - Lock不能工作,但是Volatile AND Lock可以

问题是您在for循环中定义批处理变量。这意味着线程正在使用它们自己的变量,而不是共享它。

在我看来,您应该使用Queue<>来创建一个作业管道。

像这样

private int batchSize = 10;
private Queue<int> queue = new Queue<int>();
private void AddMarket(params int[] marketIDs)
{
    lock (queue)
    {
        foreach (var marketID in marketIDs)
        {
            queue.Enqueue(marketID);
        }
        if (queue.Count >= batchSize)
        {
            Monitor.Pulse(queue);
        }
    }
}
private void Start()
{
    for (var tid = 0; tid < 3; tid++)
    {
        Task.Run(async () =>
        {
            while (true)
            {
                List<int> toProcess;
                lock (queue)
                {
                    if (queue.Count < batchSize)
                    {
                        Monitor.Wait(queue);
                        continue;
                    }
                    toProcess = new List<int>(batchSize);
                    for (var count = 0; count < batchSize; count++)
                    {
                        toProcess.Add(queue.Dequeue());
                    }
                    if (queue.Count >= batchSize)
                    {
                        Monitor.Pulse(queue);
                    }
                }
                var marketData = await GetMarketData(toProcess);
            }
        });
    }
}