带锁定的 Redis 分布式增量
本文关键字:分布式 Redis 锁定 | 更新日期: 2023-09-27 18:30:52
我需要生成一个计数器,该计数器将发送到一些api调用。我的应用程序在多个节点上运行,所以我想生成唯一的计数器。我尝试过以下代码
public static long GetTransactionCountForUser(int telcoId)
{
long valreturn = 0;
string key = "TelcoId:" + telcoId + ":Sequence";
if (Muxer != null && Muxer.IsConnected && (Muxer.GetDatabase()) != null)
{
IDatabase db = Muxer.GetDatabase();
var val = db.StringGet(key);
int maxVal = 999;
if (Convert.ToInt32(val) < maxVal)
{
valreturn = db.StringIncrement(key);
}
else
{
bool isdone = db.StringSet(key, valreturn);
//db.SetAdd(key,new RedisValue) .StringIncrement(key, Convert.ToDouble(val))
}
}
return valreturn;
}
并通过任务并行库对其进行了运行测试。当我有边界值时,我看到的是设置了多次 0 条目
请让我知道我需要做什么更正
更新:我的最终逻辑如下
public static long GetSequenceNumberForTelcoApiCallViaLuaScript(int telcoId)
{
long valreturn = 0;
long maxIncrement = 9999;//todo via configuration
if (true)//todo via configuration
{
IDatabase db;
string key = "TelcoId:" + telcoId + ":SequenceNumber";
if (Muxer != null && Muxer.IsConnected && (db = Muxer.GetDatabase()) != null)
{
valreturn = (long)db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > tonumber(ARGV[1]) then
result = 1
redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key }, flags: CommandFlags.HighPriority, values: new RedisValue[] { maxIncrement });
}
}
return valreturn;
}
事实上,你的代码在翻转边界附近是不安全的,因为你正在做一个"get",(延迟和思考),"设置" - 没有检查你的"get"中的条件是否仍然适用。如果服务器在项目1000左右忙碌,则可以获得各种疯狂的输出,包括以下内容:
1
2
...
999
1000 // when "get" returns 998, so you do an incr
1001 // ditto
1002 // ditto
0 // when "get" returns 999 or above, so you do a set
0 // ditto
0 // ditto
1
选项:
- 使用事务和约束 API 使逻辑并发安全
- 通过
ScriptEvaluate
将逻辑重写为 Lua 脚本
现在,Redis 事务(根据选项 1)很难。就个人而言,我会使用"2" - 除了编码和调试更简单之外,这意味着您只有 1 次往返和操作,而不是"获取、监视、获取、多、输入/设置、执行/丢弃",以及"从头开始重试"循环来说明中止场景。如果您愿意,我可以尝试为您将其写为 Lua - 它应该大约 4 行。
下面是 Lua 实现:
string key = ...
for(int i = 0; i < 2000; i++) // just a test loop for me; you'd only do it once etc
{
int result = (int) db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > 999 then
result = 0
redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key });
Console.WriteLine(result);
}
注意:如果需要参数化最大值,请使用:
if result > tonumber(ARGV[1]) then
和:
int result = (int)db.ScriptEvaluate(...,
new RedisKey[] { key }, new RedisValue[] { max });
(所以ARGV[1]
从max
中获取值)
有必要了解eval
/evalsha
(这就是ScriptEvaluate
所说的)不会与其他服务器请求竞争,因此incr
和可能的set
之间没有任何变化。这意味着我们不需要复杂的watch
等逻辑。
这是通过事务/约束API相同的(我认为!):
static int IncrementAndLoopToZero(IDatabase db, RedisKey key, int max)
{
int result;
bool success;
do
{
RedisValue current = db.StringGet(key);
var tran = db.CreateTransaction();
// assert hasn't changed - note this handles "not exists" correctly
tran.AddCondition(Condition.StringEqual(key, current));
if(((int)current) > max)
{
result = 0;
tran.StringSetAsync(key, result, flags: CommandFlags.FireAndForget);
}
else
{
result = ((int)current) + 1;
tran.StringIncrementAsync(key, flags: CommandFlags.FireAndForget);
}
success = tran.Execute(); // if assertion fails, returns false and aborts
} while (!success); // and if it aborts, we need to redo
return result;
}
很复杂,嗯?这里简单的成功案例是:
GET {key} # get the current value
WATCH {key} # assertion stating that {key} should be guarded
GET {key} # used by the assertion to check the value
MULTI # begin a block
INCR {key} # increment {key}
EXEC # execute the block *if WATCH is happy*
这是...相当多的工作,并且涉及多路复用器上的管道停滞。更复杂的情况(断言失败、监视失败、环绕)的输出会略有不同,但应该可以工作。
您可以使用 WATCH 命令 - 这样,如果值发生变化,您将收到通知