“Redis:发布/订阅(pub/sub)”的版本间差异

来自Wikioe
跳到导航 跳到搜索
(Eijux移动页面Redis:发布/订阅Redis:发布/订阅(pub/sub),不留重定向)
 
(未显示同一用户的11个中间版本)
第3行: 第3行:
__TOC__
__TOC__


== 什么是发布订阅? ==
== 关于 ==
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(publish)发送消息,订阅者(subscribe)接收消息。
SUBSCRIBE 、 UNSUBSCRIBE 和 PUBLISH 三个命令实现了发布与订阅信息泛型(Publish/Subscribe messaging paradigm),在这个实现中:
* 发布订阅也叫“'''生产者消费者'''”模式,是实现“'''消息队列'''”的一种方式;
1、发送者(发送信息的客户端)不是将信息直接发送给特定的接收者(接收信息的客户端),而是将信息发送给频道(channel),然后由频道将信息转发给所有对这个频道感兴趣的订阅者。
2、发送者无须知道任何关于订阅者的信息,而订阅者也无须知道是那个客户端给它发送信息,它只要关注自己感兴趣的频道即可。
对发布者和订阅者进行解构(decoupling), 可以极大地提高系统的扩展性(scalability), 并得到一个更动态的网络拓扑(network topology)。
 
发布订阅(pub/sub)是一种消息通信模式:发送者(publish)发送消息,订阅者(subscribe)接收消息。
:[[File:redis发布.png|left|thumb|200px|发布]] [[File:reids订阅.png|none|thumb|300px|订阅]]
 
 
=== “发布订阅”与“消息队列” ===
发布订阅也叫“'''生产者消费者'''”模式,是实现“'''消息队列'''”的一种方式。基于消息队列,可以实现系统解耦、削峰填谷,顶住流量洪峰。
* redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试;
* 常用的流行的消息队列有:“'''ActiveMQ'''”、“'''RabbitMQ'''”等;




第13行: 第25行:
# 消息服务(broker)
# 消息服务(broker)


 
== Pub/Sub 命令 ==
发布:
正在订阅频道的客户端不应该发送除下述之外的其他命令:
:[[File:redis发布.png|200px]]
订阅:
:[[File:reids订阅.png|200px]]
 
== Redis发布和订阅 ==
发布订阅是'''消息队列'''的一种方式,基于消息队列的方式,可以实现系统解耦、削峰填谷,顶住流量洪峰;
* redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试;
* 常用的流行的消息队列有:“'''ActiveMQ'''”、“'''RabbitMQ'''”等;
 
