从C#中的不同线程访问对象时出现意外行为

本文关键字:意外 对象 访问 线程 | 更新日期: 2023-09-27 18:21:05

在开发多线程windows服务时,我目前看到了一些不正常的行为。我遇到的问题是,当从不同的线程访问时,一些对象似乎正在重置。

让我用一些代码来演示(简化以解释问题)。。。。

首先,我有一个类,它基于另一个类中的方法启动线程(使用Ninject获取类),然后停止它们:

public class ContainerService : ServiceBase
{    
    private IEnumerable<IRunnableBatch> _services;
    public void start()
    {
        _services = ServiceContainer.SvcContainer.Kernel.GetAll<IRunnableBatch>();
        foreach (IRunnableBatch s in _services)
        {
            s.run();
        }
    }
    public void stop()
    {
        foreach (IRunnableBatch s in _services)
        {
            s.stop();
        }
    }
}

现在,在IRunnableBatch类的run()方法中,我有这样的东西:

public class Batch : IRunnableBatch
{
    //this class is used for starting and stopping threads as well as tracking
    //threads to restart them should the stop
    protected IWatchdog _watchdog; 
    ... code ommitted for brevity but the watchdog class is injected by Ninject
         in the constructor ...
    public void run()
    {
        _watchdog.startThreads(this);
    }
    public void stop()
    {
        _watchdog.stopThreads();
    }
}

下面是看门狗类的代码:

