“Redis:发布/订阅(pub/sub)”的版本间差异

来自Wikioe
跳到导航 跳到搜索
无编辑摘要
第23行: 第23行:
* redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试;
* redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试;
* 常用的流行的消息队列有:“'''ActiveMQ'''”、“'''RabbitMQ'''”等;
* 常用的流行的消息队列有:“'''ActiveMQ'''”、“'''RabbitMQ'''”等;
=== 常用命令 ===
{| class="wikitable"
! Option Name !! Description !! Introduced !! Deprecated
|-
| SUBSCRIBE channel [channel ...] || '''订阅'''给定的一个或多个频道的信息。
|-
| PSUBSCRIBE pattern [pattern ...] || '''订阅'''一个或多个符合给定模式的频道。
|-
| UNSUBSCRIBE [channel [channel ...]] || '''退订'''给定的频道。
|-
| PUNSUBSCRIBE [pattern [pattern ...]] || '''退订'''所有给定模式的频道。
|-
| PUBLISH channel message || 将信息发送到指定的频道。
|-
| PUBSUB subcommand [argument [argument ...]] || 查看订阅与发布系统状态。
|}


=== 模拟实现 ===
=== 模拟实现 ===
第49行: 第66行:
# 发布:
# 发布:
#:[[File:redis发布(psubscribe).jpg|800px]]
#:[[File:redis发布(psubscribe).jpg|800px]]
== Redis Stream ==
<pre>
Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。
简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
</pre>
Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
* Redis Stream 主要用于消息队列(MQ,Message Queue).
* Redis Stream 是 Redis 5.0 版本新增加的数据结构。
=== 结构 ===
Redis Stream 的结构如下所示:
: [[File:Redis Stream结构.png|400px]]
它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
* 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
=== 命令 ===
消息队列相关命令:
{| class="wikitable"
! Option Name !! Description !! Introduced !! Deprecated
|-
| XADD || 添加消息到末尾
|-
| XDEL || 删除消息
|-
| XLEN || 获取流包含的元素数量,即消息长度
|-
| XTRIM || 对流进行修剪,限制长度
|-
| XRANGE || 获取消息列表,会自动过滤已经删除的消息
|-
| XREVRANGE || 反向获取消息列表,ID 从大到小
|-
| XREAD || 以阻塞或非阻塞方式获取消息列表
|}
消费者组相关命令:
{| class="wikitable"
! Option Name !! Description !! Introduced !! Deprecated
|-
| XGROUP CREATE || 创建消费者组
|-
| XGROUP DESTROY || 删除消费者组
|-
| XGROUP DELCONSUMER || 删除消费者
|-
| XREADGROUP GROUP || 读取消费者组中的消息
|-
| XGROUP SETID || 为消费者组设置新的最后递送消息ID
|-
| XPENDING || 显示待处理消息的相关信息
|-
| XACK || 将消息标记为"已处理"
|-
| XCLAIM || 转移消息的归属权
|-
| XINFO || 查看流和消费者组的相关信息;
|-
| XINFO GROUPS || 打印消费者组的信息;
|-
| XINFO STREAM || 打印流信息
|}
==== 示例 ====
# '''XADD''':
#: 使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。XADD 语法格式:
#: <syntaxhighlight lang="java" highlight="">
XADD key ID field value [field value ...]
</syntaxhighlight>
#* key :'''队列名称''',如果不存在就创建。
#* ID :'''消息 id'''。用 * 表示由 redis 生成;可以自定义,但是要自己保证递增性。
#* field value : '''记录'''。
#: <syntaxhighlight lang="java" highlight="">
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
  2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
  2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"
redis>
</syntaxhighlight>
# '''XTRIM''':【???】
#: 使用 XTRIM 对流进行修剪,限制长度。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XTRIM key MAXLEN [~] count
</syntaxhighlight>
#* key :队列名称
#* MAXLEN :长度
#* count :数量
#: <syntaxhighlight lang="java" highlight="">
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
  2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379>
redis>
</syntaxhighlight>
# '''XDEL''':
#: 使用 XDEL 删除消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XDEL key ID [ID ...]
</syntaxhighlight>
#* key:队列名称
#* ID :消息 ID
#: <syntaxhighlight lang="java" highlight="">
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
  2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
  2) 1) "c"
      2) "3"