=== 常用命令 ===
{| class="wikitable"
{| class="wikitable"
! Option Name !! Description !! Introduced !! Deprecated
|+ Redis:发布/订阅 命令
! !! 命令 !! 描述
|-
|-
| SUBSCRIBE channel [channel ...] || '''订阅'''给定的一个或多个频道的信息。
| rowspan="2" | 订阅频道
| SUBSCRIBE <channel> [<channel> ...] || '''订阅'''给定的一个或多个频道的信息。
* 返回值:“信息”(参见下节“信息的格式”);
|-
|-
| PSUBSCRIBE pattern [pattern ...] || '''订阅'''一个或多个符合给定模式的频道。
| UNSUBSCRIBE [<channel> [<channel> ...]] || '''退订'''给定的频道。
* 返回值在不同的客户端中有不同的表现;
|-
|-
| UNSUBSCRIBE [channel [channel ...]] || '''退订'''给定的频道。
| rowspan="2" | 订阅模式
| PSUBSCRIBE <pattern> [<pattern> ...] || '''订阅'''一个或多个符合给定模式的频道。
* 返回值:“信息”;
|-
|-
| PUNSUBSCRIBE [pattern [pattern ...]] || '''退订'''所有给定模式的频道。
| PUNSUBSCRIBE [<pattern> [<pattern> ...]] || '''退订'''给定模式的频道。
* 返回值在不同的客户端中有不同的表现;
|-
|-
| PUBLISH channel message || 将信息发送到指定的频道。
| 发布
| PUBLISH <channel> <message> || 将信息'''发布'''到指定的频道。
* 返回值:接收到信息 <message> 的订阅者数量。
|-
|-
| PUBSUB subcommand [argument [argument ...]] || 查看订阅与发布系统状态。
| 系统状态
| PUBSUB <subcommand> [<argument> [<argument> ...]] || 它是一个查看订阅与发布系统状态的内省命令,由数个不同格式的子命令组成。
|}
|}
* 客户端订阅的模式里面可以包含多个 glob 风格的通配符, 比如: '''*''' 、 '''?''' 和 '''[...]''',等等。
* 在执行 SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE 和 PUNSUBSCRIBE 命令时,返回结果的最后一个元素是客户端目前'''仍在订阅的频道和模式总数'''。
** 当客户端退订所有频道和模式,也即是这个总数值下降为 0 的时候,客户端将退出订阅与发布状态。
=== PUBSUB ===
PUBSUB 有多个可用的子命令:
# <syntaxhighlight lang="bash" highlight="">
# 列出当前的活跃频道:
PUBSUB CHANNELS [pattern]
# 返回值:一个由活跃频道组成的列表。
</syntaxhighlight>
#* 活跃频道:指的是那些至少有一个订阅者的频道(订阅模式的客户端不计算在内)。
#* 如果给出 pattern 参数,那么只列出和 pattern 相匹配的活跃频道,否则列出订阅与发布系统中的所有活跃频道。
#* 即使一个频道有多个订阅者,它也只输出一次。
#: 示例:
#: <syntaxhighlight lang="bash" highlight="">
# client-1 订阅 news.it 和 news.sport 两个频道
# client-2 订阅 news.it 和 news.internet 两个频道
# 使用 client-3 打印所有活跃频道:
redis> PUBSUB CHANNELS
1) "news.sport"
2) "news.internet"
3) "news.it"
# 打印匹配的活跃频道:
redis> PUBSUB CHANNELS news.i*
1) "news.internet"
2) "news.it"
</syntaxhighlight>
# <syntaxhighlight lang="bash" highlight="">
# 返回给定频道的订阅者数量(订阅模式的客户端不计算在内):
PUBSUB NUMSUB [channel-1 ... channel-N]
# 返回值:一个多条批量回复(Multi-bulk reply),回复中包含给定的频道,以及频道的订阅者数量。
#      格式为“频道channel-1,channel-1的订阅者数量,频道channel-2,channel-2的订阅者数量 . . .”。
</syntaxhighlight>
#* 回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。
#* 不给定任何频道而直接调用这个命令也是可以的,在这种情况下,命令只返回一个空列表。
#: 示例:
#: <syntaxhighlight lang="bash" highlight="">
# client-1 订阅 news.it 和 news.sport 两个频道
# client-2 订阅 news.it 和 news.internet 两个频道
# 使用 client-3 打印各个频道的订阅者数量:
redis> PUBSUB NUMSUB news.it news.internet news.sport news.music
1) "news.it"    # 频道
2) "2"          # 订阅该频道的客户端数量
3) "news.internet"
4) "1"
5) "news.sport"
6) "1"
7) "news.music" # 没有任何订阅者
8) "0"
</syntaxhighlight>
# <syntaxhighlight lang="bash" highlight="">
# 返回订阅模式的数量:
PUBSUB NUMPAT
# 返回值:一个整数回复(Integer reply)。
</syntaxhighlight>
#* 这个命令返回的不是订阅模式的客户端的数量,而是“所有客户端订阅的模式的数量总和”(多个客户端订阅相同的模式时,会被重复计算)。
#: 示例:
#: <syntaxhighlight lang="bash" highlight="">
# client-1 订阅 news.* 和 discount.* 两个模式
# client-2 订阅 tweet.* 一个模式
# 使用 client-3 打印当前订阅模式的数量:
redis> PUBSUB NUMPAT
(integer) 3
# 再新建一个客户端 client-4 ,让它也订阅 news.* 频道
# 再使用 client-3 打印当前订阅模式的数量:
redis> PUBSUB NUMPAT
(integer) 4
</syntaxhighlight>
== 频道和模式 ==
=== 订阅频道 ===
<syntaxhighlight lang="java" highlight="">
// 订阅
subscribe <channel> [<channel> ...]
// 发布
publish <channel> <message>
</syntaxhighlight>


