带锁定的 Redis 分布式增量

本文关键字:分布式 Redis 锁定 | 更新日期: 2023-09-27 18:30:52

我需要生成一个计数器,该计数器将发送到一些api调用。我的应用程序在多个节点上运行,所以我想生成唯一的计数器。我尝试过以下代码

public static long GetTransactionCountForUser(int telcoId)
{
    long valreturn = 0;
    string key = "TelcoId:" + telcoId + ":Sequence";
    if (Muxer != null && Muxer.IsConnected && (Muxer.GetDatabase()) != null)
    {
        IDatabase db = Muxer.GetDatabase();
        var val = db.StringGet(key);
        int maxVal = 999;
        if (Convert.ToInt32(val) < maxVal)
        {
            valreturn = db.StringIncrement(key);
        }
        else
        {
            bool isdone = db.StringSet(key, valreturn);
            //db.SetAdd(key,new RedisValue) .StringIncrement(key, Convert.ToDouble(val))
        }
    }
    return valreturn;
}

并通过任务并行库对其进行了运行测试。当我有边界值时,我看到的是设置了多次 0 条目

请让我知道我需要做什么更正

更新:我的最终逻辑如下

public static long GetSequenceNumberForTelcoApiCallViaLuaScript(int telcoId)
{
    long valreturn = 0;
    long maxIncrement = 9999;//todo via configuration
    if (true)//todo via configuration
    {
        IDatabase db;
        string key = "TelcoId:" + telcoId + ":SequenceNumber";
        if (Muxer != null && Muxer.IsConnected && (db = Muxer.GetDatabase()) != null)
        {
            valreturn = (long)db.ScriptEvaluate(@"
                local result = redis.call('incr', KEYS[1])
                if result > tonumber(ARGV[1]) then
                result = 1
                redis.call('set', KEYS[1], result)
                end
                return result", new RedisKey[] { key }, flags: CommandFlags.HighPriority, values: new RedisValue[] { maxIncrement });
        }
    }
    return valreturn;
}

带锁定的 Redis 分布式增量

事实上,你的代码在翻转边界附近是不安全的,因为你正在做一个"get",(延迟和思考),"设置" - 没有检查你的"get"中的条件是否仍然适用。如果服务器在项目1000左右忙碌,则可以获得各种疯狂的输出,包括以下内容:

1
2
...
999
1000 // when "get" returns 998, so you do an incr
1001 // ditto
1002 // ditto
0 // when "get" returns 999 or above, so you do a set
0 // ditto
0 // ditto
1

选项:

  1. 使用事务和约束 API 使逻辑并发安全
  2. 通过ScriptEvaluate将逻辑重写为 Lua 脚本

现在,Redis 事务(根据选项 1)很难。就个人而言,我会使用"2" - 除了编码和调试更简单之外,这意味着您只有 1 次往返和操作,而不是"获取、监视、获取、多、输入/设置、执行/丢弃",以及"从头开始重试"循环来说明中止场景。如果您愿意,我可以尝试为您将其写为 Lua - 它应该大约 4 行。


下面是 Lua 实现:

string key = ...
for(int i = 0; i < 2000; i++) // just a test loop for me; you'd only do it once etc
{
    int result = (int) db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > 999 then
    result = 0
    redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key });
    Console.WriteLine(result);
}

注意:如果需要参数化最大值,请使用:

if result > tonumber(ARGV[1]) then

和:

int result = (int)db.ScriptEvaluate(...,
    new RedisKey[] { key }, new RedisValue[] { max });

(所以ARGV[1]max中获取值)

有必要了解eval/evalsha(这就是ScriptEvaluate所说的)不会与其他服务器请求竞争,因此incr和可能的set之间没有任何变化。这意味着我们不需要复杂的watch等逻辑。

这是通过事务/约束API相同的(我认为!):

static int IncrementAndLoopToZero(IDatabase db, RedisKey key, int max)
{
    int result;
    bool success;
    do
    {
        RedisValue current = db.StringGet(key);
        var tran = db.CreateTransaction();
        // assert hasn't changed - note this handles "not exists" correctly
        tran.AddCondition(Condition.StringEqual(key, current));
        if(((int)current) > max)
        {
            result = 0;
            tran.StringSetAsync(key, result, flags: CommandFlags.FireAndForget);
        }
        else
        {
            result = ((int)current) + 1;
            tran.StringIncrementAsync(key, flags: CommandFlags.FireAndForget);
        }
        success = tran.Execute(); // if assertion fails, returns false and aborts
    } while (!success); // and if it aborts, we need to redo
    return result;
}

很复杂,嗯?这里简单的成功案例是:

GET {key}    # get the current value
WATCH {key}  # assertion stating that {key} should be guarded
GET {key}    # used by the assertion to check the value
MULTI        # begin a block
INCR {key}   # increment {key}
EXEC         # execute the block *if WATCH is happy*

这是...相当多的工作,并且涉及多路复用器上的管道停滞。更复杂的情况(断言失败、监视失败、环绕)的输出会略有不同,但应该可以工作。

您可以使用 WATCH 命令 - 这样,如果值发生变化,您将收到通知