</syntaxhighlight>
# '''XLEN''':
#: 使用 XLEN 获取流包含的元素数量,即消息长度。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XLEN key
</syntaxhighlight>
#* key:队列名称
#: <syntaxhighlight lang="java" highlight="">
redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>
</syntaxhighlight>
# '''XRANGE''':【???】
#: 使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XRANGE key start end [COUNT count]
</syntaxhighlight>
#* key :队列名
#* start :开始值, - 表示最小值
#* end :结束值, + 表示最大值
#* count :数量
#: <syntaxhighlight lang="java" highlight="">
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
  2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
  2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>
</syntaxhighlight>
# '''XREVRANGE''':
#: 使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
#*
#*
#*
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
# '''eeeeeeeeeeeeeeeee''':
#:
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
#*
#*
#*
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
# '''eeeeeeeeeeeeeeeee''':
#:
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
#*
#*
#*
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
# '''eeeeeeeeeeeeeeeee''':
#:
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
#*
#*
#*
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
# '''eeeeeeeeeeeeeeeee''':
#:
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
#*
#*
#*
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
# '''eeeeeeeeeeeeeeeee''':
#:
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
#*
#*
#*
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
# '''eeeeeeeeeeeeeeeee''':
#:
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>
#*
#*
#*
#: <syntaxhighlight lang="java" highlight="">
</syntaxhighlight>

2021年5月10日 (一) 15:01的版本


什么是发布订阅?

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(publish)发送消息,订阅者(subscribe)接收消息。

  • 发布订阅也叫“生产者消费者”模式,是实现“消息队列”的一种方式;


消息队列的三要素:

  1. 生产者(producer)
  2. 消费者(consumer)
  3. 消息服务(broker)


发布:

Redis发布.png

订阅:

Reids订阅.png

Redis发布和订阅

发布订阅是消息队列的一种方式,基于消息队列的方式,可以实现系统解耦、削峰填谷,顶住流量洪峰;

  • redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试;
  • 常用的流行的消息队列有:“ActiveMQ”、“RabbitMQ”等;

常用命令

Option Name Description Introduced Deprecated
SUBSCRIBE channel [channel ...] 订阅给定的一个或多个频道的信息。
PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合给定模式的频道。
UNSUBSCRIBE [channel [channel ...]] 退订给定的频道。
PUNSUBSCRIBE [pattern [pattern ...]] 退订所有给定模式的频道。
PUBLISH channel message 将信息发送到指定的频道。
PUBSUB subcommand [argument [argument ...]] 查看订阅与发布系统状态。

模拟实现

(以命令行模拟实现):

  1. 开启 4 个 redis 客户端(“./redis-cli”),如上图,3 个客户端作为消息订阅者,1 个为消息发布者:
  2. 让 3 个消息订阅者订阅某个频道主题:“subscribe channelTest”;
    subscribe channel [channel ...]
    
    Redis订阅channelTest.jpg
  3. 让1个消息发布者向频道主题上发布消息:“publish channelTest message123”;
    publish channel message
    
    Redis发布消息到channelTest.jpg



如果是订阅匹配模式的频道主题:“psubscribe chan*

  1. 订阅:
    psubscribe pattern [pattern ...]
    
    Redis订阅(psubscribe).jpg
  2. 发布:
    Redis发布(psubscribe).jpg

Redis Stream

Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

  • Redis Stream 主要用于消息队列(MQ,Message Queue).
  • Redis Stream 是 Redis 5.0 版本新增加的数据结构。

结构

Redis Stream 的结构如下所示:

Redis Stream结构.png

它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

  • 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

命令

消息队列相关命令:

Option Name Description Introduced Deprecated
XADD 添加消息到末尾
XDEL 删除消息
XLEN 获取流包含的元素数量,即消息长度
XTRIM 对流进行修剪,限制长度
XRANGE 获取消息列表,会自动过滤已经删除的消息
XREVRANGE 反向获取消息列表,ID 从大到小
XREAD 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

