如何在多线程方案中对只应执行一次的代码进行单元测试

本文关键字:一次 代码 单元测试 执行 多线程 方案 | 更新日期: 2023-09-27 18:31:40

类包含一个只应创建一次的属性。创建过程是通过传入参数的Func<T>进行的。这是缓存方案的一部分。

该测试注意的是,无论有多少线程尝试访问该元素,创建都只发生一次。

单元测试的机制是在访问器周围启动大量线程,并计算调用创建函数的次数。

根本不是确定性的,不能保证这有效地测试了多线程访问。也许一次只有一个线程会击中锁。(实际上,如果lock不存在,getFunctionExecuteCount在 7 到 9 之间......在我的机器上,没有什么可以保证在 CI 服务器上它会是一样的)

如何以确定性的方式重写单元测试?如何确保lock被多个线程多次触发?

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Example.Test
{
    public class MyObject<T> where T : class
    {
        private readonly object _lock = new object();
        private T _value = null;
        public T Get(Func<T> creator)
        {
            if (_value == null)
            {
                lock (_lock)
                {
                    if (_value == null)
                    {
                        _value = creator();
                    }
                }
            }
            return _value;
        }
    }
    [TestClass]
    public class UnitTest1
    {
        [TestMethod]
        public void MultipleParallelGetShouldLaunchGetFunctionOnlyOnce()
        {
            int getFunctionExecuteCount = 0;
            var cache = new MyObject<string>();
            Func<string> creator = () =>
            {
                Interlocked.Increment(ref getFunctionExecuteCount);
                return "Hello World!";
            };
            // Launch a very big number of thread to be sure
            Parallel.ForEach(Enumerable.Range(0, 100), _ =>
            {
                cache.Get(creator);
            });
            Assert.AreEqual(1, getFunctionExecuteCount);
        }
    }
}

最糟糕的情况是,如果有人破坏了lock代码,并且测试服务器有一些滞后。此测试不应通过:

using NUnit.Framework;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Example.Test
{
    public class MyObject<T> where T : class
    {
        private readonly object _lock = new object();
        private T _value = null;
        public T Get(Func<T> creator)
        {
            if (_value == null)
            {
                // oups, some intern broke the code
                //lock (_lock)
                {
                    if (_value == null)
                    {
                        _value = creator();
                    }
                }
            }
            return _value;
        }
    }
    [TestFixture]
    public class UnitTest1
    {
        [Test]
        public void MultipleParallelGetShouldLaunchGetFunctionOnlyOnce()
        {
            int getFunctionExecuteCount = 0;
            var cache = new MyObject<string>();
            Func<string> creator = () =>
            {
                Interlocked.Increment(ref getFunctionExecuteCount);
                return "Hello World!";
            };
            Parallel.ForEach(Enumerable.Range(0, 2), threadIndex =>
            {
                // testing server has lag
                Thread.Sleep(threadIndex * 1000);
                cache.Get(creator);
            });
             // 1 test passed :'(
            Assert.AreEqual(1, getFunctionExecuteCount);
        }
    }
}

如何在多线程方案中对只应执行一次的代码进行单元测试

为了使其具有确定性,您只需要两个线程,并确保其中一个线程在函数内部阻塞,而另一个线程也尝试进入内部。

[TestMethod]
public void MultipleParallelGetShouldLaunchGetFunctionOnlyOnce()
{
    var evt = new ManualResetEvent(false);
    int functionExecuteCount = 0;
    var cache = new MyObject<object>();
    Func<object> creator = () =>
    {
        Interlocked.Increment(ref functionExecuteCount);
        evt.WaitOne();
        return new object();
    };
    var t1 = Task.Run(() => cache.Get(creator));
    var t2 = Task.Run(() => cache.Get(creator));
    // Wait for one task to get inside the function
    while (functionExecuteCount == 0)
        Thread.Yield();
    // Allow the function to finish executing
    evt.Set();
    // Wait for completion
    Task.WaitAll(t1, t2);
    Assert.AreEqual(1, functionExecuteCount);
    Assert.AreEqual(t1.Result, t2.Result);
}

您可能希望在此测试:)上设置超时


这是一个允许测试更多案例的变体:

public void MultipleParallelGetShouldLaunchGetFunctionOnlyOnce()
{
    var evt = new ManualResetEvent(false);
    int functionExecuteCount = 0;
    var cache = new MyObject<object>();
    Func<object> creator = () =>
    {
        Interlocked.Increment(ref functionExecuteCount);
        evt.WaitOne();
        return new object();
    };
    object r1 = null, r2 = null;
    var t1 = new Thread(() => { r1 = cache.Get(creator); });
    t1.Start();
    var t2 = new Thread(() => { r2 = cache.Get(creator); });
    t2.Start();
    // Make sure both threads are blocked
    while (t1.ThreadState != ThreadState.WaitSleepJoin)
        Thread.Yield();
    while (t2.ThreadState != ThreadState.WaitSleepJoin)
        Thread.Yield();
    // Let them continue
    evt.Set();
    // Wait for completion
    t1.Join();
    t2.Join();
    Assert.AreEqual(1, functionExecuteCount);
    Assert.IsNotNull(r1);
    Assert.AreEqual(r1, r2);
}

如果要延迟第二次调用,则无法使用Thread.Sleep,因为它会导致线程进入WaitSleepJoin状态:

线程被阻塞。这可能是调用Thread.SleepThread.Join、请求锁(例如,通过调用Monitor.EnterMonitor.Wait)或等待线程同步对象(如ManualResetEvent

)的结果。

而且我们将无法判断线程是处于睡眠状态还是在您的ManualResetEvent上等待......

但是您可以轻松地用忙碌的等待来代替睡眠。注释掉lock,并将t2更改为:

var t2 = new Thread(() =>
{
    var sw = Stopwatch.StartNew();
    while (sw.ElapsedMilliseconds < 1000)
        Thread.Yield();
    r2 = cache.Get(creator);
});

现在测试将失败。

我认为不存在真正的确定性方法,但是您可以提高概率,这样就很难不引起并发种族:

Interlocked.Increment(ref getFunctionExecuteCount);
Thread.Yield();
Thread.Sleep(1);
Thread.Yield();
return "Hello World!";

通过提高Sleep()参数(到10?),越来越不可能没有并发比赛发生。

除了pid的回答:
此代码实际上并没有创建很多线程。

// Launch a very big number of thread to be sure
Parallel.ForEach(Enumerable.Range(0, 100), _ =>
{
    cache.Get(creator);
});

它将启动~Environment.ProcessorCount线程。更多细节。

如果你想得到很多线程,你应该明确地去做。

var threads = Enumerable.Range(0, 100)
    .Select(_ => new Thread(() => cache.Get(creator))).ToList();
threads.ForEach(thread => thread.Start());
threads.ForEach(thread => thread.Join());

因此,如果你有足够的线程,并且你会强制它们切换,你会得到并发竞争。

如果您关心 CI 服务器只有一个可用内核的情况,则可以通过更改Process.ProcessorAffinity属性在测试中包含此约束。更多细节。