. net节流算法

本文关键字:算法 net | 更新日期: 2023-09-27 18:19:13

我想在。net (c#或VB)中实现一个好的节流算法,但我无法弄清楚我该如何做到这一点。

的情况是我的asp.net网站应该发布请求到另一个网站获取结果。每分钟最多发送300个请求。

如果请求超过300限制,另一方Api不返回任何东西(这是我不想在我的代码中使用的检查)。

注:我在其他语言中看到过解决方案,而不是。net,但我是一个新手,请友好并保持您的答案像123一样简单。

谢谢

. net节流算法

您可以有一个简单的应用程序(或会话)类,并检查是否命中。这是一些非常粗糙的东西,只是为了给你一个概念:

public class APIHits {
    public int hits { get; private set; }
    private DateTime minute = DateTime.Now();
    public bool AddHit()
    {
        if (hits < 300) {
            hits++;
            return true;
        }
        else
        {
            if (DateTime.Now() > minute.AddSeconds(60)) 
            {
                //60 seconds later
                minute = DateTime.Now();
                hits = 1;
                return true;
            }
            else
            {
                return false;
            }
        }
    }
}

最简单的方法就是计算数据包之间的时间间隔,并且不允许它们以每0.2秒超过一个的速率发送。也就是说,记录你被调用的时间和你下一次被调用的时间,检查是否至少有200ms的弹性,或者什么都不返回。

此方法将工作,但它只适用于平滑的数据包流-如果您期望活动爆发,那么您可能希望在任何200ms期间允许5条消息,只要平均超过1分钟不超过300个调用。在这种情况下,您可以使用一个值数组来存储最近300个数据包的"时间戳",然后每次收到一个呼叫时,您都可以回看"300个呼叫前",以检查至少已经过了1分钟。

对于这两种方案,Environment.TickCount返回的时间值将足以满足您的需要(跨度不小于200毫秒),因为它精确到大约15毫秒。

这是一个节流的异步和同步实现,它可以限制每次持续时间内对方法的调用次数。它基于当前时间与DateTimeOffset和Task.Delay/Thread.Sleep的简单比较。对于许多不需要高度时间分辨率的实现,它应该可以很好地工作,并且应该在您想要节流的方法之前调用。

该解决方案允许用户指定每个时间段允许的呼叫数(默认为每个时间段1个呼叫)。这可以让你的节流阀保持"稳定"。因为您需要以无法控制调用者何时可以继续为代价,或者调用可以尽可能均匀间隔。

假设目标是300个呼叫/分钟:你可以有一个持续时间为200ms的常规油门,它将均匀地分布在每个呼叫之间,至少有200ms的间隔,或者你可以创建一个油门,允许每秒5个呼叫,而不考虑它们的间隔(前5个呼叫获胜-可能一次全部!)。两者都将速率限制在300个电话/分钟以下,但前者是均匀分离的极端,后者更"爆发"。当在循环中处理项时,将内容均匀分布是很好的,但对于并行运行的内容(如web请求)可能不太好,因为调用时间是不可预测的,并且不必要的延迟实际上可能会降低吞吐量。再一次,你的用例和测试将会是你最好的指南。

这个类是线程安全的,你需要在某个地方保持对它的一个实例的引用,以便需要共享它的对象实例可以访问。对于ASP。. NET web应用程序,可以是应用程序实例上的一个字段,也可以是web页面/控制器上的一个静态字段,可以从您选择的DI容器中作为单例注入,也可以在您的特定场景中以任何其他方式访问共享实例。

编辑:更新以确保延迟不会超过持续时间。

    public class Throttle
    {
        /// <summary>
        /// How maximum time to delay access.
        /// </summary>
        private readonly TimeSpan _duration;
        /// <summary>
        /// The next time to run.
        /// </summary>
        private DateTimeOffset _next = DateTimeOffset.MinValue;
        /// <summary>
        /// Synchronize access to the throttle gate.
        /// </summary>
        private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1, 1);
        /// <summary>
        /// Number of allowed callers per time window.
        /// </summary>
        private readonly int _numAllowed = 1;
        /// <summary>
        /// The number of calls in the current time window.
        /// </summary>
        private int _count;
        /// <summary>
        /// The amount of time per window.
        /// </summary>
        public TimeSpan Duration => _duration;
        /// <summary>
        /// The number of calls per time period.
        /// </summary>
        public int Size => _numAllowed;
        /// <summary>
        /// Crates a Throttle that will allow one caller per duration.
        /// </summary>
        /// <param name="duration">The amount of time that must pass between calls.</param>
        public Throttle(TimeSpan duration)
        {
            if (duration.Ticks <= 0)
                throw new ArgumentOutOfRangeException(nameof(duration));
            _duration = duration;
        }
        /// <summary>
        /// Creates a Throttle that will allow the given number of callers per time period.
        /// </summary>
        /// <param name="num">The number of calls to allow per time period.</param>
        /// <param name="per">The duration of the time period.</param>
        public Throttle(int num, TimeSpan per)
        {
            if (num <= 0 || per.Ticks <= 0)
                throw new ArgumentOutOfRangeException();
            _numAllowed = num;
            _duration = per;
        }
        /// <summary>
        /// Returns a task that will complete when the caller may continue.
        /// </summary>
        /// <remarks>This method can be used to synchronize access to a resource at regular intervals
        /// with no more frequency than specified by the duration,
        /// and should be called BEFORE accessing the resource.</remarks>
        /// <param name="cancellationToken">A cancellation token that may be used to abort the stop operation.</param>
        /// <returns>The number of actors that have been allowed within the current time window.</returns>
        public async Task<int> WaitAsync(CancellationToken cancellationToken = default(CancellationToken))
        {
            await _mutex.WaitAsync(cancellationToken)
                .ConfigureAwait(false);
            try
            {
                var delay = _next - DateTimeOffset.UtcNow;
                // ensure delay is never longer than the duration
                if (delay > _duration)
                    delay = _duration;
                
                // continue immediately based on count
                if (_count < _numAllowed) 
                {
                    _count++;
                    if (delay.Ticks <= 0) // past time window, reset
                    {
                        _next = DateTimeOffset.UtcNow.Add(_duration);
                        _count = 1;
                    }
                    return _count;
                }
                
                // over the allowed count within the window
                if (delay.Ticks > 0)
                {
                    // delay until the next window
                    await Task.Delay(delay, cancellationToken)
                        .ConfigureAwait(false);
                }
                _next = DateTimeOffset.UtcNow.Add(_duration);
                _count = 1;
                return _count;
            }
            finally
            {
                _mutex.Release();
            }
        }
        /// <summary>
        /// Returns a task that will complete when the caller may continue.
        /// </summary>
        /// <remarks>This method can be used to synchronize access to a resource at regular intervals
        /// with no more frequency than specified by the duration,
        /// and should be called BEFORE accessing the resource.</remarks>
        /// <param name="cancellationToken">A cancellation token that may be used to abort the stop operation.</param>
        /// <returns>The number of actors that have been allowed within the current time window.</returns>
        public int Wait(CancellationToken cancellationToken = default(CancellationToken))
        {
            _mutex.Wait(cancellationToken);
            try
            {
                var delay = _next - DateTimeOffset.UtcNow;
                // ensure delay is never larger than the duration.
                if (delay > _duration)
                    delay = _duration;
                
                // continue immediately based on count
                if (_count < _numAllowed) 
                {
                    _count++;
                    if (delay.Ticks <= 0) // past time window, reset
                    {
                        _next = DateTimeOffset.UtcNow.Add(_duration);
                        _count = 1;
                    }
                    return _count;
                }
                
                // over the allowed count within the window
                if (delay.Ticks > 0)
                {
                    // delay until the next window
                    Thread.Sleep(delay);
                }
                _next = DateTimeOffset.UtcNow.Add(_duration);
                _count = 1;
                return _count;
            }
            finally
            {
                _mutex.Release();
            }
        }
    }

此示例显示了如何在循环中同步使用油门,以及如何取消行为。如果你把它想象成人们排队搭车,如果取消令牌发出信号,就好像这个人走出了队伍,其他人向前移动。

 var t = new Throttle(5, per: TimeSpan.FromSeconds(1));
 var c = new CancellationTokenSource(TimeSpan.FromSeconds(22));
 foreach(var i in Enumerable.Range(1,300)) {
     var ct = i > 250
         ? default(CancellationToken)
         : c.Token;
     try
     {
         var n = await t.WaitAsync(ct).ConfigureAwait(false);
         WriteLine($"{i}: [{n}] {DateTime.Now}");
     }
     catch (OperationCanceledException)
     {
         WriteLine($"{i}: Operation Canceled");
     }
 }