=== 模拟实现 ===
(以命令行模拟实现):
(以命令行模拟实现):
# 开启 4 个 redis 客户端(“./redis-cli”),如上图,3 个客户端作为消息订阅者,1 个为消息发布者:
# 开启 4 个 redis 客户端(“./redis-cli”),如上图,3 个客户端作为消息订阅者,1 个为消息发布者:
# 让 3 个消息订阅者订阅某个频道主题:“'''<code>subscribe channelTest</code>'''”;
# 让 3 个消息订阅者订阅某个频道主题:“'''<code>subscribe channelTest</code>'''”;
#: <syntaxhighlight lang="java" highlight="">
subscribe channel [channel ...]
</syntaxhighlight>
#: [[File:redis订阅channelTest.jpg|800px]]
#: [[File:redis订阅channelTest.jpg|800px]]
# 让1个消息发布者向频道主题上发布消息:“'''<code>publish channelTest message123</code>'''”;
# 让 1 个消息发布者向频道主题上发布消息:“'''<code>publish channelTest message123</code>'''”;
#: <syntaxhighlight lang="java" highlight="">
publish channel message
</syntaxhighlight>
#: [[File:redis发布消息到channelTest.jpg|800px]]
#: [[File:redis发布消息到channelTest.jpg|800px]]


=== 订阅模式 ===
<syntaxhighlight lang="java" highlight="">
// 订阅
psubscribe <pattern> [<pattern> ...]


// 发布
publish <channel> <message>
</syntaxhighlight>


 
(以命令行模拟实现):
如果是订阅匹配模式的频道主题:“'''<code>psubscribe chan*</code>'''”
# 订阅匹配模式的频道:“'''<code>psubscribe chan*</code>'''”
# 订阅:
#: <syntaxhighlight lang="java" highlight="">
psubscribe pattern [pattern ...]
</syntaxhighlight>
#:[[File:redis订阅(psubscribe).jpg|800px]]
#:[[File:redis订阅(psubscribe).jpg|800px]]
# 发布:
# 发布:
#:[[File:redis发布(psubscribe).jpg|800px]]
#:[[File:redis发布(psubscribe).jpg|800px]]


== Redis Stream【???】 ==
== 信息的格式 ==
<pre>
频道转发的每条信息都是一条带有三个元素的多条批量回复(multi-bulk reply)。
Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。
 
简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
</pre>
 
Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
* Redis Stream 主要用于消息队列(MQ,Message Queue).
* Redis Stream 是 Redis 5.0 版本新增加的数据结构。
 
=== 结构 ===
Redis Stream 的结构如下所示:


: [[File:Redis Stream结构.png|600px]]
信息类型:(信息的第一个元素)
它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
# “'''subscribe'''”:表示当前客户端成功地订阅了(信息第二个元素所指示的)频道。
* 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
#: 第二个元素:订阅的频道;
#: 第三个元素:客户端目前已订阅的频道数量。
# “'''unsubscribe'''”:表示当前客户端成功地退订了(信息第二个元素所指示的)频道。
#: 第二个元素:退订的频道;
#: 第三个元素:客户端目前仍在订阅的频道数量。
#* 当客户端订阅的频道数量降为 0 时,客户端不再订阅任何频道,它可以像往常一样,执行任何 Redis 命令。
# “'''message'''”:表示这条信息是由某个客户端执行 PUBLISH 命令所发送的,真正的信息。
#: 第二个元素:信息来源的频道;
#: 第三个元素:信息的内容。
# “'''pmessage '''”:表示有某个客户端通过 PUBLISH 向某个频道发送了信息,而这个频道刚好匹配了当前客户端所订阅的某个模式。
#: 第二个元素:被匹配的模式;
#: 第三个元素:被匹配的频道的名字;
#: 最后一个元素:信息的实际内容。


