任务中的嵌套锁.继续使用-安全,否则玩火

本文关键字:安全 玩火 嵌套 继续 任务 | 更新日期: 2023-09-27 18:15:29

Windows service:在配置文件中从目录列表中生成一组FileWatcher对象,有以下要求:

  1. 文件处理可能很耗时——事件必须在它们自己的任务线程上处理
  2. 保持事件处理程序任务的句柄,以等待OnStop()事件完成。
  3. 跟踪上传文件的哈希值;不重复处理
  4. 持久化文件哈希值以允许OnStart()在服务关闭时处理上传的文件。
  5. 永远不要处理一个文件超过一次。

(关于#3,当没有变化时,我们确实会得到事件……最明显的原因是FileWatchers的重复事件问题)

为了做这些事情,我有两个字典——一个用于上传的文件,另一个用于任务本身。这两个对象都是静态的,我需要在添加/删除/更新文件和任务时锁定它们。简化代码:
public sealed class TrackingFileSystemWatcher : FileSystemWatcher {
    private static readonly object fileWatcherDictionaryLock = new object();
    private static readonly object runningTaskDictionaryLock = new object();
    private readonly Dictionary<int, Task> runningTaskDictionary = new Dictionary<int, Task>(15);
    private readonly Dictionary<string, FileSystemWatcherProperties>  fileWatcherDictionary = new Dictionary<string, FileSystemWatcherProperties>();
    //  Wired up elsewhere
    private void OnChanged(object sender, FileSystemEventArgs eventArgs) {
        this.ProcessModifiedDatafeed(eventArgs);
    }
    private void ProcessModifiedDatafeed(FileSystemEventArgs eventArgs) {
        lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) {
            //  Read the file and generate hash here
            //  Properties if the file has been processed before
            //  ContainsNonNullKey is an extension method
            if (this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath)) {
                try {
                    fileProperties = this.fileWatcherDictionary[eventArgs.FullPath];
                }
                catch (KeyNotFoundException keyNotFoundException) {}
                catch (ArgumentNullException argumentNullException) {}
            }
            else {  
                // Create a new properties object
            }

            fileProperties.ChangeType = eventArgs.ChangeType;
            fileProperties.FileContentsHash = md5Hash;
            fileProperties.LastEventTimestamp = DateTime.Now;
            Task task;
            try {
                task = new Task(() => new DatafeedUploadHandler().UploadDatafeed(this.legalOrg, datafeedFileData), TaskCreationOptions.LongRunning);
            }
            catch {
              ..
            }
            //  Only lock long enough to add the task to the dictionary
            lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) {
                 try {
                    this.runningTaskDictionary.Add(task.Id, task);  
                }
                catch {
                  ..
                }    
            }

            try {
                task.ContinueWith(t => {
                    try {
                        lock (TrackingFileSystemWatcher.runningTaskDictionaryLock) {
                            this.runningTaskDictionary.Remove(t.Id);
                        }
                        //  Will this lock burn me?
                        lock (TrackingFileSystemWatcher.fileWatcherDictionaryLock) {
                            //  Persist the file watcher properties to
                            //  disk for recovery at OnStart()
                        }
                    }
                    catch {
                      ..
                    }
                });
                task.Start();
            }
            catch {
              ..
            }

        }
    }
}

请求ContinueWith()委托中的FileSystemWatcher集合上的锁,当委托定义在同一对象上的锁内时,效果是什么?我希望它是好的,即使任务在ProcessModifiedDatafeed()释放锁之前启动、完成并进入ContinueWith(),任务线程也会被挂起,直到创建线程释放锁。但是我想确保我没有踩到任何延迟执行的地雷。

查看代码,我可能能够更快地释放锁,避免问题,但我还不确定…需要查看完整的代码才能确定。


更新

为了阻止越来越多的"这段代码很糟糕"的评论,我有很好的理由捕获异常,并且捕获了如此多的异常。这是一个带有多线程处理程序的Windows服务,它可能不会崩溃。永远。如果这些线程中的任何一个有未处理的异常,它将执行此操作。

同样,这些异常被写入未来的防弹。我在下面的注释中给出的例子是为处理程序添加一个工厂…正如今天编写的代码一样,永远不会有空任务,但如果工厂没有正确实现,代码可能会抛出异常。是的,这应该在测试中被发现。然而,我的团队中有一些初级开发人员……"可能。不是。崩溃。"(此外,如果是一个未处理的异常,它必须优雅地关闭,允许当前正在运行的线程完成——我们通过在main()中设置一个未处理的异常处理程序来完成)。我们配置了企业级监视器,当应用程序错误出现在事件日志中时发送警报——这些异常将记录并标记我们。这个方法是经过深思熟虑和讨论的决定。

每个可能的异常都经过仔细考虑,并被选为两类之一-那些适用于单个数据feed并且不会关闭服务(大多数)的异常,以及那些表明明显的编程或其他错误的异常,这些错误从根本上使代码对所有数据feed无效。例如,如果我们不能写入事件日志,我们选择关闭服务,因为这是我们指示数据提要未被处理的主要机制。异常是在本地捕获的,因为本地上下文是唯一可以做出继续决策的地方。此外,允许异常上升到更高的级别(1)违反了抽象的概念,(2)在工作线程中没有意义。

