多线程上传程序
本文关键字:程序 多线程 | 更新日期: 2023-09-27 18:27:24
我想将巨大的文件(2到40 GB之间)上传到Azure Blob存储。
首先,我成功地将每个文件分割成块(每个块=2MB)。然后我一个接一个地上传区块,在每个区块成功上传后,我会更新一个临时文件,以便在应用程序关闭的情况下继续上传。
现在我想让上传操作多线程化。复习完TPL后,我很困惑该从哪里开始!
从哪里开始TPL有什么指导吗?
void Upload(int segmentId)
{
try
{
string blockId = GetBlockId(segmentId);
var segment = GetSegment(FilePath, segmentId, SeqmentSize);
var md5Hash = CalcMd5Hash(segment);
var blob = new CloudBlockBlob(_link.Uri);
using (var memoryStream = new MemoryStream(segment))
{
blob.PutBlock(blockId, memoryStream, md5Hash);
}
SerializeStatus();
}
catch (Exception exception)
{
...
}
}
很久以前,我在想要upload really large blobs with resumable capability
的地方构建了类似的东西(尽管我使用异步方法而不是TPL)。以下是我所做的:
- 首先,根据块大小,我将文件分成块。每个区块都被分配了一个id。然后我创建了一个对象,其中包含区块id和该区块的状态。为了简单起见,我为块保留了以下状态——
NotStarted
、Successful
和Failed
- 然后,我创建了一个这些块的集合,并将这些数据序列化到一个文件中
- 根据并行线程的数量(比如说x),我从状态为
NotStarted
的集合中获取x个项目。然后我并行处理这些块。我将chunk id作为用户状态传递,这样当我收到回调时,我会根据上传状态相应地更新集合并序列化数据 - 上传完所有区块后,我检查是否有任何失败的区块。如果有的话,我会重试这些块
- 一旦所有块都成功完成,我只需从块集合创建一个块列表并提交该块列表。如果提交块列表操作成功,我只需删除包含块数据的文件
希望这能有所帮助。
更新
一定要看看这个伪代码,看看这是否对你有帮助:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace UploadLargeBlob
{
class Program
{
static void Main(string[] args)
{
List<ChunkInformation> chunksToUpload = new List<ChunkInformation>();
CreateChunkCollection("MyVeryLargeFile", 2*1024*1024);
int numberOfParallelThreads = 8;
do
{
var chunksToProcess = chunksToUpload.Where(c => c.Status == ChunkStatus.NotStarted || c.Status == ChunkStatus.Failed).Take(numberOfParallelThreads);
if (chunksToProcess.Count() == 0)
{
break;
}
List<Task> tasks = new List<Task>();
try
{
foreach (var chunk in chunksToProcess)
{
tasks.Add(Task.Factory.StartNew(() =>
{
DoUpload(chunk);
}, chunk));
}
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException excep)
{
foreach (var task in tasks)
{
if (task.Exception != null)
{
ChunkInformation chunk = task.AsyncState as ChunkInformation;
chunk.Status = ChunkStatus.Failed;
//Now serialize the data.
}
}
}
}
while (true);
}
static void DoUpload(ChunkInformation chunk)
{
//Do the actual upload
//Update chunk status once chunk is uploaded
chunk.Status = ChunkStatus.Successful;
//Serialize the data.
}
static void CreateChunkCollection(string fileName, int chunkSize)
{
}
}
public class ChunkInformation
{
public string Id
{
get;
set;
}
public ChunkStatus Status
{
get;
set;
}
}
public enum ChunkStatus
{
NotStarted,
Successful,
Failed
}
}