任务线程和Azure CloudBlobClient

本文关键字:CloudBlobClient Azure 线程 任务 | 更新日期: 2023-09-27 18:19:45

我已经研究了几天了,已经阅读了之前关于多个线程和blob客户端的问题,并实现了他们的建议。

我把这个问题归纳到下面。

没有生成任何错误,只是没有向线程测试容器(已存在)写入任何内容。有时一个blob被写入,然后什么都没有。

如果我将睡眠时间增加到1秒,一切都会好起来。

编写代码的原因是为了对Azure的blob编写能力进行基准测试。(我目前有8个单线程实例,每小时处理700000个,但如果我能解决这个问题,我相信我可以得到更高的结果)

using System;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.StorageClient;
using System.Threading.Tasks;
namespace ThreadedWriterTest
{
    public class WorkerRole : RoleEntryPoint
    {
        private static CloudStorageAccount storageAccount;
        public override void Run()
        {
            while (true)
            {
                Thread.Sleep(10);
                Task.Factory.StartNew(()=> writeStuff());
            }
        }
        private void writeStuff()
        {
            CloudBlobClient threadClient = storageAccount.CreateCloudBlobClient();
            threadClient.GetBlobReference("threadtest/" + Guid.NewGuid().ToString()).UploadText("Hello " + Guid.NewGuid().ToString());
        }

        public override bool OnStart()
        {
            ServicePointManager.DefaultConnectionLimit = 12;
            storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("XXX"));
            return base.OnStart();
        }
    }
}

任务线程和Azure CloudBlobClient

上面的代码生成了太多并发线程,我用Thread.Sleep()节流的天真方法不足以限制线程数。

引入信号量(基本上是一种计算有多少线程同时执行的机制)极大地解决了这个问题。我正在稳步增加并发限制和实例数,并且已经超过了每小时100万。(实际代码生成随机长度数据16-32K,奇数为~4MB,4个实例,10个并发线程)

using System;
using System.Net;
using System.Threading;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;
using Microsoft.WindowsAzure.StorageClient;
using System.Threading.Tasks;
namespace ThreadedWriterTest
{
    public class WorkerRole : RoleEntryPoint
    {
        private static CloudStorageAccount storageAccount;
        private static Semaphore semaphore = new Semaphore(3, 3);
        public override void Run()
        {
            while (true)
            {
                semaphore.WaitOne();
                Task.Factory.StartNew(()=> writeStuff());
            }
        }
        private void writeStuff()
        {
            CloudBlobClient threadClient = storageAccount.CreateCloudBlobClient();
            threadClient.GetBlobReference("threadtest/" + Guid.NewGuid().ToString()).UploadText("Hello " + Guid.NewGuid().ToString());
            semaphore.Release();
        }

        public override bool OnStart()
        {
            ServicePointManager.DefaultConnectionLimit = 12;
            storageAccount = CloudStorageAccount.Parse(CloudConfigurationManager.GetSetting("XXX"));
            return base.OnStart();
        }
    }
}