如何处理使用SemaphoreSlim时挂起的线程

本文关键字:SemaphoreSlim 挂起 线程 何处理 处理 | 更新日期: 2023-09-27 18:09:04

我有一些通过第三方库运行数千个url的代码。偶尔,库中的方法会挂起,占用一个线程。过了一段时间,所有的线程都被什么都不做的进程占用了,然后就停止了。

我使用SemaphoreSlim来控制添加新线程,所以我可以有一个最佳数量的任务运行。我需要一种方法来识别运行时间过长的任务,然后杀死它们,同时从SemaphoreSlim释放一个线程,以便创建一个新任务。

我正在努力与这里的方法,所以我做了一些测试代码,模仿我正在做什么。它创建的任务有10%的挂起机会,所以很快所有线程都挂起了。

我应该如何检查这些并删除它们?

代码如下:

class Program
{
    public static SemaphoreSlim semaphore;
    public static List<Task> taskList;
    static void Main(string[] args)
    {
        List<string> urlList = new List<string>();
        Console.WriteLine("Generating list");
        for (int i = 0; i < 1000; i++)
        {
            //adding random strings to simulate a large list of URLs to process
            urlList.Add(Path.GetRandomFileName());
        }
        Console.WriteLine("Queueing tasks");
        semaphore = new SemaphoreSlim(10, 10);
        Task.Run(() => QueueTasks(urlList));
        Console.ReadLine();
    }
    static void QueueTasks(List<string> urlList)
    {
        taskList = new List<Task>();
        foreach (var url in urlList)
        {
            Console.WriteLine("{0} tasks can enter the semaphore.",
                  semaphore.CurrentCount);
            semaphore.Wait();
            taskList.Add(DoTheThing(url));
        }
    }
    static async Task DoTheThing(string url)
    {
        Random rand = new Random();
        // simulate the IO process
        await Task.Delay(rand.Next(2000, 10000));
        // add a 10% chance that the thread will hang simulating what happens occasionally with http request
        int chance = rand.Next(1, 100);
        if (chance <= 10)
        {
            while (true)
            {
                await Task.Delay(1000000);
            }
        }
        semaphore.Release();
        Console.WriteLine(url);
    }
}

如何处理使用SemaphoreSlim时挂起的线程

正如人们已经指出的那样,终止线程通常是不好的,并且在c#中没有保证这样做的方法。使用一个单独的进程来完成工作,然后杀死它,这比尝试Thread.Abort要好一些。但仍然不是最好的办法。理想情况下,您需要协作的线程/进程,它们使用IPC来决定何时自救。这样可以正确地完成清理。

说了这么多,你可以使用下面的代码来做你想做的事情。我写这篇文章是假设你的任务将在一个线程中完成。只要稍加修改,您就可以在进程

中使用相同的逻辑来执行任务。

代码绝不是无懈可击的,只是为了说明问题。并发代码并没有得到很好的测试。锁的持有时间超过了需要的时间,并且有些地方我没有锁定(如Log函数)

class TaskInfo {
    public Thread Task;
    public DateTime StartTime;
    public TaskInfo(ParameterizedThreadStart startInfo, object startArg) {
        Task = new Thread(startInfo);
        Task.Start(startArg);
        StartTime = DateTime.Now;
    }
}
class Program {
    const int MAX_THREADS = 1;
    const int TASK_TIMEOUT = 6; // in seconds
    const int CLEANUP_INTERVAL = TASK_TIMEOUT; // in seconds
    public static SemaphoreSlim semaphore;
    public static List<TaskInfo> TaskList;
    public static object TaskListLock = new object();
    public static Timer CleanupTimer;
    static void Main(string[] args) {
        List<string> urlList = new List<string>();
        Log("Generating list");
        for (int i = 0; i < 2; i++) {
            //adding random strings to simulate a large list of URLs to process
            urlList.Add(Path.GetRandomFileName());
        }
        Log("Queueing tasks");
        semaphore = new SemaphoreSlim(MAX_THREADS, MAX_THREADS);
        Task.Run(() => QueueTasks(urlList));
        CleanupTimer = new Timer(CleanupTasks, null, CLEANUP_INTERVAL * 1000, CLEANUP_INTERVAL * 1000);

        Console.ReadLine();
    }
    // TODO: Guard against re-entrancy
    static void CleanupTasks(object state) {
        Log("CleanupTasks started");
        lock (TaskListLock) {
            var now = DateTime.Now;
            int n = TaskList.Count;
            for (int i = n - 1; i >= 0; --i) {
                var task = TaskList[i];
                Log($"Checking task with ID {task.Task.ManagedThreadId}");
                // kill processes running for longer than anticipated
                if (task.Task.IsAlive && now.Subtract(task.StartTime).TotalSeconds >= TASK_TIMEOUT) {
                    Log("Cleaning up hung task");
                    task.Task.Abort();
                }
                // remove task if it is not alive
                if (!task.Task.IsAlive) {
                    Log("Removing dead task from list");
                    TaskList.RemoveAt(i);
                    continue;
                }
            }
            if (TaskList.Count == 0) {
                Log("Disposing cleanup thread");
                CleanupTimer.Dispose();
            }
        }
        Log("CleanupTasks done");
    }
    static void QueueTasks(List<string> urlList) {
        TaskList = new List<TaskInfo>();
        foreach (var url in urlList) {
            Log($"Trying to schedule url = {url}");
            semaphore.Wait();
            Log("Semaphore acquired");
            ParameterizedThreadStart taskRoutine = obj => {
                try {
                    DoTheThing((string)obj);
                } finally {
                    Log("Releasing semaphore");
                    semaphore.Release();
                }
            };
            var task = new TaskInfo(taskRoutine, url);
            lock (TaskListLock)
                TaskList.Add(task);
        }
        Log("All tasks queued");
    }
    // simulate all processes get hung
    static void DoTheThing(string url) {
        while (true)
            Thread.Sleep(5000);
    }
    static void Log(string msg) {
        Console.WriteLine("{0:HH:mm:ss.fff} Thread {1,2} {2}", DateTime.Now, Thread.CurrentThread.ManagedThreadId.ToString(), msg);
    }
}