Option Name Description Introduced Deprecated
XGROUP CREATE 创建消费者组
XGROUP DESTROY 删除消费者组
XGROUP DELCONSUMER 删除消费者
XREADGROUP GROUP 读取消费者组中的消息
XGROUP SETID 为消费者组设置新的最后递送消息ID
XPENDING 显示待处理消息的相关信息
XACK 将消息标记为"已处理"
XCLAIM 转移消息的归属权
XINFO 查看流和消费者组的相关信息;
XINFO GROUPS 打印消费者组的信息;
XINFO STREAM 打印流信息

示例

  1. XADD
    使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。XADD 语法格式:
    XADD key ID field value [field value ...]
    
    • key :队列名称,如果不存在就创建。
    • ID :消息 id。用 * 表示由 redis 生成;可以自定义,但是要自己保证递增性。
    • field value : 记录
    redis> XADD mystream * name Sara surname OConnor
    "1601372323627-0"
    redis> XADD mystream * field1 value1 field2 value2 field3 value3
    "1601372323627-1"
    redis> XLEN mystream
    (integer) 2
    redis> XRANGE mystream - +
    1) 1) "1601372323627-0"
       2) 1) "name"
          2) "Sara"
          3) "surname"
          4) "OConnor"
    2) 1) "1601372323627-1"
       2) 1) "field1"
          2) "value1"
          3) "field2"
          4) "value2"
          5) "field3"
          6) "value3"
    redis>
    
  2. XTRIM:【???】
    使用 XTRIM 对流进行修剪,限制长度。语法格式:
    XTRIM key MAXLEN [~] count
    
    • key :队列名称
    • MAXLEN :长度
    • count :数量
    127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
    "1601372434568-0"
    127.0.0.1:6379> XTRIM mystream MAXLEN 2
    (integer) 0
    127.0.0.1:6379> XRANGE mystream - +
    1) 1) "1601372434568-0"
       2) 1) "field1"
          2) "A"
          3) "field2"
          4) "B"
          5) "field3"
          6) "C"
          7) "field4"
          8) "D"
    127.0.0.1:6379>
    
    redis>
    
  3. XDEL
    使用 XDEL 删除消息。语法格式:
    XDEL key ID [ID ...]
    
    • key:队列名称
    • ID :消息 ID
    > XADD mystream * a 1
    1538561698944-0
    > XADD mystream * b 2
    1538561700640-0
    > XADD mystream * c 3
    1538561701744-0
    > XDEL mystream 1538561700640-0
    (integer) 1
    127.0.0.1:6379> XRANGE mystream - +
    1) 1) 1538561698944-0
       2) 1) "a"
          2) "1"
    2) 1) 1538561701744-0
       2) 1) "c"
          2) "3"
    
  4. XLEN
    使用 XLEN 获取流包含的元素数量,即消息长度。语法格式:
    XLEN key
    
    • key:队列名称
    redis> XADD mystream * item 1
    "1601372563177-0"
    redis> XADD mystream * item 2
    "1601372563178-0"
    redis> XADD mystream * item 3
    "1601372563178-1"
    redis> XLEN mystream
    (integer) 3
    redis>
    
  5. XRANGE:【???】
    使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
    XRANGE key start end [COUNT count]
    
    • key :队列名
    • start :开始值, - 表示最小值
    • end :结束值, + 表示最大值
    • count :数量
    redis> XADD writers * name Virginia surname Woolf
    "1601372577811-0"
    redis> XADD writers * name Jane surname Austen
    "1601372577811-1"
    redis> XADD writers * name Toni surname Morrison
    "1601372577811-2"
    redis> XADD writers * name Agatha surname Christie
    "1601372577812-0"
    redis> XADD writers * name Ngozi surname Adichie
    "1601372577812-1"
    redis> XLEN writers
    (integer) 5
    redis> XRANGE writers - + COUNT 2
    1) 1) "1601372577811-0"
       2) 1) "name"
          2) "Virginia"
          3) "surname"
          4) "Woolf"
    2) 1) "1601372577811-1"
       2) 1) "name"
          2) "Jane"
          3) "surname"
          4) "Austen"
    redis>
    
  6. XREVRANGE
    使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
  7. eeeeeeeeeeeeeeeee
  8. eeeeeeeeeeeeeeeee
  9. eeeeeeeeeeeeeeeee
  10. eeeeeeeeeeeeeeeee
  11. eeeeeeeeeeeeeeeee
  12. eeeeeeeeeeeeeeeee