使用多线程可能会导致内存使用过多

本文关键字:内存 多线程 | 更新日期: 2023-09-27 18:32:26

我有一个将消息记录到数据库(或其他地方)的Windows服务项目。这些消息的频率可能高达每秒十个。由于发送和处理消息不应延迟服务的主进程,因此我启动一个新线程来处理每条消息。这意味着,如果主进程需要发送 100 条日志消息,则会启动 100 个线程来处理每条消息。我了解到,当线程完成时,它将被清洁,所以我不必处理它。只要我在线程中处理所有使用的对象,一切应该都可以正常工作。

服务可能会进入导致关闭服务的异常。在服务关闭之前,它应等待记录消息的所有线程。为了实现这一点,它会在每次启动线程时将线程添加到列表中。调用 wait-for-threads 方法时,将检查列表中的所有线程是否仍处于活动状态,如果是,则使用 join 来等待它。

代码:

创建线程:

/// <summary>
    /// Creates a new thread and sends the message
    /// </summary>
    /// <param name="logMessage"></param>
    private static void ThreadSend(IMessage logMessage)
    {
        ParameterizedThreadStart threadStart = new ParameterizedThreadStart(MessageHandler.HandleMessage);
        Thread messageThread = new Thread(threadStart);
        messageThread.Name = "LogMessageThread";            
        messageThread.Start(logMessage);
        threads.Add(messageThread);
    }

等待线程结束:

    /// <summary>
    /// Waits for threads that are still being processed
    /// </summary>
    public static void WaitForThreads()
    {
        int i = 0;
        foreach (Thread thread in threads)
        {
            i++;
            if (thread.IsAlive)
            {
                Debug.Print("waiting for {0} - {1} to end...", thread.Name, i);
                thread.Join();
            }
        }
    }

现在我主要担心的是,如果这个服务运行一个月,它仍然会在列表中拥有所有线程(数百万个)(其中大多数是死的)。这会吞噬记忆,我不知道有多少。总的来说,这对我来说似乎不是一个好的做法,我想清理完成的线程,但我找不到该怎么做。有没有人对此有好的或最佳实践?

使用多线程可能会导致内存使用过多

如果

线程已死,请从列表中删除它们?

/// <summary>
/// Waits for threads that are still being processed
/// </summary>
public static void WaitForThreads()
{
    List<Thread> toRemove = new List<int>();
    int i = 0;
    foreach (Thread thread in threads)
    {
        i++;
        if (thread.IsAlive)
        {
            Debug.Print("waiting for {0} - {1} to end...", thread.Name, i);
            thread.Join();
        }
        else
        {
            toRemove.Add(thread);
        }
    }
    threads.RemoveAll(x => toRemove.Contains(x));
}

查看任务并行性

首先:为每个日志消息创建一个线程不是一个好主意。使用 ThreadPool 或创建有限数量的工作线程来处理来自公共队列(生产者/使用者)的日志项。

第二:当然,您还需要从列表中删除线程引用!当线程方法结束时,它可以自行删除,或者您甚至可以定期执行此操作。例如,每半小时运行一次计时器,检查列表中是否有死线程并删除它们。

如果您在这些线程中所做的只是日志记录,则可能应该有一个日志记录线程和一个主线程放置消息的共享队列。然后,日志记录线程可以读取队列和日志。使用BlockingCollection,这非常容易。

在服务的主线程中创建队列:

BlockingCollection<IMessage> LogMessageQueue = new BlockingCollection<IMessage>();

服务的主线程创建一个 Logger(见下文)实例,该实例启动线程来处理日志消息。主线程将项目添加到LogMessageQueue。记录器线程从队列中读取它们。当主线程想要关闭时,它会调用LogMessageQueue.CompleteAdding 。记录器将清空队列并退出。

主线程如下所示:

// start the logger
Logger _loggingThread = new Logger(LogMessageQueue);
// to log a message:
LogMessageQueue.Add(logMessage);
// when the program needs to shut down:
LogMessageQueue.CompleteAdding();

和记录器类:

class Logger
{
    BlockingCollection<IMessage> _queue;
    Thread _loggingThread;
    public Logger(BlockingCollection<IMessage> queue)
    {
        _queue = queue;
        _loggingThread = new Thread(LoggingThreadProc);
    }
    private void LoggingThreadProc(object state)
    {
        IMessage msg;
        while (_queue.TryTake(out msg, TimeSpan.Infinite))
        {
            // log the item
        }
    }
}
这样,您

只需要一个额外的线程,就可以保证按照发送的顺序处理消息(当前的方法并非如此),并且您不必担心跟踪线程关闭等。

更新

如果某些日志消息需要一些时间来处理(例如,您描述的电子邮件),则可以异步处理它们。例如:

while (_queue.TryTake(out msg, TimeSpan.Infinite))
{
    if (msg.Type == Email)
    {
        // start asynchronous task to send email
    }
    else
    {
        // write to log file
    }
}

这样,只有那些可能需要大量时间的消息才会异步运行。如果需要,您还可以在此处为电子邮件设置辅助队列。这样您就不会陷入一堆电子邮件线程的泥潭。相反,您将其限制为一两个,或者可能是少数几个。

请注意,如果需要,您还可以拥有多个 Logger 实例,所有实例都从同一消息队列读取。只需确保它们各自写入不同的日志文件即可。队列本身将支持多个使用者。

我认为

一般来说,解决您的问题的方法可能不是最佳实践。我的意思是,您只想在数据库中存储 1000 条消息,而不是创建 1000 个线程,对吗?而且您似乎想异步执行此操作。

但是为每个消息创建一个线程并不是一个好主意,实际上并不能解决这个问题......

相反,我会尝试实现类似消息队列的东西。您可以有多个队列,每个队列都有自己的线程。如果消息传入,您将它们发送到其中一个队列(交替)...

队列要么等待一定数量的消息,要么总是等待一定的时间(例如 1 秒,取决于在数据库中存储例如 100 条消息所需的时间),直到它尝试将排队的消息存储在数据库中。这样,您实际上应该始终具有恒定数量的线程,并且您不应该看到任何性能问题......

此外,它还使您能够批量插入数据,而不仅仅是一个接一个地插入数据库连接的开销等......

当然,如果您的数据库速度较慢,那么任务能够存储消息,越来越多的消息将被排队......但对于您当前的解决方案也是如此。

由于多个答案和评论导致了我的解决方案,我将在此处发布完整的代码。

我使用线程池来管理此页面中用于 wting 函数的线程和代码。

创建线程:

private static void ThreadSend(IMessage logMessage)
        {    
            ThreadPool.QueueUserWorkItem(MessageHandler.HandleMessage, logMessage);
        }

等待线程完成:

public static bool WaitForThreads(int maxWaitingTime)
        {
            int maxThreads = 0;
            int placeHolder = 0;
            int availableThreads = 0;
            while (maxWaitingTime > 0)
            {
                System.Threading.ThreadPool.GetMaxThreads(out maxThreads, out placeHolder);
                System.Threading.ThreadPool.GetAvailableThreads(out availableThreads, out placeHolder);
                //Stop if all threads are available
                if (availableThreads == maxThreads)
                {
                    return true;
                }
                System.Threading.Thread.Sleep(TimeSpan.FromMilliseconds(1000));
                --maxWaitingTime;            
            }
            return false;
        }

(可选)可以在这些方法之外的某个位置添加此项,以限制池中的线程数。

System.Threading.ThreadPool.SetMaxThreads(MaxWorkerThreads, MaxCompletionPortThreads);