多线程文件处理

本文关键字:处理 文件 多线程 | 更新日期: 2023-09-27 18:22:18

我想优化这个代码:

    public static void ProcessTo(this StreamReader sr, StreamWriter sw, Action<StreamWriter, string> action, FileProcessOptions fpo = null)
    {
        if (fpo == null)
        {
            fpo = new FileProcessOptions();
        }
        List<string> buffer = new List<string>(fpo.BuferSize);
        while (!sr.EndOfStream)
        {
            buffer.Clear();
            while (!sr.EndOfStream && buffer.Count < fpo.BuferSize)
            {
                buffer.Add(sr.ReadLine());
            }
            if (fpo.UseThreads)
            {
                buffer.AsParallel().ForAll(line => action(sw, line));
            }
            else
            {
                buffer.ForEach(line => action(sw, line));
            }
        }
    }

我处理大量的数据,并希望将这个过程并行化。通常数据是存档的,因此使用多个线程来处理数据流是非常重要的。

多线程文件处理

如果不传递StreamReader,而只传递文件名,则可以编写:

Parallel.Foreach(File.ReadLines(filename), (line) => action(sw, line));

如果你通过了StreamReader,你仍然可以这样做。你只需要创建一个枚举器来读取它。类似于这里所做的:GetEnumerator()在实现IEnumerable<T>并且IEnumerator<T>。使用它,你会写:

LineReaderEnumerable myEnumerable = new LineEnumerator(sr);
Parallel.Foreach(myEnumerable, (line) => action(sw, line));

然而,您可能会遇到一个潜在的问题,因为您可能会有多个线程向该流编写器写入。并且StreamWriter不支持并发写入。它将抛出一个异常。如果您正在同步对输出文件的访问(例如使用锁),那么您在这里就可以了。

你会遇到的另一个问题是事物的输出顺序。几乎可以肯定的是,如果按[1, 2, 3, 4, ... n]的顺序读取行,则输出顺序会有所不同。你可能会得到[1, 2, 4, 3, 6, 5, 7, 9, 8 ... n, n-1]。如果输出顺序很重要,那么您必须想出一种方法来确保按正确的顺序输出。

关于锁,您有:

sr.ProcessParalel(line => 
{ 
    string[] ls = line.Split(''t');
    lock (sw)
    {
        sw.Write(float.Parse(ls[0]));
        sw.Write(int.Parse(ls[1]) * 10 + 1);
        for (int i = 2; i < ls.Length; i++)
        {
            sw.Write(int.Parse(ls[1]));
        }
    }
 });

问题不在于锁。问题是你锁的时间太长了。按照您的编写方式,代码实际上是单线程的,因为所有线程都在等待该锁来进行处理。您需要更改处理,以便锁定的时间尽可能短。

将输出构建为StringBuilder,将其转换为字符串,然后输出该字符串。例如:

string[] ls = line.Split(''t');
StringBuilder sb = new StringBuilder();
sb.Append(float.Parse(ls[0]));
sb.Append(' ');
sb.Append(int.Parse(ls[1])) * 10 + 1);
for (int i = 2; i < ls.Length; i++)
{
    sb.Append(' ');
    sb.Append(int.Parse(ls[i]));    }
}
var sout = sb.ToString();
// lock and write
lock (sw)
{
    sw.Write(sout);
}

使用StringWriter也可以做同样的事情。

最终解决方案:

        public static IEnumerable<string> GetEnumirator(this StreamReader sr)
    {
        while (!sr.EndOfStream)
        {
            yield return sr.ReadLine();
        }
    }
    public static void ProcessParalel(this StreamReader sr, Action<string> action)
    {
        sr.GetEnumirator().AsParallel().ForAll(action);
    }
    public static void ProcessTo(this StreamReader sr, BinaryWriter bw, Action<BinaryWriter, string> action, FileProcessOptions fpo = null)
    {
        sr.ProcessParalel(line =>
        {
            using (MemoryStream ms = new MemoryStream())
            {
                BinaryWriter lbw = new BinaryWriter(ms);
                action(lbw, line);
                ms.Seek(0, SeekOrigin.Begin);
                lock (bw)
                {
                    ms.WriteTo(bw.BaseStream);
                }
            }
        });
    }

在压缩输入流的情况下,我得到了3倍的加速度