App using a task scheduler quickly runs out of memory

Keywords: memory, app, task scheduler | Updated: 2023-09-27 18:34:13

The application parses files from a directory while new files are being added to that directory. I use a ConcurrentQueue and try to split the work across the number of cores, so if there are files to process it should process up to 4 (the core count) files at the same time. However, the app runs OOM within seconds, after processing 10-30 files. I can see memory consumption quickly grow to ~1.5GB before the OOM error appears. I'm new to task schedulers, so I'm probably doing something wrong. The file parsing is done by running an .exe against the file, which uses <5 MB of RAM. The task scheduler runs every time the timer thread elapses, but it runs OOM even before the timer elapses a second time.

private void OnTimedEvent(object source, ElapsedEventArgs e)
    {
        DirectoryInfo info = new DirectoryInfo(AssemblyDirectory);
        FileInfo[] allSrcFiles = info.GetFiles("*.dat").OrderBy(p => p.CreationTime).ToArray();
        var validSrcFiles = allSrcFiles.Where(p => (DateTime.Now - p.CreationTime) > TimeSpan.FromSeconds(60));
        var newFilesToParse = validSrcFiles.Where(f => !ProcessedFiles.Contains(f.Name));
        if (newFilesToParse.Any()) Console.WriteLine("Adding " + newFilesToParse.Count() + " files to the Queue");
        foreach (var file in newFilesToParse)
        {
            FilesToParseQueue.Enqueue(file);
            ProcessedFiles.Add(file.Name);
        }
        if (!busy)
        {
            if (FilesToParseQueue.Any())
            {
                busy = true;
                Console.WriteLine("");
                Console.WriteLine("There are " + FilesToParseQueue.Count + " files in queue. Processing...");
            }
            var scheduler = new LimitedConcurrencyLevelTaskScheduler(coresCount); //4
            TaskFactory factory = new TaskFactory(scheduler);
            while (FilesToParseQueue.Any())
            {
                factory.StartNew(() =>
                {
                    FileInfo file;
                    if (FilesToParseQueue.TryDequeue(out file))
                    {
                        //Dequeue();
                        ParseFile(file);
                    }
                });
            }
            if (!FilesToParseQueue.Any())
            {
                busy = false;
                Console.WriteLine("Finished processing Files in the Queue. Waiting for new files...");
            }
        }
    }

Your code keeps creating new Tasks as long as there are files to process, and it does so much faster than the files can actually be processed. But it has no other limit (such as the number of files in the directory), which is why it runs out of memory so quickly.

A simple fix is to move the dequeuing out of the task and into the loop:

while (true)
{
    FileInfo file;
    if (FilesToParseQueue.TryDequeue(out file))
    {
        factory.StartNew(() => ParseFile(file));
    }
    else
    {
        break;
    }
}

You would get even better performance by creating only one Task per core and processing the files with a loop inside each Task.
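The one-Task-per-core approach could be sketched like this. This is a hypothetical `ProcessQueue` helper, not code from the question; the generic `process` delegate stands in for the original `ParseFile`, and `T` would be `FileInfo` in the original code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class QueueWorkers
{
    // Start one Task per worker and let each Task drain the shared queue
    // in a loop. At most workerCount Tasks exist no matter how many items
    // are queued, so memory stays bounded.
    public static void ProcessQueue<T>(
        ConcurrentQueue<T> queue, int workerCount, Action<T> process)
    {
        Task[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() =>
            {
                T item;
                // Each worker dequeues until the queue is empty.
                while (queue.TryDequeue(out item))
                {
                    process(item);
                }
            }))
            .ToArray();
        Task.WaitAll(workers);
    }
}
```

With this shape, the timer callback only enqueues new files and starts the workers when it isn't already `busy`.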

This kind of problem (queuing up multiple units of work and wanting them processed in parallel) is a perfect fit for TPL Dataflow:

private async void OnTimedEvent(object source, ElapsedEventArgs e)
{
  DirectoryInfo info = new DirectoryInfo(AssemblyDirectory);
  FileInfo[] allSrcFiles = info.GetFiles("*.dat").OrderBy(p => p.CreationTime).ToArray();
  var validSrcFiles = allSrcFiles.Where(p => (DateTime.Now - p.CreationTime) > TimeSpan.FromSeconds(60));
  var newFilesToParse = validSrcFiles.Where(f => !ProcessedFiles.Contains(f.Name));
  if (newFilesToParse.Any()) Console.WriteLine("Adding " + newFilesToParse.Count() + " files to the Queue");
  var blockOptions = new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = coresCount,
  };
  var block = new ActionBlock<FileInfo>(ParseFile, blockOptions);
  var filesToParseCount = 0;
  foreach (var file in newFilesToParse)
  {
    block.Post(file);
    ProcessedFiles.Add(file.Name);
    ++filesToParseCount;
  }
  Console.WriteLine("There are " + filesToParseCount + " files in queue. Processing...");
  block.Complete();
  await block.Completion;
  Console.WriteLine("Finished processing Files in the Queue. Waiting for new files...");
}

Basic solution

You can actually fix your code by stripping it down to its bare essentials, like so:

// This is technically a misnomer. It should be
// called "FileNamesQueuedForProcessing" or similar.
// Non-thread-safe. Assuming timer callback access only.
private readonly HashSet<string> ProcessedFiles = new HashSet<string>();
private readonly LimitedConcurrencyLevelTaskScheduler LimitedConcurrencyScheduler = new LimitedConcurrencyLevelTaskScheduler(Environment.ProcessorCount);
private void OnTimedEvent(object source, ElapsedEventArgs e)
{
    DirectoryInfo info = new DirectoryInfo(AssemblyDirectory);
    // Slightly rewritten to cut down on allocations.
    FileInfo[] newFilesToParse = info
        .GetFiles("*.dat")
        .Where(f =>
            (DateTime.Now - f.CreationTime) > TimeSpan.FromSeconds(60) && // I'd consider removing this filter.
            !ProcessedFiles.Contains(f.Name))
        .OrderBy(p => p.CreationTime)
        .ToArray();
    if (newFilesToParse.Length != 0) Console.WriteLine("Adding " + newFilesToParse.Length + " files to the Queue");
    foreach (FileInfo file in newFilesToParse)
    {
        // Fire and forget.
        // You can add the resulting task to a shared thread-safe collection
        // if you want to observe completion/exceptions/cancellations.
        Task.Factory.StartNew(
            () => ParseFile(file)
            , CancellationToken.None
            , TaskCreationOptions.DenyChildAttach
            , LimitedConcurrencyScheduler
        );
        ProcessedFiles.Add(file.Name);
    }
}

Note that I am not doing any kind of load balancing myself. Instead, I'm relying on LimitedConcurrencyLevelTaskScheduler to perform as advertised: that is, to accept all work items immediately on Task.Factory.StartNew, queue them internally, and process them at some point in the future on at most [N = max degree of parallelism] thread pool threads.
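If you would rather not take a dependency on the ParallelExtensionsExtras sample, the built-in ConcurrentExclusiveSchedulerPair behaves the same way: its ConcurrentScheduler accepts every queued task immediately and runs at most maxConcurrencyLevel of them at a time. A quick sketch that measures the observed cap (the Thread.Sleep stands in for ParseFile; the helper name is my own):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class SchedulerDemo
{
    // Queues taskCount tasks on a concurrency-limited scheduler and
    // returns the peak number of tasks observed running at once.
    public static int MeasurePeakConcurrency(int maxConcurrency, int taskCount)
    {
        var pair = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrencyLevel: maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);
        int running = 0, peak = 0;
        object gate = new object();
        var tasks = new Task[taskCount];
        for (int i = 0; i < taskCount; i++)
        {
            tasks[i] = factory.StartNew(() =>
            {
                int now = Interlocked.Increment(ref running);
                lock (gate) { peak = Math.Max(peak, now); }
                Thread.Sleep(25); // stand-in for ParseFile
                Interlocked.Decrement(ref running);
            });
        }
        Task.WaitAll(tasks);
        return peak;
    }
}
```

The peak never exceeds maxConcurrency, which is exactly the guarantee the code above relies on from LimitedConcurrencyLevelTaskScheduler.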

P.S. I'm assuming that OnTimedEvent always fires on the same thread. If not, a small change is needed to ensure thread safety:

private void OnTimedEvent(object source, ElapsedEventArgs e)
{
    lock (ProcessedFiles)
    {
        // As above.
    }
}

Alternative solution

Now, here's a slightly more novel approach: how about we get rid of the timer and the LimitedConcurrencyLevelTaskScheduler, and encapsulate all of the processing in a single, modular pipeline? There will be a fair amount of blocking code (unless you break out TPL Dataflow, but I'll stick with base class library types here), but the message passing between stages is so simple that it makes for a really appealing design (in my opinion, of course).

private async Task PipelineAsync()
{
    const int MAX_FILES_TO_BE_QUEUED = 16;
    using (BlockingCollection<FileInfo> queue = new BlockingCollection<FileInfo>(boundedCapacity: MAX_FILES_TO_BE_QUEUED))
    {
        Task producer = Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    DirectoryInfo info = new DirectoryInfo(AssemblyDirectory);
                    HashSet<string> namesOfFilesQueuedForProcessing = new HashSet<string>();
                    FileInfo[] newFilesToParse = info
                        .GetFiles("*.dat")
                        .Where(f =>
                            (DateTime.Now - f.CreationTime) > TimeSpan.FromSeconds(60) &&
                            !ProcessedFiles.Contains(f.Name))
                        .OrderBy(p => p.CreationTime) // Processing order is not guaranteed.
                        .ToArray();
                    foreach (FileInfo file in newFilesToParse)
                    {
                        // This will block if we reach bounded capacity thereby throttling
                        // the producer (meaning we'll never overflow the handover collection).
                        queue.Add(file);
                        namesOfFilesQueuedForProcessing.Add(file.Name);
                    }
                    await Task.Delay(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
                }
            }
            finally
            {
                // Exception? Cancellation? We'll let the
                // consumer know that it can wind down.
                queue.CompleteAdding();
            }
        });
        Task consumer = Task.Run(() =>
        {
            ParallelOptions options = new ParallelOptions {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            };
            Parallel.ForEach(queue.GetConsumingEnumerable(), options, file => ParseFile(file)); 
        });
        await Task.WhenAll(producer, consumer).ConfigureAwait(false);
    }
}

The general form of this pattern is described in Stephen Toub's "Patterns of Parallel Programming", p. 55. I highly recommend having a look.

The trade-off here is the amount of blocking you'll be doing with BlockingCollection<T> and Parallel.ForEach. The benefits of the pipeline as a concept are numerous, though: new stages (Task instances) are easy to add, completion and cancellation are easy to wire in, both producer and consumer exceptions are observed, and all of the mutable state is delightfully local.
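For reference, the non-blocking variant hinted at above (breaking out TPL Dataflow, which requires the System.Threading.Tasks.Dataflow package) would replace the bounded BlockingCollection with an ActionBlock whose BoundedCapacity throttles the producer through SendAsync. A minimal sketch, using an int payload in place of FileInfo and a counter in place of ParseFile:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowPipeline
{
    public static async Task<int> RunAsync(int itemCount)
    {
        int processed = 0;
        var block = new ActionBlock<int>(
            _ => Interlocked.Increment(ref processed), // stand-in for ParseFile
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 16, // same role as the BlockingCollection's bound
            });
        for (int i = 0; i < itemCount; i++)
        {
            // SendAsync completes only once the block has room, so the
            // producer is throttled without blocking a thread.
            await block.SendAsync(i);
        }
        block.Complete();
        await block.Completion;
        return processed;
    }
}
```

The bounded capacity gives the same back-pressure as `queue.Add(file)` in the pipeline above, minus the blocked threads.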