如何处理使用SemaphoreSlim时挂起的线程
本文关键字:SemaphoreSlim 挂起 线程 何处理 处理 | 更新日期: 2023-09-27 18:09:04
我有一些通过第三方库运行数千个url的代码。偶尔,库中的方法会挂起,占用一个线程。过了一段时间,所有的线程都被什么都不做的进程占用了,然后就停止了。
我使用SemaphoreSlim
来控制添加新线程,所以我可以有一个最佳数量的任务运行。我需要一种方法来识别运行时间过长的任务,然后杀死它们,同时从SemaphoreSlim
释放一个线程,以便创建一个新任务。
我正在努力与这里的方法,所以我做了一些测试代码,模仿我正在做什么。它创建的任务有10%的挂起机会,所以很快所有线程都挂起了。
我应该如何检查这些并删除它们?
代码如下:
class Program
{
public static SemaphoreSlim semaphore;
public static List<Task> taskList;
static void Main(string[] args)
{
List<string> urlList = new List<string>();
Console.WriteLine("Generating list");
for (int i = 0; i < 1000; i++)
{
//adding random strings to simulate a large list of URLs to process
urlList.Add(Path.GetRandomFileName());
}
Console.WriteLine("Queueing tasks");
semaphore = new SemaphoreSlim(10, 10);
Task.Run(() => QueueTasks(urlList));
Console.ReadLine();
}
static void QueueTasks(List<string> urlList)
{
taskList = new List<Task>();
foreach (var url in urlList)
{
Console.WriteLine("{0} tasks can enter the semaphore.",
semaphore.CurrentCount);
semaphore.Wait();
taskList.Add(DoTheThing(url));
}
}
static async Task DoTheThing(string url)
{
Random rand = new Random();
// simulate the IO process
await Task.Delay(rand.Next(2000, 10000));
// add a 10% chance that the thread will hang simulating what happens occasionally with http request
int chance = rand.Next(1, 100);
if (chance <= 10)
{
while (true)
{
await Task.Delay(1000000);
}
}
semaphore.Release();
Console.WriteLine(url);
}
}
正如人们已经指出的那样,终止线程通常是不好的,并且在c#中没有保证这样做的方法。使用一个单独的进程来完成工作,然后杀死它,这比尝试Thread.Abort要好一些。但仍然不是最好的办法。理想情况下,您需要协作的线程/进程,它们使用IPC来决定何时自救。这样可以正确地完成清理。
说了这么多,你可以使用下面的代码来做你想做的事情。我写这篇文章是假设你的任务将在一个线程中完成。只要稍加修改,您就可以在进程
中使用相同的逻辑来执行任务。代码绝不是无懈可击的,只是为了说明问题。并发代码并没有得到很好的测试。锁的持有时间超过了需要的时间,并且有些地方我没有锁定(如Log函数)
class TaskInfo {
public Thread Task;
public DateTime StartTime;
public TaskInfo(ParameterizedThreadStart startInfo, object startArg) {
Task = new Thread(startInfo);
Task.Start(startArg);
StartTime = DateTime.Now;
}
}
class Program {
const int MAX_THREADS = 1;
const int TASK_TIMEOUT = 6; // in seconds
const int CLEANUP_INTERVAL = TASK_TIMEOUT; // in seconds
public static SemaphoreSlim semaphore;
public static List<TaskInfo> TaskList;
public static object TaskListLock = new object();
public static Timer CleanupTimer;
static void Main(string[] args) {
List<string> urlList = new List<string>();
Log("Generating list");
for (int i = 0; i < 2; i++) {
//adding random strings to simulate a large list of URLs to process
urlList.Add(Path.GetRandomFileName());
}
Log("Queueing tasks");
semaphore = new SemaphoreSlim(MAX_THREADS, MAX_THREADS);
Task.Run(() => QueueTasks(urlList));
CleanupTimer = new Timer(CleanupTasks, null, CLEANUP_INTERVAL * 1000, CLEANUP_INTERVAL * 1000);
Console.ReadLine();
}
// TODO: Guard against re-entrancy
static void CleanupTasks(object state) {
Log("CleanupTasks started");
lock (TaskListLock) {
var now = DateTime.Now;
int n = TaskList.Count;
for (int i = n - 1; i >= 0; --i) {
var task = TaskList[i];
Log($"Checking task with ID {task.Task.ManagedThreadId}");
// kill processes running for longer than anticipated
if (task.Task.IsAlive && now.Subtract(task.StartTime).TotalSeconds >= TASK_TIMEOUT) {
Log("Cleaning up hung task");
task.Task.Abort();
}
// remove task if it is not alive
if (!task.Task.IsAlive) {
Log("Removing dead task from list");
TaskList.RemoveAt(i);
continue;
}
}
if (TaskList.Count == 0) {
Log("Disposing cleanup thread");
CleanupTimer.Dispose();
}
}
Log("CleanupTasks done");
}
static void QueueTasks(List<string> urlList) {
TaskList = new List<TaskInfo>();
foreach (var url in urlList) {
Log($"Trying to schedule url = {url}");
semaphore.Wait();
Log("Semaphore acquired");
ParameterizedThreadStart taskRoutine = obj => {
try {
DoTheThing((string)obj);
} finally {
Log("Releasing semaphore");
semaphore.Release();
}
};
var task = new TaskInfo(taskRoutine, url);
lock (TaskListLock)
TaskList.Add(task);
}
Log("All tasks queued");
}
// simulate all processes get hung
static void DoTheThing(string url) {
while (true)
Thread.Sleep(5000);
}
static void Log(string msg) {
Console.WriteLine("{0:HH:mm:ss.fff} Thread {1,2} {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId.ToString(), msg);
}
}