带有动态maxCount的SemaphoreSlim
本文关键字:SemaphoreSlim maxCount 动态 | 更新日期: 2023-09-27 18:07:01
我正面临一个问题,我需要限制呼叫另一个web服务器的数量。它会因服务器是共享的而有所不同,可能会有更多或更少的容量。
我正在考虑使用SemaphoreSlim类,但没有公共属性来改变最大计数。
我应该把我的SemaphoreSlim类包装在另一个类,将处理最大计数?有没有更好的办法?
编辑:这是我正在尝试的:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Semaphore
{
class Program
{
static SemaphoreSlim _sem = new SemaphoreSlim(10,10000);
static void Main(string[] args)
{
int max = 15;
for (int i = 1; i <= 50; i++)
{
new Thread(Enter).Start(new int[] { i, max});
}
Console.ReadLine();
max = 11;
for (int i = 1; i <= 50; i++)
{
new Thread(Enter).Start(new int[] { i, max });
}
}
static void Enter(object param)
{
int[] arr = (int[])param;
int id = arr[0];
int max = arr[1];
try
{
Console.WriteLine(_sem.CurrentCount);
if (_sem.CurrentCount <= max)
_sem.Release(1);
else
{
_sem.Wait(1000);
Console.WriteLine(id + " wants to enter");
Thread.Sleep((1000 * id) / 2); // can be here at
Console.WriteLine(id + " is in!"); // Only three threads
}
}
catch(Exception ex)
{
Console.WriteLine("opps ", id);
Console.WriteLine(ex.Message);
}
finally
{
_sem.Release();
}
}
}
}
问题:
1-_sem.Wait(1000)应该取消执行时间超过1000ms的线程,不是吗?
2-我有使用释放/等待的想法吗?
您不能更改最大计数,但是您可以创建一个具有非常高的最大计数的SemaphoreSlim
,并保留其中的一些。参见这个构造函数。
SemaphoreSlim sem = new SemaphoreSlim(25, 100);
25是可以并发处理的请求数。您已经预订了另外75个。
如果你想增加允许的数量,只需调用Release(num)。如果您拨打Release(10)
,那么号码将转到35。
现在,如果您想减少可用请求的数量,您必须多次调用WaitOne
。例如,如果要从可用计数中删除10:
for (var i = 0; i < 10; ++i)
{
sem.WaitOne();
}
这有可能阻塞,直到其他客户端释放信号量。也就是说,如果您允许35个并发请求,并且您想将其减少到25个,但是已经有35个具有活动请求的客户端,那么WaitOne
将阻塞直到一个客户端调用Release
,并且直到10个客户端释放循环才会终止。
- 获取一个信号量
- 将容量设置为比您需要的高一些。
- 将初始容量设置为您希望的实际最大容量。
- 给出信号量给其他人使用
如果这是你做得足够的事情,你可以潜在地创建自己的信号量类,它组成一个SemaphoreSlim
并封装这个逻辑。如果你的代码已经释放了一个信号量,而没有先等待它,这种组合也是必不可少的;使用您自己的类,您可以确保此类发布是无操作的。(也就是说,一开始你就应该避免把自己置于那种境地,真的。)
我是这样解决这种情况的:我创建了一个自定义信号量精简类,它允许我增加和减少插槽的数量。这个类还允许我设置最大插槽数量,这样我就不会超过一个"合理"的数字,也可以设置最小插槽数量,这样我就不会低于一个"合理"的阈值。
using Picton.Messaging.Logging;
using System;
using System.Threading;
namespace Picton.Messaging.Utils
{
/// <summary>
/// An improvement over System.Threading.SemaphoreSlim that allows you to dynamically increase and
/// decrease the number of threads that can access a resource or pool of resources concurrently.
/// </summary>
/// <seealso cref="System.Threading.SemaphoreSlim" />
public class SemaphoreSlimDynamic : SemaphoreSlim
{
#region FIELDS
private static readonly ILog _logger = LogProvider.GetLogger(typeof(SemaphoreSlimDynamic));
private readonly ReaderWriterLockSlim _lock;
#endregion
#region PROPERTIES
/// <summary>
/// Gets the minimum number of slots.
/// </summary>
/// <value>
/// The minimum slots count.
/// </value>
public int MinimumSlotsCount { get; private set; }
/// <summary>
/// Gets the number of slots currently available.
/// </summary>
/// <value>
/// The available slots count.
/// </value>
public int AvailableSlotsCount { get; private set; }
/// <summary>
/// Gets the maximum number of slots.
/// </summary>
/// <value>
/// The maximum slots count.
/// </value>
public int MaximumSlotsCount { get; private set; }
#endregion
#region CONSTRUCTOR
/// <summary>
/// Initializes a new instance of the <see cref="SemaphoreSlimDynamic"/> class.
/// </summary>
/// <param name="minCount">The minimum number of slots.</param>
/// <param name="initialCount">The initial number of slots.</param>
/// <param name="maxCount">The maximum number of slots.</param>
public SemaphoreSlimDynamic(int minCount, int initialCount, int maxCount)
: base(initialCount, maxCount)
{
_lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
this.MinimumSlotsCount = minCount;
this.AvailableSlotsCount = initialCount;
this.MaximumSlotsCount = maxCount;
}
#endregion
#region PUBLIC METHODS
/// <summary>
/// Attempts to increase the number of slots
/// </summary>
/// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
/// <param name="increaseCount">The number of slots to add</param>
/// <returns>true if the attempt was successfully; otherwise, false.</returns>
public bool TryIncrease(int millisecondsTimeout = 500, int increaseCount = 1)
{
return TryIncrease(TimeSpan.FromMilliseconds(millisecondsTimeout), increaseCount);
}
/// <summary>
/// Attempts to increase the number of slots
/// </summary>
/// <param name="timeout">The timeout.</param>
/// <param name="increaseCount">The number of slots to add</param>
/// <returns>true if the attempt was successfully; otherwise, false.</returns>
public bool TryIncrease(TimeSpan timeout, int increaseCount = 1)
{
if (increaseCount < 0) throw new ArgumentOutOfRangeException(nameof(increaseCount));
else if (increaseCount == 0) return false;
var increased = false;
try
{
if (this.AvailableSlotsCount < this.MaximumSlotsCount)
{
var lockAcquired = _lock.TryEnterWriteLock(timeout);
if (lockAcquired)
{
for (int i = 0; i < increaseCount; i++)
{
if (this.AvailableSlotsCount < this.MaximumSlotsCount)
{
Release();
this.AvailableSlotsCount++;
increased = true;
}
}
if (increased) _logger.Trace($"Semaphore slots increased: {this.AvailableSlotsCount}");
_lock.ExitWriteLock();
}
}
}
catch (SemaphoreFullException)
{
// An exception is thrown if we attempt to exceed the max number of concurrent tasks
// It's safe to ignore this exception
}
return increased;
}
/// <summary>
/// Attempts to decrease the number of slots
/// </summary>
/// <param name="millisecondsTimeout">The timeout in milliseconds.</param>
/// <param name="decreaseCount">The number of slots to add</param>
/// <returns>true if the attempt was successfully; otherwise, false.</returns>
public bool TryDecrease(int millisecondsTimeout = 500, int decreaseCount = 1)
{
return TryDecrease(TimeSpan.FromMilliseconds(millisecondsTimeout), decreaseCount);
}
/// <summary>
/// Attempts to decrease the number of slots
/// </summary>
/// <param name="timeout">The timeout.</param>
/// <param name="decreaseCount">The number of slots to add</param>
/// <returns>true if the attempt was successfully; otherwise, false.</returns>
public bool TryDecrease(TimeSpan timeout, int decreaseCount = 1)
{
if (decreaseCount < 0) throw new ArgumentOutOfRangeException(nameof(decreaseCount));
else if (decreaseCount == 0) return false;
var decreased = false;
if (this.AvailableSlotsCount > this.MinimumSlotsCount)
{
var lockAcquired = _lock.TryEnterWriteLock(timeout);
if (lockAcquired)
{
for (int i = 0; i < decreaseCount; i++)
{
if (this.AvailableSlotsCount > this.MinimumSlotsCount)
{
if (Wait(timeout))
{
this.AvailableSlotsCount--;
decreased = true;
}
}
}
if (decreased) _logger.Trace($"Semaphore slots decreased: {this.AvailableSlotsCount}");
_lock.ExitWriteLock();
}
}
return decreased;
}
#endregion
}
}
好的,我可以在单项目上解决我的问题。
// SemaphoreSlim.cs
//
// Copyright (c) 2008 Jérémie "Garuma" Laval
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
//
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace System.Threading
{
public class SemaphoreSlimCustom : IDisposable
{
const int spinCount = 10;
const int deepSleepTime = 20;
private object _sync = new object();
int maxCount;
int currCount;
bool isDisposed;
public int MaxCount
{
get { lock (_sync) { return maxCount; } }
set
{
lock (_sync)
{
maxCount = value;
}
}
}
EventWaitHandle handle;
public SemaphoreSlimCustom (int initialCount) : this (initialCount, int.MaxValue)
{
}
public SemaphoreSlimCustom (int initialCount, int maxCount)
{
if (initialCount < 0 || initialCount > maxCount || maxCount < 0)
throw new ArgumentOutOfRangeException ("The initialCount argument is negative, initialCount is greater than maxCount, or maxCount is not positive.");
this.maxCount = maxCount;
this.currCount = initialCount;
this.handle = new ManualResetEvent (initialCount > 0);
}
public void Dispose ()
{
Dispose(true);
}
protected virtual void Dispose (bool disposing)
{
isDisposed = true;
}
void CheckState ()
{
if (isDisposed)
throw new ObjectDisposedException ("The SemaphoreSlim has been disposed.");
}
public int CurrentCount {
get {
return currCount;
}
}
public int Release ()
{
return Release(1);
}
public int Release (int releaseCount)
{
CheckState ();
if (releaseCount < 1)
throw new ArgumentOutOfRangeException ("releaseCount", "releaseCount is less than 1");
// As we have to take care of the max limit we resort to CAS
int oldValue, newValue;
do {
oldValue = currCount;
newValue = (currCount + releaseCount);
newValue = newValue > maxCount ? maxCount : newValue;
} while (Interlocked.CompareExchange (ref currCount, newValue, oldValue) != oldValue);
handle.Set ();
return oldValue;
}
public void Wait ()
{
Wait (CancellationToken.None);
}
public bool Wait (TimeSpan timeout)
{
return Wait ((int)timeout.TotalMilliseconds, CancellationToken.None);
}
public bool Wait (int millisecondsTimeout)
{
return Wait (millisecondsTimeout, CancellationToken.None);
}
public void Wait (CancellationToken cancellationToken)
{
Wait (-1, cancellationToken);
}
public bool Wait (TimeSpan timeout, CancellationToken cancellationToken)
{
CheckState();
return Wait ((int)timeout.TotalMilliseconds, cancellationToken);
}
public bool Wait (int millisecondsTimeout, CancellationToken cancellationToken)
{
CheckState ();
if (millisecondsTimeout < -1)
throw new ArgumentOutOfRangeException ("millisecondsTimeout",
"millisecondsTimeout is a negative number other than -1");
Stopwatch sw = Stopwatch.StartNew();
Func<bool> stopCondition = () => millisecondsTimeout >= 0 && sw.ElapsedMilliseconds > millisecondsTimeout;
do {
bool shouldWait;
int result;
do {
cancellationToken.ThrowIfCancellationRequested ();
if (stopCondition ())
return false;
shouldWait = true;
result = currCount;
if (result > 0)
shouldWait = false;
else
break;
} while (Interlocked.CompareExchange (ref currCount, result - 1, result) != result);
if (!shouldWait) {
if (result == 1)
handle.Reset ();
break;
}
SpinWait wait = new SpinWait ();
while (Thread.VolatileRead (ref currCount) <= 0) {
cancellationToken.ThrowIfCancellationRequested ();
if (stopCondition ())
return false;
if (wait.Count > spinCount) {
int diff = millisecondsTimeout - (int)sw.ElapsedMilliseconds;
int timeout = millisecondsTimeout < 0 ? deepSleepTime :
Math.Min (Math.Max (diff, 1), deepSleepTime);
handle.WaitOne (timeout);
} else
wait.SpinOnce ();
}
} while (true);
return true;
}
public WaitHandle AvailableWaitHandle {
get {
return handle;
}
}
public Task WaitAsync ()
{
return Task.Factory.StartNew (() => Wait ());
}
public Task WaitAsync (CancellationToken cancellationToken)
{
return Task.Factory.StartNew (() => Wait (cancellationToken), cancellationToken);
}
public Task<bool> WaitAsync (int millisecondsTimeout)
{
return Task.Factory.StartNew (() => Wait (millisecondsTimeout));
}
public Task<bool> WaitAsync (TimeSpan timeout)
{
return Task.Factory.StartNew (() => Wait (timeout));
}
public Task<bool> WaitAsync (int millisecondsTimeout, CancellationToken cancellationToken)
{
return Task.Factory.StartNew (() => Wait (millisecondsTimeout, cancellationToken), cancellationToken);
}
public Task<bool> WaitAsync (TimeSpan timeout, CancellationToken cancellationToken)
{
return Task.Factory.StartNew (() => Wait (timeout, cancellationToken), cancellationToken);
}
}
}
更新。net Core 5答案:
假设我想要一个最多有10个请求的锁,但大多数情况下我只想要1个。
private readonly static SemaphoreSlim semLock = new(1, 10);
现在当我想要释放一些资源时,我可以这样做:
semLock.Release(Math.Min(9, requiredAmount));
注意,9比10少1,因为我们已经有了一个初始版本。
当我想再次限制可用资源时,我可以调用:while(semLock.CurrentCount > 1)
{
await semLock.WaitAsync();
}
将等待它返回到1