使用线程(任务)执行工作包含 I/O

本文关键字:包含 工作 执行 线程 任务 | 更新日期: 2023-09-27 17:55:38

我需要从一个文件读取数据,处理并将结果写入另一个文件。我使用后台工作者来显示进程状态。我写了这样的东西在后台工作者的DoWork事件中使用

private void ProcData(string fileToRead,string fileToWrite)
        {
            byte[] buffer = new byte[4 * 1024];
            //fileToRead & fileToWrite have same size
            FileInfo fileInfo = new FileInfo(fileToRead);
            using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open))
            using (BinaryReader binaryReader = new BinaryReader(streamReader))
            using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open))
            using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
            {
                while (streamWriter.Position < fileInfo.Length)
                {
                    if (streamWriter.Position + buffer.Length > fileInfo.Length)
                    {
                        buffer = new byte[fileInfo.Length - streamWriter.Position];
                    }
                    //read
                    buffer = binaryReader.ReadBytes(buffer.Length);
                    //proccess
                    Proc(buffer);
                    //write
                    binaryWriter.Write(buffer);
                    //report if procentage changed
                    //...
                }//while
            }//using
        }

但它比仅从文件读取和写入文件到写入要慢 5 倍,所以我考虑线程。我在网站上阅读了一些问题,并根据此问题尝试类似的东西

private void ProcData2(string fileToRead, string fileToWrite)
        {
            int threadNumber = 4; //for example
            Task[] tasks = new Task[threadNumber];
            long[] startByte = new long[threadNumber];
            long[] length = new long[threadNumber];
            //divide file to threadNumber(4) part
            //and update startByte & length
            var parentTask = Task.Run(() =>
            {
                for (int i = 0; i < threadNumber; i++)
                {
                    tasks[i] = Task.Factory.StartNew(() =>
                    {
                        Proc2(fileToRead, fileToWrite, startByte[i], length[i]);
                    });
                }
            });
            parentTask.Wait();
            Task.WaitAll(tasks);
        }
        //
        private void Proc2(string fileToRead,string fileToWrite,long fileStartByte,long partLength)
        { 
            byte[] buffer = new byte[4 * 1024];
            using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open,FileAccess.Read,FileShare.Read))
            using (BinaryReader binaryReader = new BinaryReader(streamReader))
            using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open,FileAccess.Write,FileShare.Write))
            using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
            {
                streamReader.Seek(fileStartByte, SeekOrigin.Begin);
                streamWriter.Seek(fileStartByte, SeekOrigin.Begin);
                while (streamWriter.Position < fileStartByte+partLength)
                {
                    if (streamWriter.Position + buffer.Length > fileStartByte+partLength)
                    {
                        buffer = new byte[fileStartByte+partLength - streamWriter.Position];
                    }
                    //read
                    buffer = binaryReader.ReadBytes(buffer.Length);
                    //proccess
                    Proc(buffer);
                    //write
                    binaryWriter.Write(buffer);
                    //report if procentage changed
                    //...
                }//while
            }//using
        }

但我认为它有一些问题,每次切换任务时它都需要再次寻找。我想读取文件,对Proc()使用线程,然后写入结果,但这似乎是错误的。我怎样才能正确地做到这一点?(从文件中读取缓冲区,使用 task 处理并将其写入其他文件)

/

/===

======================================================================

基于皮特柯克姆的帖子,我修改了我的方法。我不知道为什么,但它对我不起作用。我添加了可以帮助他们的新方法。感谢每一个身体

 private void ProcData3(string fileToRead, string fileToWrite)
        {
            int bufferSize = 4 * 1024;
            int threadNumber = 4;//example
            List<byte[]> bufferPool = new List<byte[]>();
            Task[] tasks = new Task[threadNumber];
            //fileToRead & fileToWrite have same size
            FileInfo fileInfo = new FileInfo(fileToRead);
            using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open))
            using (BinaryReader binaryReader = new BinaryReader(streamReader))
            using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open))
            using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
            {
                while (streamWriter.Position < fileInfo.Length)
                {
                    //read
                    for (int g = 0; g < threadNumber; g++)
                    {
                        if (streamWriter.Position + bufferSize <= fileInfo.Length)
                        {
                            bufferPool.Add(binaryReader.ReadBytes(bufferSize));
                        }
                        else
                        {
                            bufferPool.Add(binaryReader.ReadBytes((int)(fileInfo.Length - streamWriter.Position)));
                            break;
                        }
                    }
                    //do
                    var parentTask = Task.Run(() =>
                    {
                        for (int th = 0; th < bufferPool.Count; th++)
                        {
                            int index = th;
                            //threads
                            tasks[index] = Task.Factory.StartNew(() =>
                            {
                                Proc(bufferPool[index]);
                            });
                        }//for th
                    });
                    //stop parent task(run childs)
                    parentTask.Wait();
                    //wait till all task be done
                    Task.WaitAll(tasks);
                    //write
                    for (int g = 0; g < bufferPool.Count; g++)
                    {
                        binaryWriter.Write(bufferPool[g]);
                    }
                    //report if procentage changed
                    //...
                }//while
            }//using
        }

使用线程(任务)执行工作包含 I/O

本质上,

您希望将数据处理拆分为并行任务,但不希望拆分 IO。

发生这种情况的方式取决于数据的大小。如果它足够小以适合内存,那么您可以将其全部读入输入数组并创建一个输出数组,然后创建任务来处理一些输入数组并填充一些输出数组,然后将整个输出数组写入文件。

如果数据太大,则需要限制一次读取和写入的数据量。因此,您的主流从读取 N 个数据块并创建 N 个任务来处理它们开始。然后,您等待任务按顺序完成,每次完成任务时,您都会写入输出块并读取新的输入块并创建另一个任务。需要一些实验才能获得 N 和块大小的良好值,这意味着任务的完成速度往往与 IO 的工作速度大致相同。

相关文章: