PubSub如何在BookSleeve/ Redis中工作

本文关键字:Redis 工作 BookSleeve PubSub | 更新日期: 2023-09-27 18:07:58

我想知道使用BookSleeve发布和订阅频道的最佳方法是什么。我目前实现了几个静态方法(见下文),这些方法允许我将内容发布到特定的通道,并将新创建的通道存储在private static Dictionary<string, RedisSubscriberConnection> subscribedChannels;中。

如果我想在同一个应用程序中向通道发布和订阅通道(注意:我的包装器是一个静态类),这是正确的方法吗?即使我想要发布和订阅,创建一个频道是否足够?显然,我不会发布到与我在同一应用程序中订阅的频道相同的频道。但我测试了它,它工作:

 RedisClient.SubscribeToChannel("Test").Wait();
 RedisClient.Publish("Test", "Test Message");

,它成功了。

这是我的问题:

1)设置一个专用的发布通道和一个专用的订阅通道比同时使用一个通道更有效吗?

2)"channel"answers"PatternSubscription"在语义上有什么区别?我的理解是我可以通过同一频道的PatternSubscription()订阅几个"主题",对吗?但是,如果我想为每个"主题"调用不同的回调,我就必须为每个主题设置一个通道,对吗?这是有效的还是你会反对?

下面是代码片段。

谢谢! !

    public static Task<long> Publish(string channel, byte[] message)
    {
        return connection.Publish(channel, message);
    }
    public static Task SubscribeToChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);
        RedisSubscriberConnection channel = connection.GetOpenSubscriberChannel();
        subscribedChannels[subscriptionString] = channel;
        return channel.PatternSubscribe(subscriptionString, OnSubscribedChannelMessage);
    }
    public static Task UnsubscribeFromChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);
        if (subscribedChannels.Keys.Contains(subscriptionString))
        {
            RedisSubscriberConnection channel = subscribedChannels[subscriptionString];
            Task  task = channel.PatternUnsubscribe(subscriptionString);
            //remove channel subscription
            channel.Close(true);
            subscribedChannels.Remove(subscriptionString);
            return task;
        }
        else
        {
            return null;
        }
    }
    private static string ChannelSubscriptionString(string channelName)
    {
        return channelName + "*";
    }

PubSub如何在BookSleeve/ Redis中工作

1:在你的例子中只有一个通道(Test);信道只是用于特定发布/订阅交换的名称。然而,由于redis API的具体工作方式,有必要使用2个连接。一个有任何订阅的连接不能做任何事情,除了:

  • 监听消息
  • 管理自己的订阅(subscribe, psubscribe, unsubscribe, punsubscribe)

然而,我不明白这个:

private static Dictionary<string, RedisSubscriberConnection>

你不应该需要多于一个的订阅者连接,除非你正在迎合一些特定的东西。单个订阅者连接可以处理任意数量的订阅。快速检查一下我的一个服务器上的client list,我有一个连接(在撰写本文时)23,002个订阅。这可能会减少,但是:它是有效的。

2:模式订阅支持通配符;所以你可以订阅/topic/*,而不是订阅/topic/1, /topic/2/等。publish使用的实际通道的名称作为回调签名的一部分提供给接收方。

都可以。应该注意的是,publish的性能受到唯一订阅的总数的影响,但坦率地说,即使您使用subscribe而不是psubscribe有成千上万的订阅通道,它仍然快得惊人(如:0ms)。

But from publish

时间复杂度:O(N+M),其中N为订阅接收通道的客户端数量,M为订阅模式的总数(任何客户端)。

我建议阅读pub/sub的redis文档。


编辑后续问题:

a)我假设我必须同步"发布"(使用Result或Wait()),如果我想保证在接收项目时保留从同一发布者发送项目的顺序,正确吗?

不会有任何区别;既然你提到了Result/Wait(),我假设你说的是BookSleeve——在这种情况下,多路复用器已经保留了命令顺序。Redis本身是单线程的,并且总是按顺序在单个连接上处理命令。然而,订阅者上的回调可以异步执行,并且可以(单独)传递给工作线程。我目前正在调查是否可以从RedisSubscriberConnection强制执行此命令。

更新:从1.3.22开始,你可以将CompletionMode设置为PreserveOrder -然后所有回调将顺序完成,而不是并发。

b)根据你的建议做出调整后,我得到了一个很好的性能时,无论有效载荷的大小发布几个项目。然而,当同一发布者发送100,000或更多的条目时,性能会迅速下降(从我的机器发送到7-8秒)。

首先,这个时间听起来很高-在本地测试我得到(对于100,000个出版物,包括等待所有出版物的响应)1766ms(本地)或1219ms(远程)(这可能听起来违反直觉,但我的"本地"没有运行相同版本的redis;我的"远程"是2.6.12在Centos;我的"本地"是2.6.8-pre2 (Windows).

我不能让你的实际服务器更快或加快网络速度,但是:以防这是数据包碎片,我已经添加了(只为你)一个SuspendFlush()/ResumeFlush()对。这将禁用急于刷新(即当发送队列为空时;其他类型的冲洗仍然会发生);您可能会发现这很有帮助:

conn.SuspendFlush();
try {
    // start lots of operations...
} finally {
    conn.ResumeFlush();
}

请注意,在恢复之前不应该执行Wait,因为在调用ResumeFlush()之前,发送缓冲区中可能仍有一些操作。有了这些,我得到(100,000个操作):

local: 1766ms (eager-flush) vs 1554ms (suspend-flush)
remote: 1219ms (eager-flush) vs 796ms (suspend-flush)

正如您所看到的,它对远程服务器更有帮助,因为它将通过网络发送更少的数据包。

我不能使用事务,因为稍后要发布的项目不是一次全部可用。是否有一种方法可以根据这些知识进行优化?

认为上面已经解决了-但请注意最近也添加了CreateBatch。批处理的操作很像事务——只是没有事务。同样,这是另一种减少数据包碎片的机制。在您的特殊情况下,我怀疑suspend/resume (on flush)是您最好的选择。

您是否建议使用一个通用RedisConnection和一个RedisSubscriberConnection或任何其他配置来让这样的包装器执行所需的功能?

只要你不执行阻塞操作(blpop, brpop, brpoplpush等),或者把超大的blob放在电线上(可能会延迟其他操作,而它清除),那么每种类型的单个连接通常工作得很好。但是YMMV取决于你的确切使用要求。