停止线程,直到有足够的内存可用

本文关键字:内存 线程 | 更新日期: 2023-09-27 18:12:46

环境:.net 4.0

我有一个用XSLT样式表转换XML文件的任务,下面是我的代码

public string TransformFileIntoTempFile(string xsltPath, 
    string xmlPath)
{
    var transform = new MvpXslTransform();
    transform.Load(xsltPath, new XsltSettings(true, false), 
        new XmlUrlResolver());
    string tempPath = Path.GetTempFileName();
    using (var writer = new StreamWriter(tempPath))
    {
        using (XmlReader reader = XmlReader.Create(xmlPath))
        {
            transform.Transform(new XmlInput(reader), null, 
                new XmlOutput(writer));
        }       
    }
    return tempPath;
}

我有X个线程可以并行启动这个任务。有时我的输入文件大约有300mb,有时只有几MB。

我的问题:当我的程序试图同时转换一些大的XML文件时,我得到OutOfMemoryException。

如何避免这些outofmemoryexception ?我的想法是在执行任务之前停止一个线程,直到有足够的可用内存,但我不知道如何做到这一点。或者还有其他的解决方案(比如把我的任务放在一个不同的应用程序中)。

谢谢

停止线程,直到有足够的内存可用

我不建议阻塞线程。在最坏的情况下,您最终会饿死可能释放所需内存的任务,从而导致死锁或非常糟糕的性能。

相反,我建议你保留一个有优先级的工作队列。从队列中获取跨线程池公平调度的任务。确保没有线程阻塞等待操作,而是将任务重新发布到队列(具有较低优先级)

所以你要做的(例如,在接收到OutOfMemory异常时),是将相同的作业/任务发布到队列并终止当前任务,为另一个任务释放线程。

一种简单的方法是使用后进先出,它确保发布到队列中的任务的优先级将低于该队列中已有的任何其他任务。

从。net Framework 4开始,我们就有了使用内存映射文件特性的API,这个特性在Win32API中已经存在很多年了,所以现在你可以在。net托管代码中使用它了。

对于您的任务更适合"持久化内存映射文件"选项,MSDN:

持久化文件是与对象关联的内存映射文件磁盘上的源文件。当最后一个进程完成工作时文件,数据保存到磁盘上的源文件。这些内存映射文件适合处理非常大的文件源文件。

在MemoryMappedFile.CreateFromFile()方法描述页面上,您可以找到一个很好的示例,描述如何为超大文件创建内存映射视图。

EDIT:关于评论中大量注释的更新

刚刚找到方法MemoryMappedFile.CreateViewStream(),它创建了一个从System.IO.Stream继承的MemoryMappedViewStream类型的流。我相信您可以从这个流创建XmlReader的实例,然后使用这个reader/流实例化您的定制实现的XslTransform。

EDIT2: remi bourgarel (OP)已经测试了这种方法,看起来像这个特定的XslTransform实现(我想知道是否有人会)不会以假设的方式与MM-View流一起工作

主要问题是您正在加载整个Xml文件。如果您只是在读取时进行转换,则通常不会出现内存不足的问题。话虽如此,我找到了一篇MS支持文章,建议如何做到这一点:http://support.microsoft.com/kb/300934

免责声明:我没有测试这个,所以如果你使用它,它的工作,请告诉我们。

您可以考虑使用队列来限制基于某种人工内存边界(例如文件大小)进行的并发转换的数量。可以使用如下命令:

这种类型的节流策略可以与正在处理的最大并发文件数相结合,以确保磁盘不会被过多地抖动。

NB我没有在执行时包含必要的try'catch'finally,以确保异常被传播到调用线程和waithandl总是被释放。我可以在这里详细说明。

public static class QueuedXmlTransform
{
    private const int MaxBatchSizeMB = 300;
    private const double MB = (1024 * 1024);
    private static readonly object SyncObj = new object();
    private static readonly TaskQueue Tasks = new TaskQueue();
    private static readonly Action Join = () => { };
    private static double _CurrentBatchSizeMb;
    public static string Transform(string xsltPath, string xmlPath)
    {
        string tempPath = Path.GetTempFileName();
        using (AutoResetEvent transformedEvent = new AutoResetEvent(false))
        {
            Action transformTask = () =>
            {
                MvpXslTransform transform = new MvpXslTransform();
                transform.Load(xsltPath, new XsltSettings(true, false),
                    new XmlUrlResolver());
                using (StreamWriter writer = new StreamWriter(tempPath))
                using (XmlReader reader = XmlReader.Create(xmlPath))
                {
                    transform.Transform(new XmlInput(reader), null,
                        new XmlOutput(writer));
                }
                transformedEvent.Set();
            };
            double fileSizeMb = new FileInfo(xmlPath).Length / MB;
            lock (SyncObj)
            {
                if ((_CurrentBatchSizeMb += fileSizeMb) > MaxBatchSizeMB)
                {
                    _CurrentBatchSizeMb = fileSizeMb;
                    Tasks.Queue(isParallel: false, task: Join);
                }
                Tasks.Queue(isParallel: true, task: transformTask);
            }
            transformedEvent.WaitOne();
        }
        return tempPath;
    }
    private class TaskQueue
    {
        private readonly object _syncObj = new object();
        private readonly Queue<QTask> _tasks = new Queue<QTask>();
        private int _runningTaskCount;
        public void Queue(bool isParallel, Action task)
        {
            lock (_syncObj)
            {
                _tasks.Enqueue(new QTask { IsParallel = isParallel, Task = task });
            }
            ProcessTaskQueue();
        }
        private void ProcessTaskQueue()
        {
            lock (_syncObj)
            {
                if (_runningTaskCount != 0) return;
                while (_tasks.Count > 0 && _tasks.Peek().IsParallel)
                {
                    QTask parallelTask = _tasks.Dequeue();
                    QueueUserWorkItem(parallelTask);
                }
                if (_tasks.Count > 0 && _runningTaskCount == 0)
                {
                    QTask serialTask = _tasks.Dequeue();
                    QueueUserWorkItem(serialTask);
                }
            }
        }
        private void QueueUserWorkItem(QTask qTask)
        {
            Action completionTask = () =>
            {
                qTask.Task();
                OnTaskCompleted();
            };
            _runningTaskCount++;
            ThreadPool.QueueUserWorkItem(_ => completionTask());
        }
        private void OnTaskCompleted()
        {
            lock (_syncObj)
            {
                if (--_runningTaskCount == 0)
                {
                    ProcessTaskQueue();
                }
            }
        }
        private class QTask
        {
            public Action Task { get; set; }
            public bool IsParallel { get; set; }
        }
    }
}

修复了当滚动到下一个批处理窗口时保持批大小的错误:

_CurrentBatchSizeMb = fileSizeMb;