=== 命令 ===
消息队列相关命令:
{| class="wikitable"
! 命令 !! 描述
|-
| XADD || 添加消息到末尾
|-
| XDEL || 删除消息
|-
| XLEN || 获取流包含的元素数量,即消息长度
|-
| XTRIM || 对流进行修剪,限制长度
|-
| XRANGE || 获取消息列表,会自动过滤已经删除的消息
|-
| XREVRANGE || 反向获取消息列表,ID 从大到小
|-
| XREAD || 以阻塞或非阻塞方式获取消息列表
|}


消费者组相关命令:
示例:
{| class="wikitable"
# subscribe 类型的信息:
! 命令 !! 描述
#: <syntaxhighlight lang="bash" highlight="">
|-
redis> SUBSCRIBE first second
| XGROUP CREATE || 创建消费者组
1) "subscribe"
|-
2) "first"
| XGROUP DESTROY || 删除消费者组
3) (integer) 1
|-
| XGROUP DELCONSUMER || 删除消费者
|-
| XREADGROUP GROUP || 读取消费者组中的消息
|-
| XGROUP SETID || 为消费者组设置新的最后递送消息ID
|-
| XPENDING || 显示待处理消息的相关信息
|-
| XACK || 将消息标记为"已处理"
|-
| XCLAIM || 转移消息的归属权
|-
| XINFO || 查看流和消费者组的相关信息;
|-
| XINFO GROUPS || 打印消费者组的信息;
|-
| XINFO STREAM || 打印流信息
|}


==== 示例 ====
1) "subscribe"
消息队列相关命令:
2) "second"
# '''XADD''':
3) (integer) 2
#: 使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。XADD 语法格式:
#: <syntaxhighlight lang="java" highlight="">
XADD key ID field value [field value ...]
</syntaxhighlight>
</syntaxhighlight>
#* key :'''队列名称''',如果不存在就创建。
# message 类型的信息:
#* ID :'''消息 id'''。用 * 表示由 redis 生成;可以自定义,但是要自己保证递增性。
#: 客户端B订阅频道、客户端A项频道发送数据后,客户端B所接受到信息
#* field value : '''记录'''。
#: <syntaxhighlight lang="bash" highlight="">
#: <syntaxhighlight lang="java" highlight="">
redis> SUBSCRIBE hello
redis> XADD mystream * name Sara surname OConnor
1) "subscribe"
"1601372323627-0"
2) "hello"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
3) (integer) 1
"1601372323627-1"
1) "message"
redis> XLEN mystream
2) "second"
(integer) 2
3) "hello"
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
  2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
  2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"
redis>
</syntaxhighlight>
</syntaxhighlight>
# '''XTRIM''':【???】
# unsubscribe 类型的信息:
#: 使用 XTRIM 对流进行修剪,限制长度。语法格式:
#: <syntaxhighlight lang="bash" highlight="">
#: <syntaxhighlight lang="java" highlight="">
redis> UNSUBSCRIBE
XTRIM key MAXLEN [~] count
1) "unsubscribe"
2) "second"
3) (integer) 1
 
1) "unsubscribe"
2) "first"
3) (integer) 0
</syntaxhighlight>
</syntaxhighlight>
#* key :队列名称
#* MAXLEN :长度
#* count :数量
#: <syntaxhighlight lang="java" highlight="">
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
  2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379>


redis>
== FAQ ==
</syntaxhighlight>
=== 通过频道和模式接收同一条信息 ===
# '''XDEL''':
如果客户端订阅的多个模式匹配了同一个频道,或者客户端同时订阅了某个频道、以及匹配这个频道的某个模式,那么它可能会多次接收到同一条信息。
#: 使用 XDEL 删除消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XDEL key ID [ID ...]
</syntaxhighlight>
#* key:队列名称
#* ID :消息 ID
#: <syntaxhighlight lang="java" highlight="">
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
  2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
  2) 1) "c"
      2) "3"
