SemaphoreSlim with dynamic maxCount

Keywords: SemaphoreSlim, maxCount, dynamic | Updated: 2023-09-27 18:07:01

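I am facing a problem where I need to limit the number of calls to another web server. The limit will vary because the server is shared and may have more or less capacity at any given time.

I was thinking about using the SemaphoreSlim class, but there is no public property to change the max count.

Should I wrap SemaphoreSlim in another class that handles the max count? Is there a better approach?

EDIT:

This is what I'm trying: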

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Semaphore
{
class Program
{
    static SemaphoreSlim _sem = new SemaphoreSlim(10,10000);
    static void Main(string[] args)
    {
        int max = 15;
        for (int i = 1; i <= 50; i++)
        {
            new Thread(Enter).Start(new int[] { i, max});
        }
        Console.ReadLine();
        max = 11;
        for (int i = 1; i <= 50; i++)
        {
            new Thread(Enter).Start(new int[] { i, max });
        }
    }
    static void Enter(object param)
    {
        int[] arr = (int[])param;
        int id = arr[0];
        int max = arr[1];
        try
        {
            Console.WriteLine(_sem.CurrentCount);
            if (_sem.CurrentCount <= max)
                _sem.Release(1);
            else
            {
                _sem.Wait(1000);
                Console.WriteLine(id + " wants to enter");
                Thread.Sleep((1000 * id) / 2); // can be here at
                Console.WriteLine(id + " is in!"); // Only three threads
            }
        }
        catch(Exception ex)
        {
            Console.WriteLine("opps ", id);
            Console.WriteLine(ex.Message);
        }
        finally            
        {
            _sem.Release();
        }
    }
}
}

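Questions:

1 - _sem.Wait(1000) should cancel the execution of threads that run for more than 1000 ms, shouldn't it?

2 - Did I get the idea of using Release/Wait right?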


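You can't change the max count, but you can create a SemaphoreSlim with a very high max count and reserve some of the slots. See the SemaphoreSlim(int initialCount, int maxCount) constructor.

So let's say the absolute maximum number of concurrent calls is 100, but initially you want it to be 25. Initialize the semaphore:
SemaphoreSlim sem = new SemaphoreSlim(25, 100);

So 25 is the number of requests that can be serviced concurrently. You have reserved the other 75.

If you then want to increase the number allowed, just call Release(num). If you call Release(10), the number goes to 35.

Now, if you want to reduce the number of available requests, you have to call Wait multiple times (SemaphoreSlim has no WaitOne method; that belongs to WaitHandle-based types such as Semaphore). For example, to remove 10 from the available count:

for (var i = 0; i < 10; ++i)
{
    sem.Wait();
}

This can block until other clients release the semaphore. That is, if you allow 35 concurrent requests and want to reduce that to 25, but there are already 35 clients with active requests, Wait blocks until a client calls Release, and the loop does not terminate until 10 clients have released.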
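
If blocking indefinitely while shrinking is a concern, the Wait(TimeSpan) overload can bound each attempt. It returns false when the timeout elapses instead of throwing or cancelling anything. The following is only a sketch: the class name SemaphoreResizing and the method TryReclaim are made up for illustration and are not part of the framework.

```csharp
using System;
using System.Threading;

public static class SemaphoreResizing
{
    // Hypothetical helper (not a framework API): tries to take `count` slots
    // out of circulation, giving up as soon as one slot cannot be acquired
    // within `perSlotTimeout`. Returns how many slots were actually reclaimed.
    public static int TryReclaim(SemaphoreSlim sem, int count, TimeSpan perSlotTimeout)
    {
        if (sem == null) throw new ArgumentNullException(nameof(sem));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));

        int reclaimed = 0;
        for (var i = 0; i < count; ++i)
        {
            // Wait returns false on timeout; the caller's thread is not cancelled.
            if (!sem.Wait(perSlotTimeout))
            {
                break;
            }
            ++reclaimed;
        }
        return reclaimed;
    }
}
```

With the semaphore from above raised to 35 and nothing in flight, SemaphoreResizing.TryReclaim(sem, 10, TimeSpan.FromMilliseconds(50)) should take all 10 slots back. Under load it may return fewer, and the caller can decide whether to retry later.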

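  1. Get a semaphore.
  2. Set the capacity to something quite a bit higher than you need.
  3. Set the initial capacity to what you want your actual max capacity to be.
  4. Give the semaphore to others to use.
At this point you can wait on the semaphore as often as you like (without a corresponding Release call) to lower the capacity. You can release the semaphore a number of times (without a corresponding Wait call) to increase the effective capacity.

If this is something you do often enough, you could create your own semaphore class that composes a SemaphoreSlim and encapsulates this logic. That composition is also essential if you have code that already releases a semaphore without first waiting on it; with your own class you can ensure that such releases are no-ops. (That said, you should really avoid putting yourself in that position to begin with.)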
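
A minimal sketch of such a composing class might look like the following. Everything here is an assumption made for illustration: the name ResizableSemaphore, its members, and the choice to serialize resizes with a plain lock. It does not try to turn unmatched Release calls into no-ops; it only keeps the capacity bookkeeping in one place.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: composes a SemaphoreSlim and tracks the effective capacity.
public sealed class ResizableSemaphore : IDisposable
{
    private readonly SemaphoreSlim _inner;
    private readonly object _resizeLock = new object();
    private readonly int _hardMax;
    private int _capacity;

    public ResizableSemaphore(int initialCapacity, int hardMax)
    {
        // SemaphoreSlim validates the arguments (0 <= initial <= max, max > 0).
        _inner = new SemaphoreSlim(initialCapacity, hardMax);
        _hardMax = hardMax;
        _capacity = initialCapacity;
    }

    // Effective capacity: slots in use plus slots free, excluding the reserve.
    public int Capacity => Volatile.Read(ref _capacity);

    public Task WaitAsync(CancellationToken ct = default) => _inner.WaitAsync(ct);

    public void Release() => _inner.Release();

    // Moves up to `count` slots out of the reserve. Returns how many were added.
    public int Increase(int count)
    {
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        lock (_resizeLock)
        {
            int granted = Math.Min(count, _hardMax - _capacity);
            if (granted > 0)
            {
                _inner.Release(granted);
                _capacity += granted;
            }
            return granted;
        }
    }

    // Moves up to `count` slots back into the reserve, waiting at most
    // `perSlotTimeout` for each one. Returns how many were removed.
    public int Decrease(int count, TimeSpan perSlotTimeout)
    {
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        lock (_resizeLock)
        {
            int removed = 0;
            while (removed < count && _capacity > 0 && _inner.Wait(perSlotTimeout))
            {
                ++removed;
                --_capacity;
            }
            return removed;
        }
    }

    public void Dispose() => _inner.Dispose();
}
```

Callers only ever see WaitAsync and Release, while whoever monitors the remote server's capacity calls Increase and Decrease. Note that Decrease holds the resize lock while it waits, so concurrent resizes queue up behind it.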

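This is how I solved this situation: I created a custom semaphore slim class that allows me to increase and decrease the number of slots. The class also lets me set a maximum number of slots so I never exceed a "reasonable" number, and a minimum number of slots so I never go below a "reasonable" threshold.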

using Picton.Messaging.Logging;
using System;
using System.Threading;
namespace Picton.Messaging.Utils
{
    /// <summary>
    /// An improvement over System.Threading.SemaphoreSlim that allows you to dynamically increase and
    /// decrease the number of threads that can access a resource or pool of resources concurrently.
    /// </summary>
    /// <seealso cref="System.Threading.SemaphoreSlim" />
    public class SemaphoreSlimDynamic : SemaphoreSlim
    {
        #region FIELDS
        private static readonly ILog _logger = LogProvider.GetLogger(typeof(SemaphoreSlimDynamic));
        private readonly ReaderWriterLockSlim _lock;
        #endregion
        #region PROPERTIES
        /// <summary>
        /// Gets the minimum number of slots.
        /// </summary>
        /// <value>
        /// The minimum slots count.
        /// </value>
        public int MinimumSlotsCount { get; private set; }
        /// <summary>
        /// Gets the number of slots currently available.
        /// </summary>
        /// <value>
        /// The available slots count.
        /// </value>
        public int AvailableSlotsCount { get; private set; }
        /// <summary>
        /// Gets the maximum number of slots.
        /// </summary>
        /// <value>
        /// The maximum slots count.
        /// </value>
        public int MaximumSlotsCount { get; private set; }
        #endregion
        #region CONSTRUCTOR
        /// <summary>
        /// Initializes a new instance of the <see cref="SemaphoreSlimDynamic"/> class.
        /// </summary>
        /// <param name="minCount">The minimum number of slots.</param>
        /// <param name="initialCount">The initial number of slots.</param>
        /// <param name="maxCount">The maximum number of slots.</param>
        public SemaphoreSlimDynamic(int minCount, int initialCount, int maxCount)
            : base(initialCount, maxCount)
        {
            _lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
            this.MinimumSlotsCount = minCount;
            this.AvailableSlotsCount = initialCount;
            this.MaximumSlotsCount = maxCount;
        }
        #endregion
        #region PUBLIC METHODS
        /// <summary>
        /// Attempts to increase the number of slots
        /// </summary>
        /// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
        /// <param name="increaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successful; otherwise, false.</returns>
        public bool TryIncrease(int millisecondsTimeout = 500, int increaseCount = 1)
        {
            return TryIncrease(TimeSpan.FromMilliseconds(millisecondsTimeout), increaseCount);
        }
        /// <summary>
        /// Attempts to increase the number of slots
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="increaseCount">The number of slots to add</param>
        /// <returns>true if the attempt was successful; otherwise, false.</returns>
        public bool TryIncrease(TimeSpan timeout, int increaseCount = 1)
        {
            if (increaseCount < 0) throw new ArgumentOutOfRangeException(nameof(increaseCount));
            else if (increaseCount == 0) return false;
            var increased = false;
            try
            {
                if (this.AvailableSlotsCount < this.MaximumSlotsCount)
                {
                    var lockAcquired = _lock.TryEnterWriteLock(timeout);
                    if (lockAcquired)
                    {
                        try
                        {
                            for (int i = 0; i < increaseCount; i++)
                            {
                                if (this.AvailableSlotsCount < this.MaximumSlotsCount)
                                {
                                    Release();
                                    this.AvailableSlotsCount++;
                                    increased = true;
                                }
                            }
                            if (increased) _logger.Trace($"Semaphore slots increased: {this.AvailableSlotsCount}");
                        }
                        finally
                        {
                            // Always exit the write lock, even if Release() throws SemaphoreFullException
                            _lock.ExitWriteLock();
                        }
                    }
                }
            }
            catch (SemaphoreFullException)
            {
                // An exception is thrown if we attempt to exceed the max number of concurrent tasks
                // It's safe to ignore this exception
            }
            return increased;
        }
        /// <summary>
        /// Attempts to decrease the number of slots
        /// </summary>
        /// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
        /// <param name="decreaseCount">The number of slots to remove</param>
        /// <returns>true if the attempt was successful; otherwise, false.</returns>
        public bool TryDecrease(int millisecondsTimeout = 500, int decreaseCount = 1)
        {
            return TryDecrease(TimeSpan.FromMilliseconds(millisecondsTimeout), decreaseCount);
        }
        /// <summary>
        /// Attempts to decrease the number of slots
        /// </summary>
        /// <param name="timeout">The timeout.</param>
        /// <param name="decreaseCount">The number of slots to remove</param>
        /// <returns>true if the attempt was successful; otherwise, false.</returns>
        public bool TryDecrease(TimeSpan timeout, int decreaseCount = 1)
        {
            if (decreaseCount < 0) throw new ArgumentOutOfRangeException(nameof(decreaseCount));
            else if (decreaseCount == 0) return false;
            var decreased = false;
            if (this.AvailableSlotsCount > this.MinimumSlotsCount)
            {
                var lockAcquired = _lock.TryEnterWriteLock(timeout);
                if (lockAcquired)
                {
                    for (int i = 0; i < decreaseCount; i++)
                    {
                        if (this.AvailableSlotsCount > this.MinimumSlotsCount)
                        {
                            if (Wait(timeout))
                            {
                                this.AvailableSlotsCount--;
                                decreased = true;
                            }
                        }
                    }
                    if (decreased) _logger.Trace($"Semaphore slots decreased: {this.AvailableSlotsCount}");
                    _lock.ExitWriteLock();
                }
            }
            return decreased;
        }
        #endregion
    }
}

OK, I was able to solve my problem by looking at the Mono project.

// SemaphoreSlim.cs
//
// Copyright (c) 2008 Jérémie "Garuma" Laval
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
//
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace System.Threading
{
    public class SemaphoreSlimCustom : IDisposable
    {
        const int spinCount = 10;
        const int deepSleepTime = 20;
        private object _sync = new object();

        int maxCount;
        int currCount;
        bool isDisposed;
        public int MaxCount
        {
            get { lock (_sync) { return maxCount; } }
            set
            {
                lock (_sync)
                {
                    maxCount = value;
                }
            }
        }
        EventWaitHandle handle;
        public SemaphoreSlimCustom (int initialCount) : this (initialCount, int.MaxValue)
        {
        }
        public SemaphoreSlimCustom (int initialCount, int maxCount)
        {
            if (initialCount < 0 || initialCount > maxCount || maxCount < 0)
                throw new ArgumentOutOfRangeException ("The initialCount  argument is negative, initialCount is greater than maxCount, or maxCount is not positive.");
            this.maxCount = maxCount;
            this.currCount = initialCount;
            this.handle = new ManualResetEvent (initialCount > 0);
        }
        public void Dispose ()
        {
            Dispose(true);
        }
        protected virtual void Dispose (bool disposing)
        {
            isDisposed = true;
        }
        void CheckState ()
        {
            if (isDisposed)
                throw new ObjectDisposedException ("The SemaphoreSlim has been disposed.");
        }
        public int CurrentCount {
            get {
                return currCount;
            }
        }
        public int Release ()
        {
            return Release(1);
        }
        public int Release (int releaseCount)
        {
            CheckState ();
            if (releaseCount < 1)
                throw new ArgumentOutOfRangeException ("releaseCount", "releaseCount is less than 1");
            // As we have to take care of the max limit we resort to CAS
            int oldValue, newValue;
            do {
                oldValue = currCount;
                newValue = (currCount + releaseCount);
                newValue = newValue > maxCount ? maxCount : newValue;
            } while (Interlocked.CompareExchange (ref currCount, newValue, oldValue) != oldValue);
            handle.Set ();
            return oldValue;
        }
        public void Wait ()
        {
            Wait (CancellationToken.None);
        }
        public bool Wait (TimeSpan timeout)
        {
            return Wait ((int)timeout.TotalMilliseconds, CancellationToken.None);
        }
        public bool Wait (int millisecondsTimeout)
        {
            return Wait (millisecondsTimeout, CancellationToken.None);
        }
        public void Wait (CancellationToken cancellationToken)
        {
            Wait (-1, cancellationToken);
        }
        public bool Wait (TimeSpan timeout, CancellationToken cancellationToken)
        {
            CheckState();
            return Wait ((int)timeout.TotalMilliseconds, cancellationToken);
        }
        public bool Wait (int millisecondsTimeout, CancellationToken cancellationToken)
        {
            CheckState ();
            if (millisecondsTimeout < -1)
                throw new ArgumentOutOfRangeException ("millisecondsTimeout",
                                                       "millisecondsTimeout is a negative number other than -1");
            Stopwatch sw = Stopwatch.StartNew();
            Func<bool> stopCondition = () => millisecondsTimeout >= 0 && sw.ElapsedMilliseconds > millisecondsTimeout;
            do {
                bool shouldWait;
                int result;
                do {
                    cancellationToken.ThrowIfCancellationRequested ();
                    if (stopCondition ())
                        return false;
                    shouldWait = true;
                    result = currCount;
                    if (result > 0)
                        shouldWait = false;
                    else
                        break;
                } while (Interlocked.CompareExchange (ref currCount, result - 1, result) != result);
                if (!shouldWait) {
                    if (result == 1)
                        handle.Reset ();
                    break;
                }
                SpinWait wait = new SpinWait ();
                while (Thread.VolatileRead (ref currCount) <= 0) {
                    cancellationToken.ThrowIfCancellationRequested ();
                    if (stopCondition ())
                        return false;
                    if (wait.Count > spinCount) {
                        int diff = millisecondsTimeout - (int)sw.ElapsedMilliseconds;
                        int timeout = millisecondsTimeout < 0 ? deepSleepTime :
                            Math.Min (Math.Max (diff, 1), deepSleepTime);
                        handle.WaitOne (timeout);
                    } else
                        wait.SpinOnce ();
                }
            } while (true);
            return true;
        }
        public WaitHandle AvailableWaitHandle {
            get {
                return handle;
            }
        }
        public Task WaitAsync ()
        {
            return Task.Factory.StartNew (() => Wait ());
        }
        public Task WaitAsync (CancellationToken cancellationToken)
        {
            return Task.Factory.StartNew (() => Wait (cancellationToken), cancellationToken);
        }
        public Task<bool> WaitAsync (int millisecondsTimeout)
        {
            return Task.Factory.StartNew (() => Wait (millisecondsTimeout));
        }
        public Task<bool> WaitAsync (TimeSpan timeout)
        {
            return Task.Factory.StartNew (() => Wait (timeout));
        }
        public Task<bool> WaitAsync (int millisecondsTimeout, CancellationToken cancellationToken)
        {
            return Task.Factory.StartNew (() => Wait (millisecondsTimeout, cancellationToken), cancellationToken);
        }
        public Task<bool> WaitAsync (TimeSpan timeout, CancellationToken cancellationToken)
        {
            return Task.Factory.StartNew (() => Wait (timeout, cancellationToken), cancellationToken);
        }
    }
}

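Updated answer for .NET 5:

Suppose I want a lock that allows at most 10 requests, but most of the time I only want 1.

private readonly static SemaphoreSlim semLock = new(1, 10);

Now when I want to free up some resources, I can do:

semLock.Release(Math.Min(9, requiredAmount));

Note that 9 is one less than 10, because one slot was already released initially.

When I want to restrict the available resources again, I can call:
while(semLock.CurrentCount > 1)
{
    await semLock.WaitAsync();
}

This waits until the count is back down to 1.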
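
One thing to keep in mind is that CurrentCount only reports free slots. If every slot is in use when the loop starts, CurrentCount is 0, the loop body never runs, and the full capacity comes back once the holders call Release. An alternative is to count the slots taken rather than poll CurrentCount. The sketch below assumes a made-up helper named ShrinkAsync; it is not a framework method, and giving slots back on cancellation is just one possible design choice.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreShrink
{
    // Hypothetical helper (not a framework API): takes exactly `count` slots
    // out of circulation, waiting for in-flight holders to release if needed.
    // If the wait is cancelled, the slots taken so far are handed back so the
    // capacity is left unchanged.
    public static async Task ShrinkAsync(
        SemaphoreSlim sem, int count, CancellationToken ct = default)
    {
        if (sem == null) throw new ArgumentNullException(nameof(sem));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));

        int taken = 0;
        try
        {
            for (var i = 0; i < count; ++i)
            {
                await sem.WaitAsync(ct).ConfigureAwait(false);
                ++taken;
            }
        }
        catch (OperationCanceledException)
        {
            // Roll back a partial shrink before propagating the cancellation.
            if (taken > 0)
            {
                sem.Release(taken);
            }
            throw;
        }
    }
}
```

For the example above, await SemaphoreShrink.ShrinkAsync(semLock, 9) would bring a semaphore that was raised to 10 back down to 1, even if some of the 10 slots are busy when the call starts.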