我对反对处理异常的人数感到惊讶。如果每次try. catch(Exception){什么都不做}都给我一毛钱,我明白了,你的零钱就会永远换成五分钱。我认为,如果对。net框架的调用或您自己的代码抛出异常,您需要考虑导致该异常发生的场景,并显式地决定应该如何处理它。我的代码在IO操作中捕获了UnauthorizedExceptions,因为当我考虑如何发生这种情况时,我意识到添加新的数据提要目录需要向服务帐户授予权限(默认情况下它不会拥有这些权限)。

我很感激你的建设性意见……只是请不要笼统地批评简化的示例代码"这太糟糕了"。代码并不糟糕——它是防弹的,而且必须如此。


1我只会在Jon Skeet不同意的情况下争论很长时间

任务中的嵌套锁.继续使用-安全,否则玩火

首先,您的问题:在ContinueWith中请求锁本身不是问题。如果你在另一个锁块里做,那就别做了。您的continuation将在不同的时间、不同的线程中异步执行。

现在,代码本身是有问题的。为什么在几乎不能抛出异常的语句周围使用许多try-catch块?例如:

 try {
     task = new Task(() => new DatafeedUploadHandler().UploadDatafeed(this.legalOrg, datafeedFileData), TaskCreationOptions.LongRunning);
 }
 catch {}

你只是创建任务-我无法想象当这可以抛出。ContinueWith也是如此。:

this.runningTaskDictionary.Add(task.Id, task); 

你可以检查这个键是否已经存在。但即使这样也没有必要,因为任务。Id是您刚刚创建的给定任务实例的唯一Id。:

try {
    fileProperties = this.fileWatcherDictionary[eventArgs.FullPath];
}
catch (KeyNotFoundException keyNotFoundException) {}
catch (ArgumentNullException argumentNullException) {}

更糟。你不应该使用这样的异常-不要捕获KeyNotFoundException,而是在Dictionary上使用适当的方法(如TryGetValue)。

因此,首先,删除所有的try catch块,或者在整个方法中使用一个,或者在可能抛出异常并且无法处理这种情况的语句中使用它们(并且您知道如何处理抛出的异常)。

那么,您处理文件系统事件的方法不是很可伸缩和可靠。许多程序在保存对文件的更改时,会在很短的时间间隔内生成多个更改事件(也有针对同一文件的多个事件按顺序发生的其他情况)。如果您只是在每个事件上开始处理文件,这可能会导致不同类型的麻烦。因此,您可能需要限制给定文件的事件,并在最后一次检测到更改后的一定延迟后才开始处理。不过,这可能有点高级。

不要忘记尽快在文件上获取读锁,这样其他进程就不能在你处理文件的时候修改文件(例如,你可能计算了一个文件的md5,然后有人修改了文件,然后你开始上传——现在你的md5无效了)。另一种方法是记录最后一次写入时间,当涉及到上传时-抓取读锁并检查文件之间是否没有更改。

更重要的是一次可以有很多变化。假设我非常快地复制了1000个文件——你不想用1000个线程一次上传它们。您需要处理一个文件队列,并使用几个线程从该队列中获取项。这样,数千个事件可能同时发生,而您的上传仍然可以可靠地工作。现在,您为每个更改事件创建新线程,并立即开始上传(根据方法名称)-这将在严重的事件负载下失败(以及上面描述的情况)。

不,它不会烧伤你。即使ContinueWith被内联到正在运行new Task(() => new DatafeedUploadHandler()..的当前线程中,它也会获得锁,例如没有死锁。lock语句在内部使用Monitor类,它是reentrant。例如,一个线程可以多次获得一个锁,如果它已经获得/拥有该锁。多线程和锁(线程安全操作)

另一种task.ContinueWithProcessModifiedDatafeed完成之前开始的情况就像你说的那样。运行ContinueWith的线程只能等待获得锁。

我真的会考虑在锁之外做task.ContinueWithtask.Start(),如果你审查它。根据你发布的代码,这是可能的。

您还应该查看System.Collections.Concurrent命名空间中的ConcurrentDictionary。这将使代码更简单,并且您不必自己管理锁定。你正在做某种比较交换/更新这里if (this.fileWatcherDictionary.ContainsNonNullKey(eventArgs.FullPath))。只添加字典中没有的词。这是一个原子操作。没有函数可以用ConcurrentDictionary做到这一点,但是有一个AddOrUpdate方法。也许你可以用这个方法重写它。根据你的代码,你可以安全地使用ConcurrentDictionary至少对于runningTaskDictionary

哦,TaskCreationOptions.LongRunning实际上是为每个任务创建一个新线程,这是一种昂贵的操作。windows内部线程池在新的windows版本中是智能的,并且是动态适应的。它会"看到"你正在做很多IO的事情,并会根据需要和实际产生新的线程。

问候

我没有完全遵循这段代码的逻辑,但是你知道任务继续和调用等待/结果可以内联到当前线程上吗?这可能导致重入。

这是非常危险的,已经烧伤了很多人。

我也不太明白为什么你开始task延迟。这是一种代码气味。另外,为什么要用try包装任务创建?这绝对不能扔。

这显然是部分答案。但是代码在我看来很纠结。如果审计如此困难,那么您可能应该首先以不同的方式编写它。