</syntaxhighlight>
# '''XLEN''':
#: 使用 XLEN 获取流包含的元素数量,即消息长度。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XLEN key
</syntaxhighlight>
#* key:队列名称
#: <syntaxhighlight lang="java" highlight="">
redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>
</syntaxhighlight>
# '''XRANGE''':【按 id 顺序(即插入顺序)获取】
#: 使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XRANGE key start end [COUNT count]
</syntaxhighlight>
#* key :队列名
#* start :开始值, - 表示最小值
#* end :结束值, + 表示最大值
#* count :数量
#: <syntaxhighlight lang="java" highlight="">
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
  2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
  2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>
</syntaxhighlight>
# '''XREVRANGE''':
#: 使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XREVRANGE key end start [COUNT count]
</syntaxhighlight>
#* key :队列名
#* end :结束值, + 表示最大值
#* start :开始值, - 表示最小值
#* count :数量
#: <syntaxhighlight lang="java" highlight="">
redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
  2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>
</syntaxhighlight>
# '''XREAD''':
#: 使用 XREAD 以'''阻塞或非阻塞'''方式获取消息列表。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
</syntaxhighlight>
#* count :数量
#* milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
#* key :队列名
#* id :消息 ID
#: <syntaxhighlight lang="java" highlight="">
# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
  2) 1) 1) 1526984818136-0
        2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
        2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
  2) 1) 1) 1526985676425-0
        2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
        2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"
</syntaxhighlight>




消费者组相关命令:
示例:
# '''XGROUP CREATE''':
: 如果客户端执行了以下命令:
#: 使用 XGROUP CREATE 创建消费者组。语法格式:
: <syntaxhighlight lang="bash" highlight="">
#: <syntaxhighlight lang="java" highlight="">
SUBSCRIBE foo
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
PSUBSCRIBE f*
</syntaxhighlight>
#* key :队列名称,如果不存在就创建
#* groupname :组名
#* $ : '''表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略'''
#: 从头开始消费:
#: <syntaxhighlight lang="java" highlight="">
XGROUP CREATE mystream consumer-group-name 0-0 
</syntaxhighlight>
#: 从尾开始消费:
#: <syntaxhighlight lang="java" highlight="">
XGROUP CREATE mystream consumer-group-name $
</syntaxhighlight>
# '''XREADGROUP GROUP''':
#: 使用 XREADGROUP GROUP 读取消费组中的消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
</syntaxhighlight>
#* group :消费组名
#* consumer :消费者名
#* count : 读取数量
#* milliseconds : 阻塞毫秒数
#* key : 队列名
#* ID : 消息 ID
#: <syntaxhighlight lang="java" highlight="">
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
</syntaxhighlight>
</syntaxhighlight>
: 当有信息发送到频道 foo 时,客户端将收到两条信息:一条来自频道 foo,信息类型为 message;另一条来自模式 f*,信息类型为 pmessage 。

2021年11月3日 (三) 01:10的最新版本


关于

SUBSCRIBE 、 UNSUBSCRIBE 和 PUBLISH 三个命令实现了发布与订阅信息泛型(Publish/Subscribe messaging paradigm),在这个实现中:
1、发送者(发送信息的客户端)不是将信息直接发送给特定的接收者(接收信息的客户端),而是将信息发送给频道(channel),然后由频道将信息转发给所有对这个频道感兴趣的订阅者。
2、发送者无须知道任何关于订阅者的信息,而订阅者也无须知道是那个客户端给它发送信息,它只要关注自己感兴趣的频道即可。

对发布者和订阅者进行解构(decoupling), 可以极大地提高系统的扩展性(scalability), 并得到一个更动态的网络拓扑(network topology)。

发布订阅(pub/sub)是一种消息通信模式:发送者(publish)发送消息,订阅者(subscribe)接收消息。

发布
订阅


“发布订阅”与“消息队列”

发布订阅也叫“生产者消费者”模式,是实现“消息队列”的一种方式。基于消息队列,可以实现系统解耦、削峰填谷,顶住流量洪峰。

  • redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试;
  • 常用的流行的消息队列有:“ActiveMQ”、“RabbitMQ”等;


消息队列的三要素:

  1. 生产者(producer)
  2. 消费者(consumer)
  3. 消息服务(broker)

Pub/Sub 命令

正在订阅频道的客户端不应该发送除下述之外的其他命令:

Redis:发布/订阅 命令
命令 描述
订阅频道 SUBSCRIBE <channel> [<channel> ...] 订阅给定的一个或多个频道的信息。
  • 返回值:“信息”(参见下节“信息的格式”);
UNSUBSCRIBE [<channel> [<channel> ...]] 退订给定的频道。
  • 返回值在不同的客户端中有不同的表现;
订阅模式 PSUBSCRIBE <pattern> [<pattern> ...] 订阅一个或多个符合给定模式的频道。
  • 返回值:“信息”;
PUNSUBSCRIBE [<pattern> [<pattern> ...]] 退订给定模式的频道。
  • 返回值在不同的客户端中有不同的表现;
发布 PUBLISH <channel> <message> 将信息发布到指定的频道。
  • 返回值:接收到信息 <message> 的订阅者数量。
系统状态 PUBSUB <subcommand> [<argument> [<argument> ...]] 它是一个查看订阅与发布系统状态的内省命令,由数个不同格式的子命令组成。
  • 客户端订阅的模式里面可以包含多个 glob 风格的通配符, 比如: *?[...],等等。
  • 在执行 SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE 和 PUNSUBSCRIBE 命令时,返回结果的最后一个元素是客户端目前仍在订阅的频道和模式总数
    • 当客户端退订所有频道和模式,也即是这个总数值下降为 0 的时候,客户端将退出订阅与发布状态。

PUBSUB

PUBSUB 有多个可用的子命令:

  1. # 列出当前的活跃频道:
    PUBSUB CHANNELS [pattern]
    
    # 返回值:一个由活跃频道组成的列表。
    
    • 活跃频道:指的是那些至少有一个订阅者的频道(订阅模式的客户端不计算在内)。
    • 如果给出 pattern 参数,那么只列出和 pattern 相匹配的活跃频道,否则列出订阅与发布系统中的所有活跃频道。
    • 即使一个频道有多个订阅者,它也只输出一次。
    示例:
    # client-1 订阅 news.it 和 news.sport 两个频道
    # client-2 订阅 news.it 和 news.internet 两个频道
    
    # 使用 client-3 打印所有活跃频道:
    redis> PUBSUB CHANNELS
    1) "news.sport"
    2) "news.internet"
    3) "news.it"
    
    # 打印匹配的活跃频道:
    redis> PUBSUB CHANNELS news.i*
    1) "news.internet"
    2) "news.it"
    
  2. # 返回给定频道的订阅者数量(订阅模式的客户端不计算在内):
    PUBSUB NUMSUB [channel-1 ... channel-N]
    
    # 返回值:一个多条批量回复(Multi-bulk reply),回复中包含给定的频道,以及频道的订阅者数量。
    #      格式为“频道channel-1,channel-1的订阅者数量,频道channel-2,channel-2的订阅者数量 . . .”。
    
    • 回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。
    • 不给定任何频道而直接调用这个命令也是可以的,在这种情况下,命令只返回一个空列表。
    示例:
    # client-1 订阅 news.it 和 news.sport 两个频道
    # client-2 订阅 news.it 和 news.internet 两个频道
    
    # 使用 client-3 打印各个频道的订阅者数量:
    redis> PUBSUB NUMSUB news.it news.internet news.sport news.music
    1) "news.it"    # 频道
    2) "2"          # 订阅该频道的客户端数量
    3) "news.internet"
    4) "1"
    5) "news.sport"
    6) "1"
    7) "news.music" # 没有任何订阅者
    8) "0"
    
  3. # 返回订阅模式的数量:
    PUBSUB NUMPAT
    
    # 返回值:一个整数回复(Integer reply)。
    
    • 这个命令返回的不是订阅模式的客户端的数量,而是“所有客户端订阅的模式的数量总和”(多个客户端订阅相同的模式时,会被重复计算)。
    示例:
    # client-1 订阅 news.* 和 discount.* 两个模式
    # client-2 订阅 tweet.* 一个模式
    
    # 使用 client-3 打印当前订阅模式的数量:
    redis> PUBSUB NUMPAT
    (integer) 3
    
    # 再新建一个客户端 client-4 ,让它也订阅 news.* 频道
    
    # 再使用 client-3 打印当前订阅模式的数量:
    redis> PUBSUB NUMPAT
    (integer) 4
    

