如何正确地限制WebJobs对DocumentDb的访问

本文关键字:DocumentDb 访问 WebJobs 正确地 | 更新日期: 2023-09-27 18:10:31

我有一个带有blob和队列触发器的Azure WebKob,用于将数据保存到Azure DocumentDb。

有时我得到一个错误:

Microsoft.Azure.Documents。RequestRateTooLargeException: Message: {"Errors":["Request rate is large"]}

目前我使用这段代码限制请求。WebJob函数:

public async Task ParseCategoriesFromCsv(...)
{
    double find = 2.23, add = 5.9, replace = 10.67;
    double requestCharge = Math.Round(find + Math.Max(add, replace));
    await categoryProvider.SaveCategories(requestCharge , categories);
}

操作文档数据库客户端的类别提供程序:

public async Task<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
    var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
    var scheduler = new IntervalTaskScheduler(requestDelay, Scheduler.Default); // Rx
    var client = new DocumentClient(endpoint, authorizationKey,
        new ConnectionPolicy
        {
            ConnectionMode = documentDbOptions.ConnectionMode,
            ConnectionProtocol = documentDbOptions.ConnectionProtocol
        });
    return await Task.WhenAll(documents.Select(async d =>
       await scheduler.ScheduleTask(
           () => client.PutDocumentToDb(collectionOptions.CollectionLink, d.SearchIndex, d))));
}

控制/测量/同步请求的任务调度程序:

private readonly Subject<Action> _requests = new Subject<Action>();
private readonly IDisposable _observable;
public IntervalTaskScheduler(TimeSpan requestDelay, IScheduler scheduler)
{
    _observable = _requests.Select(i => Observable.Empty<Action>()
                                                  .Delay(requestDelay)
                                                  .StartWith(i))
                           .Concat()
                           .ObserveOn(scheduler)
                           .Subscribe(action => action());
}
public Task<T> ScheduleTask<T>(Func<Task<T>> request)
{
    var tcs = new TaskCompletionSource<T>();
    _requests.OnNext(async () =>
    {
        try
        {
            T result = await request();
            tcs.SetResult(result);
        }
        catch (Exception ex)
        {
            tcs.SetException(ex);
        }
    });
    return tcs.Task;
}

基本上是ResourceResponse<Document>.RequestCharge的一些常量但是:

  • 当我有1个队列触发它工作正常,但当8队列它抛出一个错误。
  • 如果将请求费用增加8倍,则8个队列可以正常工作,但只有1个队列的工作速度慢8倍。

什么节流/测量/同步机制可以在这里很好地工作?

如何正确地限制WebJobs对DocumentDb的访问

从。net SDK 1.8.0开始,我们自动在合理范围内处理请求率过大的异常(默认情况下重试9次,并在从服务器返回下一次重试后尊重重试)。

如果你需要更好的控制,你可以在ConnectionPolicy实例上配置RetryOptions,你传递给DocumentClient对象,我们会用它覆盖默认的重试策略。

因此,您不再需要像上面那样在应用程序代码中添加任何自定义逻辑来处理429异常。

当获得429(请求率太大)时,响应会告诉您需要等待多长时间。有一个标头x-ms-retry-after。这是有值的。在ms中等待该时间段

catch (AggregateException ex) when (ex.InnerException is DocumentClientException)
{
    DocumentClientException dce = (DocumentClientException)ex.InnerException;
    switch ((int)dce.StatusCode)
    {
        case 429:
            Thread.Sleep(dce.RetryAfter);
            break;
         default:
             Console.WriteLine("  Failed: {0}", ex.InnerException.Message);
             throw;
     }                    
}

在我看来,你应该能够用你的SaveCategories方法做到这一点,使它与Rx很好地工作:

public IObservable<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
    var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
    var client = new DocumentClient(endpoint, authorizationKey,
        new ConnectionPolicy
        {
            ConnectionMode = documentDbOptions.ConnectionMode,
            ConnectionProtocol = documentDbOptions.ConnectionProtocol
        });
    return
        Observable.Interval(requestDelay)
            .Zip(documents, (delay, doc) => doc)
            .SelectMany(doc => Observable.FromAsync(() => client.PutDocumentToDb(collectionOptions.CollectionLink, doc.SearchIndex, doc)))
            .ToArray();
}

这完全摆脱了您的IntervalTaskScheduler类,并确保您将请求率限制为每个requestDelay时间跨度的一个请求,但允许响应花费尽可能长的时间。To .ToArray()调用将返回多个值的IObservable<ResourceResponse<Document>>转换为在可观察对象完成时返回单个值数组的IObservable<ResourceResponse<Document>[]>

我无法测试你的代码,所以我测试了一个我认为模拟你的代码的样本:

var r = new Random();
var a = Enumerable.Range(0, 1000);
var i = Observable.Interval(TimeSpan.FromSeconds(2.0));
var sw = Stopwatch.StartNew();
var query =
    i.Zip(a, (ii, aa) => aa)
        .SelectMany(aa => Observable.Start(() =>
        {
            var x = sw.Elapsed.TotalMilliseconds;
            Thread.Sleep(r.Next(0, 5000));
            return x;
        }))
        .Select(x => new
        {
            started = x,
            ended = sw.Elapsed.TotalMilliseconds
        });

我得到了这样的结果,这表明请求被限制了:

 4026.2983  5259.7043 
 2030.1287  6940.2326 
 6027.0439  9664.1045 
 8027.9993 10207.0579 
10028.1762 12301.4746 
12028.3190 12711.4440 
14040.7972 17433.1964 
16040.9267 17574.5924 
18041.0529 19077.5545