使用多线程可能会导致内存使用过多
本文关键字:内存 多线程 | 更新日期: 2023-09-27 18:32:26
我有一个将消息记录到数据库(或其他地方)的Windows服务项目。这些消息的频率可能高达每秒十个。由于发送和处理消息不应延迟服务的主进程,因此我启动一个新线程来处理每条消息。这意味着,如果主进程需要发送 100 条日志消息,则会启动 100 个线程来处理每条消息。我了解到,当线程完成时,它将被清洁,所以我不必处理它。只要我在线程中处理所有使用的对象,一切应该都可以正常工作。
服务可能会进入导致关闭服务的异常。在服务关闭之前,它应等待记录消息的所有线程。为了实现这一点,它会在每次启动线程时将线程添加到列表中。调用 wait-for-threads 方法时,将检查列表中的所有线程是否仍处于活动状态,如果是,则使用 join 来等待它。
代码:
创建线程:
/// <summary>
/// Creates a new thread and sends the message
/// </summary>
/// <param name="logMessage"></param>
private static void ThreadSend(IMessage logMessage)
{
ParameterizedThreadStart threadStart = new ParameterizedThreadStart(MessageHandler.HandleMessage);
Thread messageThread = new Thread(threadStart);
messageThread.Name = "LogMessageThread";
messageThread.Start(logMessage);
threads.Add(messageThread);
}
等待线程结束:
/// <summary>
/// Waits for threads that are still being processed
/// </summary>
public static void WaitForThreads()
{
int i = 0;
foreach (Thread thread in threads)
{
i++;
if (thread.IsAlive)
{
Debug.Print("waiting for {0} - {1} to end...", thread.Name, i);
thread.Join();
}
}
}
现在我主要担心的是,如果这个服务运行一个月,它仍然会在列表中拥有所有线程(数百万个)(其中大多数是死的)。这会吞噬记忆,我不知道有多少。总的来说,这对我来说似乎不是一个好的做法,我想清理完成的线程,但我找不到该怎么做。有没有人对此有好的或最佳实践?
线程已死,请从列表中删除它们?
/// <summary>
/// Waits for threads that are still being processed
/// </summary>
public static void WaitForThreads()
{
List<Thread> toRemove = new List<int>();
int i = 0;
foreach (Thread thread in threads)
{
i++;
if (thread.IsAlive)
{
Debug.Print("waiting for {0} - {1} to end...", thread.Name, i);
thread.Join();
}
else
{
toRemove.Add(thread);
}
}
threads.RemoveAll(x => toRemove.Contains(x));
}
查看任务并行性
首先:为每个日志消息创建一个线程不是一个好主意。使用 ThreadPool
或创建有限数量的工作线程来处理来自公共队列(生产者/使用者)的日志项。
第二:当然,您还需要从列表中删除线程引用!当线程方法结束时,它可以自行删除,或者您甚至可以定期执行此操作。例如,每半小时运行一次计时器,检查列表中是否有死线程并删除它们。
如果您在这些线程中所做的只是日志记录,则可能应该有一个日志记录线程和一个主线程放置消息的共享队列。然后,日志记录线程可以读取队列和日志。使用BlockingCollection,这非常容易。
在服务的主线程中创建队列:
BlockingCollection<IMessage> LogMessageQueue = new BlockingCollection<IMessage>();
服务的主线程创建一个 Logger
(见下文)实例,该实例启动线程来处理日志消息。主线程将项目添加到LogMessageQueue
。记录器线程从队列中读取它们。当主线程想要关闭时,它会调用LogMessageQueue.CompleteAdding
。记录器将清空队列并退出。
主线程如下所示:
// start the logger
Logger _loggingThread = new Logger(LogMessageQueue);
// to log a message:
LogMessageQueue.Add(logMessage);
// when the program needs to shut down:
LogMessageQueue.CompleteAdding();
和记录器类:
class Logger
{
BlockingCollection<IMessage> _queue;
Thread _loggingThread;
public Logger(BlockingCollection<IMessage> queue)
{
_queue = queue;
_loggingThread = new Thread(LoggingThreadProc);
}
private void LoggingThreadProc(object state)
{
IMessage msg;
while (_queue.TryTake(out msg, TimeSpan.Infinite))
{
// log the item
}
}
}
这样,您只需要一个额外的线程,就可以保证按照发送的顺序处理消息(当前的方法并非如此),并且您不必担心跟踪线程关闭等。
更新
如果某些日志消息需要一些时间来处理(例如,您描述的电子邮件),则可以异步处理它们。例如:
while (_queue.TryTake(out msg, TimeSpan.Infinite))
{
if (msg.Type == Email)
{
// start asynchronous task to send email
}
else
{
// write to log file
}
}
这样,只有那些可能需要大量时间的消息才会异步运行。如果需要,您还可以在此处为电子邮件设置辅助队列。这样您就不会陷入一堆电子邮件线程的泥潭。相反,您将其限制为一两个,或者可能是少数几个。
请注意,如果需要,您还可以拥有多个 Logger
实例,所有实例都从同一消息队列读取。只需确保它们各自写入不同的日志文件即可。队列本身将支持多个使用者。
一般来说,解决您的问题的方法可能不是最佳实践。我的意思是,您只想在数据库中存储 1000 条消息,而不是创建 1000 个线程,对吗?而且您似乎想异步执行此操作。
但是为每个消息创建一个线程并不是一个好主意,实际上并不能解决这个问题......
相反,我会尝试实现类似消息队列的东西。您可以有多个队列,每个队列都有自己的线程。如果消息传入,您将它们发送到其中一个队列(交替)...
队列要么等待一定数量的消息,要么总是等待一定的时间(例如 1 秒,取决于在数据库中存储例如 100 条消息所需的时间),直到它尝试将排队的消息存储在数据库中。这样,您实际上应该始终具有恒定数量的线程,并且您不应该看到任何性能问题......
此外,它还使您能够批量插入数据,而不仅仅是一个接一个地插入数据库连接的开销等......
当然,如果您的数据库速度较慢,那么任务能够存储消息,越来越多的消息将被排队......但对于您当前的解决方案也是如此。
由于多个答案和评论导致了我的解决方案,我将在此处发布完整的代码。
我使用线程池来管理此页面中用于 wting 函数的线程和代码。
创建线程:
private static void ThreadSend(IMessage logMessage)
{
ThreadPool.QueueUserWorkItem(MessageHandler.HandleMessage, logMessage);
}
等待线程完成:
public static bool WaitForThreads(int maxWaitingTime)
{
int maxThreads = 0;
int placeHolder = 0;
int availableThreads = 0;
while (maxWaitingTime > 0)
{
System.Threading.ThreadPool.GetMaxThreads(out maxThreads, out placeHolder);
System.Threading.ThreadPool.GetAvailableThreads(out availableThreads, out placeHolder);
//Stop if all threads are available
if (availableThreads == maxThreads)
{
return true;
}
System.Threading.Thread.Sleep(TimeSpan.FromMilliseconds(1000));
--maxWaitingTime;
}
return false;
}
(可选)可以在这些方法之外的某个位置添加此项,以限制池中的线程数。
System.Threading.ThreadPool.SetMaxThreads(MaxWorkerThreads, MaxCompletionPortThreads);