Long-running thread work killed by PerformWaitCallback()

Updated: 2023-09-27 18:15:14

I have a service that "hosts" other assemblies and lets them process tasks. Here is an excerpt of the code:

public void Start()
{
    Log.Instance.Info("Trying runtime to start");
    // Loading all modules
    _workerLoader = new WorkerLoader();
    Log.Instance.Info("Trying to load workers");
    _workerLoader.CreateInstances();
    _tokenSource = new CancellationTokenSource();
    foreach (var worker in _workerLoader.Modules)
    {
        Log.Instance.Info("Adding {0} to global scope", worker.Id);
        var currentWorker = worker;
        _tasks.Add(Task.Factory.StartNew(() => BaseWork(currentWorker, _tokenSource.Token), _tokenSource.Token));
        Thread.Sleep(3000);
    }
    Log.Instance.Info("Runtime started");
}
private void BaseWork(IWorker worker, CancellationToken token)
{
    using (worker)
    {
        worker.WorkerStopped += (sender, args) =>
        {
            var stackTrace = new StackTrace();
            var stackFrames = stackTrace.GetFrames();
            var strFrames = "";
            if (stackFrames != null)
            {
                strFrames =
                    String.Join(Environment.NewLine, stackFrames.Select(
                        x =>
                            String.Format("{0} {1} ({2}:{3})", x.GetMethod(), x.GetFileName(), x.GetFileLineNumber(), x.GetFileColumnNumber())));
            }
            Log.Instance.Info("[{0}] Worker stopped. ({1})", worker.Id, strFrames);
        };
        worker.TaskStarted += (sender, info) => Log.Instance.Info("[{0}] Started: {1}", ((IWorker)sender).Id, info.id);
        worker.TaskFinished += (sender, info) => Log.Instance.Info("[{0}] Finished: {1}", ((IWorker)sender).Id, info.id);
        worker.ErrorOccurred += (sender, exception) => Log.Instance.Error("[{0}] Error: {1}", ((IWorker)sender).Id, exception);
        while (true)
        {
            if (token.IsCancellationRequested)
                token.ThrowIfCancellationRequested();
            worker.ProcessOnce();
        }
    }
}

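Inside worker.ProcessOnce(), the worker does everything it needs to: connecting to remote websites, reading data from the DB, writing data to the DB, and so on. At the moment there is only one worker.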

With that brief background out of the way, here is the problem.

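After working normally for a while, the worker gets stopped and writes an entry about it to the log file. It happens at random. To capture the stack trace I injected the code you can see in the "worker stopped" event handler, and this is what it logged: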

Worker stopped. (Void <BaseWork>b__3(System.Object, System.EventArgs)  (0:0)
Void OnWorkerStopped()  (0:0)
Void Dispose()  (0:0)
Void System.IDisposable.Dispose()  (0:0)
Void BaseWork(YellowPages.Contracts.IWorker, System.Threading.CancellationToken)  (0:0)
Void <Start>b__0()  (0:0)
Void InnerInvoke()  (0:0)
Void Execute()  (0:0)
Void ExecutionContextCallback(System.Object)  (0:0)
Void RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)  (0:0)
Void Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)  (0:0)
Void ExecuteWithThreadLocal(System.Threading.Tasks.Task ByRef)  (0:0)
Boolean ExecuteEntry(Boolean)  (0:0)
Void System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()  (0:0)
Boolean Dispatch()  (0:0)
Boolean PerformWaitCallback()  (0:0))

Am I doing something wrong? What is PerformWaitCallback?? It looks as though the Dispose call is coming from the thread-pool task.

Any ideas?

Thanks in advance!


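Everything in the stack trace below BaseWork is just the ThreadPool machinery that runs your queued Task delegate. If you simply write Log.Instance.Info(new StackTrace().ToString()), you will get a more detailed listing, for example: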

// this is the anonymous handler delegate where you print the stack trace
<BaseWork>b__3(System.Object, System.EventArgs)
// your worker raises WorkerStopped inside its Dispose method
SomeClass.OnWorkerStopped()
// this is where your worker was disposed
YellowPages.Contracts.Worker.Dispose()
// this is where your method was invoked
SomeClass.BaseWork(YellowPages.Contracts.IWorker, System.Threading.CancellationToken) 
// everything below this line is just your method being queued to the thread pool
// and is irrelevant
mscorlib.dll!System.Threading.Tasks.ContinuationTaskFromTask.InnerInvoke()
mscorlib.dll!System.Threading.Tasks.Task.Execute()
mscorlib.dll!System.Threading.Tasks.Task.ExecutionContextCallback(object obj = {unknown})
mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext = {unknown}, System.Threading.ContextCallback callback = {unknown}, object state = {unknown}, bool preserveSyncCtx = {unknown})
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext = {unknown}, System.Threading.ContextCallback callback = {unknown}, object state = {unknown}, bool preserveSyncCtx = {unknown})
mscorlib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot = {unknown})
mscorlib.dll!System.Threading.Tasks.Task.ExecuteEntry(bool bPreventDoubleExecution = {unknown})
mscorlib.dll!System.Threading.Tasks.Task.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
mscorlib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch()
mscorlib.dll!System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()
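The handler in the question printed each frame with MethodBase.ToString(), which omits the declaring type, and got (0:0) for every position because new StackTrace() without fNeedFileInfo does not collect file and line information. A minimal sketch of a formatter that includes the type name, assuming a hypothetical helper class StackTraceFormatter (not part of the question's code):

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;

public static class StackTraceFormatter
{
    // Returns the caller's stack as "Namespace.Type.Method (file:line)" lines.
    // NoInlining keeps this method's own frame on the stack, so skipping one
    // frame drops exactly that frame and nothing else.
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static string Capture()
    {
        // fNeedFileInfo = true requests file names and line numbers; they still
        // come back as null/0 when no PDB symbols are deployed with the assembly.
        var trace = new StackTrace(1, true);
        var frames = trace.GetFrames() ?? Array.Empty<StackFrame>();
        return string.Join(Environment.NewLine, frames.Select(f =>
        {
            var method = f.GetMethod();
            var type = method?.DeclaringType?.FullName ?? "<unknown type>";
            return $"{type}.{method?.Name} ({f.GetFileName()}:{f.GetFileLineNumber()})";
        }));
    }
}
```

Calling StackTraceFormatter.Capture() inside the WorkerStopped handler would give the same frames as the question's output, but with the declaring types filled in, which makes it obvious that Dispose was called from BaseWork itself.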

Your actual problem is (most likely) that worker.ProcessOnce() throws an exception. The sequence is:

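  1. worker.ProcessOnce() throws an exception that it does not catch.
  2. The using block in BaseWork guarantees that worker.Dispose() is called in its implicit finally clause.
  3. worker.Dispose() raises the WorkerStopped event, which logs the stack trace.
  4. Since there is no catch clause to handle the exception, the Task silently swallows it (unless you are on a .NET version earlier than 4.5, or you have enabled the ThrowUnobservedTaskExceptions option).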
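The four steps above can be reproduced in isolation. This is a sketch under assumptions: FlakyWorker and Repro are hypothetical stand-ins, the worker is assumed to raise WorkerStopped from Dispose (as the stack trace suggests), and the failure is simulated on the third ProcessOnce call.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's worker: ProcessOnce fails on its
// third call, and Dispose raises WorkerStopped (as the stack trace suggests).
public sealed class FlakyWorker : IDisposable
{
    private int _calls;
    public event EventHandler WorkerStopped;

    public void ProcessOnce()
    {
        if (++_calls == 3)
            throw new InvalidOperationException("simulated failure in ProcessOnce");
    }

    public void Dispose() => WorkerStopped?.Invoke(this, EventArgs.Empty);
}

public static class Repro
{
    public static volatile bool StoppedEventFired;

    // Mirrors BaseWork: no catch clause, only the using block's implicit finally.
    private static void BaseWork(FlakyWorker worker, CancellationToken token)
    {
        using (worker)
        {
            while (true)
            {
                token.ThrowIfCancellationRequested();
                worker.ProcessOnce(); // step 1: the exception escapes here
            }
        } // step 2: implicit finally calls Dispose; step 3: WorkerStopped is raised
    }

    public static Task Start(CancellationToken token)
    {
        var worker = new FlakyWorker();
        worker.WorkerStopped += (sender, args) => StoppedEventFired = true;
        // step 4: if nobody observes the returned task, the exception goes unnoticed
        return Task.Factory.StartNew(() => BaseWork(worker, token), token);
    }
}
```

Waiting on the returned task surfaces the failure as an AggregateException whose inner exception is the InvalidOperationException, and by then WorkerStopped has already fired, exactly the pattern seen in the question's log.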

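If you called Task.Wait() somewhere, it would block the calling thread and then throw an AggregateException once the worker threw. Since you probably don't wait on the task anywhere, you can instead attach a continuation that runs only when the task faults, for example: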

_tasks.Add(Task.Factory
    // create the task
    .StartNew(() => BaseWork(currentWorker, _tokenSource.Token), _tokenSource.Token)
    // create the exception-only continuation
    .ContinueWith(t => 
          Log.Instance.Error(t.Exception.Flatten().ToString()),
          TaskContinuationOptions.OnlyOnFaulted)
);
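A self-contained version of the same OnlyOnFaulted continuation, as a sketch: FaultLogging and StartWithFaultLogging are hypothetical names, and the log callback stands in for Log.Instance.Error. It passes TaskScheduler.Default explicitly so the continuation never picks up an ambient scheduler.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FaultLogging
{
    // Starts `work` the way the question does and attaches a continuation that
    // runs only if the work task faults. `log` stands in for Log.Instance.Error.
    // Returns the continuation so a caller (or a test) can wait for the log call.
    public static Task StartWithFaultLogging(Action work, Action<string> log, CancellationToken token)
    {
        var task = Task.Factory.StartNew(work, token);
        return task.ContinueWith(
            t => log(t.Exception?.Flatten().ToString() ?? "unknown error"),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted,
            TaskScheduler.Default);
    }
}
```

One consequence worth knowing: when the work task does not fault (for example, when it is canceled through the token), an OnlyOnFaulted continuation ends up Canceled rather than RanToCompletion. In the snippet above, _tasks stores the continuation rather than the worker task, so anything that later waits on _tasks will observe a canceled task in that case.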

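Alternatively, you can simply wrap the call to worker.ProcessOnce() in a try/catch block and log the exception there (but the continuation approach catches any exception thrown anywhere in BaseWork, so it is the safer option).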

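Either way, you will be able to log the actual exception. If it is caused by a bug in your code, fix the bug. Otherwise (if it is an exception you can reasonably expect, such as a socket exception), add a catch handler for that specific exception type and swallow it.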
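A sketch of that try/catch approach, assuming a trimmed-down, hypothetical IWorker interface (only Id and ProcessOnce) and using SocketException as the example of an expected, transient error. The ScriptedWorker class is a hypothetical test double, not part of the question's code.

```csharp
using System;
using System.Net.Sockets;
using System.Threading;

// Hypothetical, trimmed-down version of the question's IWorker contract.
public interface IWorker : IDisposable
{
    string Id { get; }
    void ProcessOnce();
}

public static class WorkerLoop
{
    // Runs ProcessOnce until cancellation. An expected, transient failure
    // (SocketException here) is logged and the loop continues; anything else
    // is logged and rethrown so the task faults visibly.
    public static void Run(IWorker worker, CancellationToken token, Action<string> log)
    {
        using (worker)
        {
            while (true)
            {
                token.ThrowIfCancellationRequested();
                try
                {
                    worker.ProcessOnce();
                }
                catch (SocketException ex)
                {
                    log($"[{worker.Id}] network error, retrying: {ex.Message}");
                }
                catch (Exception ex)
                {
                    log($"[{worker.Id}] unexpected error: {ex}");
                    throw;
                }
            }
        }
    }
}

// Hypothetical test double: one SocketException, then an unexpected bug.
public sealed class ScriptedWorker : IWorker
{
    private int _calls;
    public string Id => "demo";
    public bool Disposed { get; private set; }

    public void ProcessOnce()
    {
        _calls++;
        if (_calls == 1) throw new SocketException((int)SocketError.ConnectionReset);
        if (_calls == 2) throw new InvalidOperationException("bug in ProcessOnce");
    }

    public void Dispose() => Disposed = true;
}
```

Rethrowing the unexpected case (rather than swallowing everything) keeps genuine bugs visible to the OnlyOnFaulted continuation while still letting the worker survive routine network hiccups.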

Also, it is not clear what the ErrorOccurred event is supposed to signal (presumably not exceptions?).

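Simply surround worker.ProcessOnce(); with a try/catch block and make sure it doesn't throw any exceptions. If it turns out that it doesn't throw, I would strongly suspect that your logging system (Log.Instance.Info) is crashing. Double-check the logging class code and consider using a lock so that multiple threads cannot access the same variable at the same time: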

private static readonly object _logSync = new object();

lock (_logSync)
{
    // access the shared logging state here
}
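As an illustration of that advice, here is a minimal sketch of a logger that serializes writes with a private lock. LockedLog is a hypothetical class, not the question's Log implementation, and it writes to any TextWriter supplied by the caller.

```csharp
using System;
using System.IO;

// Hypothetical logger, not the question's Log class: every write to the shared
// TextWriter happens under one private lock, so concurrent workers cannot
// interleave partial lines or corrupt the writer's internal buffer.
public sealed class LockedLog
{
    private readonly object _sync = new object();
    private readonly TextWriter _writer;

    public LockedLog(TextWriter writer) => _writer = writer;

    public void Info(string format, params object[] args)
    {
        // Build the message outside the lock so the critical section stays short.
        var line = $"{DateTime.UtcNow:O} INFO {string.Format(format, args)}";
        lock (_sync)
        {
            _writer.WriteLine(line);
            _writer.Flush();
        }
    }
}
```

The base class library also offers TextWriter.Synchronized(writer), which returns a thread-safe wrapper around an existing writer; either way, the point is that only one thread touches the writer at a time.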