Download blobs asynchronously from Azure Storage and save them in a DataTable

Updated: 2023-09-27 17:49:33

The code below shows how I download blobs from Azure Blob Storage and save them into a DataTable:

foreach (var currIndexGroup in blobsGroupedByIndex)
{
    DataRow dr = dtResult.NewRow();
    foreach (var currIndex in currIndexGroup)
    {       
        long fileByteLength = currIndex.Properties.Length;
        byte[] serializedAndCompressedResult = new byte[fileByteLength];
        currIndex.DownloadToByteArray(serializedAndCompressedResult, 0);
        dr[currIndex.Metadata["columnName"]] = DeflateStream.UncompressString(serializedAndCompressedResult);
    }
    dtResult.Rows.Add(dr);
}

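The problem is that the download is very slow: downloading 1,000 really small blobs takes about 20 seconds. If I try to run it asynchronously by using currIndex.DownloadToByteArrayAsync(serializedAndCompressedResult, 0);, the following line throws a Bad state (invalid stored block lengths) exception.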

What is the correct way to fill this DataTable asynchronously?
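A likely reason for the Bad state exception: DownloadToByteArrayAsync only starts the download and returns a Task. If that Task is not awaited, the next line hands UncompressString a buffer that is still partly or entirely empty, and the inflater rejects it. The minimal sketch below assumes a hypothetical IBlobItem interface that mirrors the members the question uses on currIndex, plus a hypothetical InMemoryBlob stand-in so it runs without Azure; the decompression function is passed in (for example DeflateStream.UncompressString). It only removes the exception: downloads within a group still run one after another.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

// Hypothetical abstraction over one blob; in the question this role is played by
// currIndex (Properties.Length, Metadata["columnName"], DownloadToByteArrayAsync).
public interface IBlobItem
{
    long Length { get; }
    string ColumnName { get; }
    Task<int> DownloadToByteArrayAsync(byte[] target, int index);
}

// Hypothetical in-memory stand-in (not an Azure SDK type): it copies its data into the
// buffer only after a short delay, so reading the buffer before the Task completes sees zeros.
public sealed class InMemoryBlob : IBlobItem
{
    private readonly byte[] _data;
    public InMemoryBlob(string columnName, byte[] data) { ColumnName = columnName; _data = data; }
    public long Length => _data.Length;
    public string ColumnName { get; }
    public async Task<int> DownloadToByteArrayAsync(byte[] target, int index)
    {
        await Task.Delay(10); // simulated network latency
        Array.Copy(_data, 0, target, index, _data.Length);
        return _data.Length;
    }
}

public static class AwaitedDownload
{
    // Builds the DataRow for one group. Each download is awaited before its buffer
    // is decompressed; the downloads themselves still run sequentially.
    public static async Task<DataRow> BuildRowAsync(
        DataTable table, IEnumerable<IBlobItem> group, Func<byte[], string> decompress)
    {
        DataRow dr = table.NewRow();
        foreach (var blob in group)
        {
            var buffer = new byte[blob.Length];
            await blob.DownloadToByteArrayAsync(buffer, 0); // the buffer is complete only after this await
            dr[blob.ColumnName] = decompress(buffer);
        }
        return dr;
    }
}
```

In the question's loop this would be used as dtResult.Rows.Add(await AwaitedDownload.BuildRowAsync(dtResult, group, DeflateStream.UncompressString)); inside an async method, with group adapted to IBlobItem.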


//the plan here is to make a model that holds your currIndex and byte array so you can return that model from a task
public class MyModel 
{
    public CloudBlockBlob CurrIndex {get;set;} 
    public byte[] FileBytes {get;set;}
}

foreach (var currIndexGroup in blobsGroupedByIndex)
{
    var myTasks = new List<Task<MyModel>>();
    foreach (var currIndex in currIndexGroup)
    {     
        myTasks.Add(Task<MyModel>.Factory.StartNew(() => 
        {
            var myModel = new MyModel();
            myModel.CurrIndex = currIndex;
            long fileByteLength = myModel.CurrIndex.Properties.Length;
            myModel.FileBytes = new byte[fileByteLength];
            currIndex.DownloadToByteArray(myModel.FileBytes, 0);
            return myModel;
        });
    }
    Task.WaitAll(myTasks.ToArray());
    // One DataRow per group, as in the question's loop: each blob fills one column of it
    DataRow dr = dtResult.NewRow();
    foreach (var task in myTasks)
    {
        MyModel myModel = task.Result;
        dr[myModel.CurrIndex.Metadata["columnName"]] = DeflateStream.UncompressString(myModel.FileBytes);
    }
    dtResult.Rows.Add(dr);
}
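The tasks above wrap the synchronous DownloadToByteArray in Task.Factory.StartNew, so every in-flight download occupies a thread-pool thread while it waits on the network. A possible alternative, sketched here with a hypothetical IBlobItem interface and InMemoryBlob stand-in (neither is an Azure SDK type), starts the asynchronous downloads for all groups at once, caps how many run concurrently with SemaphoreSlim, awaits Task.WhenAll, and only then writes to the DataTable from a single thread, so no lock is needed. The default limit of 32 is an arbitrary placeholder, not a measured value.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical abstraction over one blob (mirrors what the question uses on currIndex).
public interface IBlobItem
{
    long Length { get; }
    string ColumnName { get; }
    Task<int> DownloadToByteArrayAsync(byte[] target, int index);
}

// Hypothetical in-memory stand-in so the sketch runs without Azure.
public sealed class InMemoryBlob : IBlobItem
{
    private readonly byte[] _data;
    public InMemoryBlob(string columnName, byte[] data) { ColumnName = columnName; _data = data; }
    public long Length => _data.Length;
    public string ColumnName { get; }
    public async Task<int> DownloadToByteArrayAsync(byte[] target, int index)
    {
        await Task.Delay(10); // simulated network latency
        Array.Copy(_data, 0, target, index, _data.Length);
        return _data.Length;
    }
}

public static class ConcurrentDownload
{
    // Downloads the blobs of all groups concurrently (at most maxConcurrency at a time),
    // then adds one DataRow per group on the calling thread.
    public static async Task FillTableAsync(
        DataTable table,
        IEnumerable<IEnumerable<IBlobItem>> groups,
        Func<byte[], string> decompress,
        int maxConcurrency = 32)
    {
        using var throttle = new SemaphoreSlim(maxConcurrency);

        async Task<(string Column, byte[] Bytes)> DownloadAsync(IBlobItem blob)
        {
            await throttle.WaitAsync();
            try
            {
                var buffer = new byte[blob.Length];
                await blob.DownloadToByteArrayAsync(buffer, 0);
                return (blob.ColumnName, buffer);
            }
            finally
            {
                throttle.Release();
            }
        }

        // ToList() starts every download before the first await and keeps the grouping.
        var groupTasks = groups.Select(g => Task.WhenAll(g.Select(blob => DownloadAsync(blob)))).ToList();
        var results = await Task.WhenAll(groupTasks);

        // All downloads have finished here; only this thread touches the DataTable.
        foreach (var group in results)
        {
            DataRow dr = table.NewRow();
            foreach (var (column, bytes) in group)
                dr[column] = decompress(bytes);
            table.Rows.Add(dr);
        }
    }
}
```

Usage would look like await ConcurrentDownload.FillTableAsync(dtResult, groups, DeflateStream.UncompressString);, where groups adapts blobsGroupedByIndex to IBlobItem. On .NET Framework, concurrent HTTP connections per host are also capped by ServicePointManager.DefaultConnectionLimit, which may need raising before extra concurrency actually helps.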

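You can increase parallelism further by using Parallel.ForEach on the outer foreach loop. In that case you must lock dtResult, because DataTable is not thread-safe for writes.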
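A minimal sketch of the Parallel.ForEach suggestion, assuming a hypothetical synchronous ISyncBlobItem interface (mirroring currIndex.DownloadToByteArray) and a hypothetical InMemorySyncBlob stand-in. Each group is downloaded and decompressed outside the lock; only NewRow and Rows.Add, which touch the shared DataTable, run under lock (table). The synchronous download is used here because Parallel.ForEach does not await async lambdas. The MaxDegreeOfParallelism default of 8 is an arbitrary placeholder, and row order is not preserved.

```csharp
using System;
using System.Collections.Generic;
using System.Data;
using System.Threading.Tasks;

// Hypothetical synchronous abstraction over one blob (not an Azure SDK type).
public interface ISyncBlobItem
{
    long Length { get; }
    string ColumnName { get; }
    int DownloadToByteArray(byte[] target, int index);
}

// Hypothetical in-memory stand-in so the sketch runs without Azure.
public sealed class InMemorySyncBlob : ISyncBlobItem
{
    private readonly byte[] _data;
    public InMemorySyncBlob(string columnName, byte[] data) { ColumnName = columnName; _data = data; }
    public long Length => _data.Length;
    public string ColumnName { get; }
    public int DownloadToByteArray(byte[] target, int index)
    {
        Array.Copy(_data, 0, target, index, _data.Length);
        return _data.Length;
    }
}

public static class ParallelFill
{
    // Processes groups in parallel; each group becomes one DataRow.
    public static void FillTable(
        DataTable table,
        IEnumerable<IEnumerable<ISyncBlobItem>> groups,
        Func<byte[], string> decompress,
        int maxDegreeOfParallelism = 8)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        Parallel.ForEach(groups, options, group =>
        {
            // Download and decompress outside the lock so groups really run in parallel.
            var values = new List<(string Column, string Value)>();
            foreach (var blob in group)
            {
                var buffer = new byte[blob.Length];
                blob.DownloadToByteArray(buffer, 0);
                values.Add((blob.ColumnName, decompress(buffer)));
            }

            // DataTable writes are not thread-safe: create and add the row under one lock.
            lock (table)
            {
                DataRow dr = table.NewRow();
                foreach (var (column, value) in values)
                    dr[column] = value;
                table.Rows.Add(dr);
            }
        });
    }
}
```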