public class Watchdog : IWatchdog
{
    private ILog _logger;
    private Dictionary<int, MethodInfo> _batches = new Dictionary<int, MethodInfo>();
    private Dictionary<int, Thread> _threads = new Dictionary<int, Thread>();
    private IRunnableBatch _service;
    private Thread _watcher;
    private Dictionary<int, ThreadFailure> _failureCounts = new Dictionary<int, ThreadFailure>();
    private bool _runWatchdog = true;
    #region IWatchdog Members
    /**
     *  This function will scan an IRunnableService for the custom attribute
     *  "BatchAttribute" and use that to determine what methods to run when 
     *  a batch needs to be launched
     */
    public void startThreads(IRunnableBatch s)
    {
        _service = s;
        //scan service for runnable methods
        Type t = s.GetType();
        MethodInfo[] methods = t.GetMethods();
        foreach (MethodInfo m in methods)
        {
            object[] attrs = m.GetCustomAttributes(typeof(BatchAttribute), true);
            if (attrs != null && attrs.Length >= 1)
            {
                BatchAttribute b = attrs[0] as BatchAttribute;
                _batches.Add(b.Batch_Number, m);
            }
        }
        //loop through and see if the batches need to run
        foreach (KeyValuePair<int, MethodInfo> kvp in _batches)
        {
            startThread(kvp.Key, kvp.Value);
        }
        //check if the watcher thread is running. If not, start it
        if (_watcher == null || !_watcher.IsAlive)
        {
            _watcher = new Thread(new ThreadStart(watch));
            _watcher.Start();
            _logger.Info("Watcher thread started.");
        }
    }
    private void startThread(int key, MethodInfo method)
    {
        if (_service.shouldBatchRun(key))
        {
            Thread thread = new Thread(new ThreadStart(() => method.Invoke(_service, null)));
            try
            {
                thread.Start();
                _logger.Info("Batch " + key + " (" + method.Name + ") has been started.");
                if (_threads.ContainsKey(key))
                {
                    _threads[key] = thread;
                }
                else
                {
                    _threads.Add(key, thread);
                }
            }
            catch (Exception ex)
            {
                //mark this as the first problem starting the thread.
                if (ex is System.Threading.ThreadStateException || ex is System.OutOfMemoryException)
                {
                    _logger.Warn("Unable to start thread: " + method.Name, ex);
                    ThreadFailure tf = new ThreadFailure();
                    tf.Count = 1;
                    _failureCounts.Add(key, tf);
                }
                else { throw; }
            }
        }
    }
    public void stopThreads()
    {
        _logger.Info("stopThreads called");
        //stop the watcher thread first
        if (_watcher != null && _watcher.IsAlive)
        {
            _logger.Info("Stopping watcher thread.");
            _runWatchdog = false;
            _watcher.Join();
            _logger.Info("Watcher thread stopped.");
        }
        int stoppedCount = 0;
        _logger.Info("There are " + _threads.Count + " batches to stop.");
        while (stoppedCount < _threads.Count)
        {
            ArrayList stopped = new ArrayList();
            foreach (KeyValuePair<int, Thread> kvp in _threads)
            {
                if (kvp.Value.IsAlive)
                {
                    _service.stopBatch(kvp.Key);
                    kvp.Value.Join(); //wait for thread to terminate
                    _logger.Info("Batch " + kvp.Key.ToString() + " stopped");
                }
                else
                {
                    _logger.Info("Batch " + kvp.Key + " (" + _batches[kvp.Key].Name + ") has been stopped");
                    stoppedCount++;
                    stopped.Add(kvp.Key);
                }
            }
            foreach (int n in stopped)
            {
                _threads.Remove(n);
            }
        }
    }
    public void watch()
    {
        int numIntervals = 15 * 12; //15 minutes in 5 second intervals 
        while (_runWatchdog)
        {
            //cycle through the batches and check the matched threads.
            foreach (KeyValuePair<int, MethodInfo> kvp in _batches)
            {
                //if they are not running 
                if (!_threads[kvp.Key].IsAlive)
                {
                    //mark the thread failure and then try again.
                    ThreadFailure tf;
                    if (_failureCounts.ContainsKey(kvp.Key))
                    {
                        tf = _failureCounts[kvp.Key];
                    }
                    else
                    {
                        tf = new ThreadFailure();
                    }
                    tf.Count++;
                    if (tf.Count >= 8)
                    {
                        //log an error as we've been trying to start this thread for 2 hours now
                        _logger.Error("Unable to start the thread: " + kvp.Value.Name + " ***** NOT TRYING AGAIN UNTIL SERVICE RESTART");
                    }
                    else
                    {
                        _logger.Warn("Thread (" + kvp.Value.Name + ") found stopped... RESTARTING");
                        startThread(kvp.Key, kvp.Value);
                    }
                }
            }
            //sleep 15 minutes and repeat.
            _logger.Info("*** Watcher sleeping for 15 minutes");
            for (int i = 1; i <= numIntervals; i++)
            {
                if (!_runWatchdog) 
                    break;
                Thread.Sleep(5000); //sleep for 5 seconds
            }
            _logger.Info("*** Watcher woke up.");
        }
        _logger.Info("Watcher thread stopping.");
    }
    public void setLogger(ILog l)
    {
        _logger = l;
    }
    #endregion
}

因此,主程序调用ContainerService.start(),后者调用IRunnableBatch.run(),它调用IWatchdog.startThreads()。startThread()方法定位并启动它找到的所有线程,然后启动一个线程来监视其他线程,以防它们因某种原因死亡。然后函数退出,一直返回到主函数。

现在,服务只需等待服务管理器调用OnStop(),但出于测试目的,我让主线程睡眠1分钟,然后调用ContainerService.stop()。

在所有这些解释之后,我现在开始讨论这个问题。。。。呜呜!!

当主线程调用stop(),而stop()方法调用IRunnableBatch.stop()时,如果我在那里有一个断点并检查_watchdog变量,我会发现它的所有相关成员变量都被设置回了它们的初始值(没有线程,没有观察线程,没有批处理,什么都没有…)。

有人知道为什么吗?

从C#中的不同线程访问对象时出现意外行为

我看到了问题。阅读https://github.com/ninject/ninject/wiki/Multi-injection,您将看到GetAll返回一个枚举值,该枚举值在迭代时激活对象,而不是列表。因此,在ContainerService.start中,将创建可运行的批处理对象,并在stop中创建一组全新的对象。

试着在调用GetAll之后添加一个.ToList(),或者更改Ninject配置,使您的可运行程序不是瞬态的。