将多线程访问的ConcurrentBag转储到文件的速度不够快

本文关键字:速度 文件 不够 转储 多线程 访问 ConcurrentBag | 更新日期: 2023-09-27 18:27:50

我构建了这段代码来并行处理大量字符串之间的字符串比较,以加快速度。

我使用了ConcurrentBag,因此所有线程(任务)都可以写入线程安全集合。然后我将这个集合转储到一个文件中。

我遇到的问题是,我转储到文件中的ConcurrentBag<string> log的填充速度快于它写入文件的速度。所以我的程序不断地消耗越来越多的ram,直到内存耗尽。

我的问题是我能做什么?改进对日志的写入?暂停任务直到ConcurrentBag被转储,然后继续执行任务?最快的选择是什么?

这是代码:

CsvWriter csv = new CsvWriter(@"C:'test.csv");
List<Bailleur> bailleurs = DataLoader.LoadBailleurs();
ConcurrentBag<string> log = new ConcurrentBag<string>();
int i = 0;
var taskWriteToLog = new Task(() =>
{
    // Consume the items in the bag
    string item;
    while (true)  //  (!log.IsEmpty)
    {
        if (!log.IsEmpty)
        {
            if (log.TryTake(out item))
            {
                csv.WriteLine(item);
            }
            else
                Console.WriteLine("Concurrent Bag busy");
        }
        else
        {
            System.Threading.Thread.Sleep(1000);
        }
    }
});
taskWriteToLog.Start();
Parallel.ForEach(bailleurs, s1 =>
{
    foreach (Bailleur s2 in bailleurs)
    {
        var lcs2 = LongestCommonSubsequenceExtensions.LongestCommonSubsequence(s1.Name, s2.Name);
        string line = String.Format("'"LCS'",'"{0}'",'"{1}'",'"{2}'"", s1.Name, s2.Name, lcs2.Item2);
        log.Add(line);
        // Console.WriteLine(line);
        var dic = DiceCoefficientExtensions.DiceCoefficient(s1.Name, s2.Name);
        line = String.Format("'"DICE'",'"{0}'",'"{1}'",'"{2}'"", s1.Name, s2.Name, dic);
        log.Add(line);
        // Console.WriteLine(line);
    }
    i++;
    Console.WriteLine(i);
});
public class CsvWriter
{
    public string FilePath { get; set; }
    private FileStream _fs { get; set; }
    private StreamWriter _sw { get; set; }
    public CsvWriter2(string filePath)
    {
        FilePath = filePath;
        _fs = new FileStream(FilePath, FileMode.Create, FileAccess.Write);
        _sw = new StreamWriter(_fs);
    }
    public void WriteLine(string line)
    {
        _sw.WriteLine(line);
    }
}

将多线程访问的ConcurrentBag转储到文件的速度不够快

不要直接使用并发包,使用将并发包作为后备存储的BlockingCollection(默认情况下,它是一个并发队列)。

其中一个构造函数重载允许您设置集合大小的上限,如果袋子装满了,它将阻塞插入线程,直到有空间插入为止。

它还为您提供了GetConsumingEnumerable(),使您可以很容易地从包中取出物品,您只需在foreach循环中使用它,它将不断提供您的消费者数据,直到调用CompleteAdding。之后,它运行直到袋子是空的,然后像任何其他已经完成的正常IEnumerable一样退出。如果在调用CompleteAdding之前袋子"变干",它将阻塞线程,并在袋子中放入更多数据时自动重新启动。

void ProcessLog()
{
    CsvWriter csv = new CsvWriter(@"C:'test.csv");
    List<Bailleur> bailleurs = DataLoader.LoadBailleurs();
    const int MAX_BAG_SIZE = 500;
    BlockingCollection<string> log = new BlockingCollection<string>(new ConcurrentBag<string>(), MAX_BAG_SIZE);
    int i = 0;
    var taskWriteToLog = new Task(() =>
    {
        // Consume the items in the bag, no need for sleeps or poleing, When items are available it runs, when the bag is empty but CompletedAdding has not been called it blocks.
        foreach(string item in log.GetConsumingEnumerable())
        {
            csv.WriteLine(item);
        }
    });
    taskWriteToLog.Start();
    Parallel.ForEach(bailleurs, s1 =>
    {
        //Snip... You can switch to BlockingCollection without any changes to this section of code.
    });
    log.CompleteAdding(); //lets anyone using GetConsumingEnumerable know that no new items are comming so they can leave the foreach loops when the bag becomes empty.
}

使用BlockingCollection而不是ConcurrentBag

BlockingCollection<string> log = new BlockingCollection<string>();
var item = log.Take();

在这种情况下,Take将被阻止,直到插入项目,并且您不必检查log.IsEmpty。也将不需要线程。睡眠

while (true)
{
    var item = log.Take();
    //Do something with item......
}

首先,看起来您正在使用行作为块写入文件?

如果你能把所有的数据都输入到对象中,并把它写成更大的块,速度会更快。目前,您可能正在达到写入设备的IOPS最大值。您的线路将很小。所以你的写模式看起来像4k随机IO…或者更糟。

使用不同的集合不会改变这样一个事实,即磁盘写入是您正在做的最慢的事情。

看看concurrentbag,这可能不是直接可能的,但如果你能从包中删除行,并将它们连接到一个接近1-5MB的大字符串/字节数组中,你应该会提高性能。(您可能需要将CR LF插入字符串中。)