频道和模式

订阅频道

// 订阅
subscribe <channel> [<channel> ...]

// 发布
publish <channel> <message>

(以命令行模拟实现):

  1. 开启 4 个 redis 客户端(“./redis-cli”),如上图,3 个客户端作为消息订阅者,1 个为消息发布者:
  2. 让 3 个消息订阅者订阅某个频道主题:“subscribe channelTest”;
    Redis订阅channelTest.jpg
  3. 让 1 个消息发布者向频道主题上发布消息:“publish channelTest message123”;
    Redis发布消息到channelTest.jpg

订阅模式

// 订阅
psubscribe <pattern> [<pattern> ...]

// 发布
publish <channel> <message>

(以命令行模拟实现):

  1. 订阅匹配模式的频道:“psubscribe chan*
    Redis订阅(psubscribe).jpg
  2. 发布:
    Redis发布(psubscribe).jpg

信息的格式

频道转发的每条信息都是一条带有三个元素的多条批量回复(multi-bulk reply)。

信息类型:(信息的第一个元素)

  1. subscribe”:表示当前客户端成功地订阅了(信息第二个元素所指示的)频道。
    第二个元素:订阅的频道;
    第三个元素:客户端目前已订阅的频道数量。
  2. unsubscribe”:表示当前客户端成功地退订了(信息第二个元素所指示的)频道。
    第二个元素:退订的频道;
    第三个元素:客户端目前仍在订阅的频道数量。
    • 当客户端订阅的频道数量降为 0 时,客户端不再订阅任何频道,它可以像往常一样,执行任何 Redis 命令。
  3. message”:表示这条信息是由某个客户端执行 PUBLISH 命令所发送的,真正的信息。
    第二个元素:信息来源的频道;
    第三个元素:信息的内容。
  4. pmessage ”:表示有某个客户端通过 PUBLISH 向某个频道发送了信息,而这个频道刚好匹配了当前客户端所订阅的某个模式。
    第二个元素:被匹配的模式;
    第三个元素:被匹配的频道的名字;
    最后一个元素:信息的实际内容。


示例:

  1. subscribe 类型的信息:
    redis> SUBSCRIBE first second
    1) "subscribe"
    2) "first"
    3) (integer) 1
    
    1) "subscribe"
    2) "second"
    3) (integer) 2
    
  2. message 类型的信息:
    客户端B订阅频道、客户端A项频道发送数据后,客户端B所接受到信息
    redis> SUBSCRIBE hello
    1) "subscribe"
    2) "hello"
    3) (integer) 1
    1) "message"
    2) "second"
    3) "hello"
    
  3. unsubscribe 类型的信息:
    redis> UNSUBSCRIBE
    1) "unsubscribe"
    2) "second"
    3) (integer) 1
    
    1) "unsubscribe"
    2) "first"
    3) (integer) 0
    

FAQ

通过频道和模式接收同一条信息

如果客户端订阅的多个模式匹配了同一个频道,或者客户端同时订阅了某个频道、以及匹配这个频道的某个模式,那么它可能会多次接收到同一条信息。


示例:

如果客户端执行了以下命令:
SUBSCRIBE foo
PSUBSCRIBE f*
当有信息发送到频道 foo 时,客户端将收到两条信息:一条来自频道 foo,信息类型为 message;另一条来自模式 f*,信息类型为 pmessage 。