“Redis:发布/订阅(pub/sub)”的版本间差异

来自Wikioe
跳到导航 跳到搜索
第66行: 第66行:
# 发布:
# 发布:
#:[[File:redis发布(psubscribe).jpg|800px]]
#:[[File:redis发布(psubscribe).jpg|800px]]
== Redis Stream【???】 ==
<pre>
Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。
简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
</pre>
Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
* Redis Stream 主要用于消息队列(MQ,Message Queue).
* Redis Stream 是 Redis 5.0 版本新增加的数据结构。
=== 结构 ===
Redis Stream 的结构如下所示:
: [[File:Redis Stream结构.png|600px]]
它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
* 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
=== 命令 ===
消息队列相关命令:
{| class="wikitable"
! 命令 !! 描述
|-
| XADD || 添加消息到末尾
|-
| XDEL || 删除消息
|-
| XLEN || 获取流包含的元素数量,即消息长度
|-
| XTRIM || 对流进行修剪,限制长度
|-
| XRANGE || 获取消息列表,会自动过滤已经删除的消息
|-
| XREVRANGE || 反向获取消息列表,ID 从大到小
|-
| XREAD || 以阻塞或非阻塞方式获取消息列表
|}
消费者组相关命令:
{| class="wikitable"
! 命令 !! 描述
|-
| XGROUP CREATE || 创建消费者组
|-
| XGROUP DESTROY || 删除消费者组
|-
| XGROUP DELCONSUMER || 删除消费者
|-
| XREADGROUP GROUP || 读取消费者组中的消息
|-
| XGROUP SETID || 为消费者组设置新的最后递送消息ID
|-
| XPENDING || 显示待处理消息的相关信息
|-
| XACK || 将消息标记为"已处理"
|-
| XCLAIM || 转移消息的归属权
|-
| XINFO || 查看流和消费者组的相关信息;
|-
| XINFO GROUPS || 打印消费者组的信息;
|-
| XINFO STREAM || 打印流信息
|}
==== 示例 ====
消息队列相关命令:
# '''XADD''':
#: 使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。XADD 语法格式:
#: <syntaxhighlight lang="java" highlight="">
XADD key ID field value [field value ...]
</syntaxhighlight>
#* key :'''队列名称''',如果不存在就创建。
#* ID :'''消息 id'''。用 * 表示由 redis 生成;可以自定义,但是要自己保证递增性。
#* field value : '''记录'''。
#: <syntaxhighlight lang="java" highlight="">
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
  2) 1) "name"
      2) "Sara"
      3) "surname"
      4) "OConnor"
2) 1) "1601372323627-1"
  2) 1) "field1"
      2) "value1"
      3) "field2"
      4) "value2"
      5) "field3"
      6) "value3"
redis>
</syntaxhighlight>
# '''XTRIM''':【???】
#: 使用 XTRIM 对流进行修剪,限制长度。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XTRIM key MAXLEN [~] count
</syntaxhighlight>
#* key :队列名称
#* MAXLEN :长度
#* count :数量
#: <syntaxhighlight lang="java" highlight="">
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
  2) 1) "field1"
      2) "A"
      3) "field2"
      4) "B"
      5) "field3"
      6) "C"
      7) "field4"
      8) "D"
127.0.0.1:6379>
redis>
</syntaxhighlight>
# '''XDEL''':
#: 使用 XDEL 删除消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XDEL key ID [ID ...]
</syntaxhighlight>
#* key:队列名称
#* ID :消息 ID
#: <syntaxhighlight lang="java" highlight="">
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
  2) 1) "a"
      2) "1"
2) 1) 1538561701744-0
  2) 1) "c"
      2) "3"
</syntaxhighlight>
# '''XLEN''':
#: 使用 XLEN 获取流包含的元素数量,即消息长度。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XLEN key
</syntaxhighlight>
#* key:队列名称
#: <syntaxhighlight lang="java" highlight="">
redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>
</syntaxhighlight>
# '''XRANGE''':【按 id 顺序(即插入顺序)获取】
#: 使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XRANGE key start end [COUNT count]
</syntaxhighlight>
#* key :队列名
#* start :开始值, - 表示最小值
#* end :结束值, + 表示最大值
#* count :数量
#: <syntaxhighlight lang="java" highlight="">
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
  2) 1) "name"
      2) "Virginia"
      3) "surname"
      4) "Woolf"
2) 1) "1601372577811-1"
  2) 1) "name"
      2) "Jane"
      3) "surname"
      4) "Austen"
redis>
</syntaxhighlight>
# '''XREVRANGE''':
#: 使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XREVRANGE key end start [COUNT count]
</syntaxhighlight>
#* key :队列名
#* end :结束值, + 表示最大值
#* start :开始值, - 表示最小值
#* count :数量
#: <syntaxhighlight lang="java" highlight="">
redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
"1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
  2) 1) "name"
      2) "Ngozi"
      3) "surname"
      4) "Adichie"
redis>
</syntaxhighlight>
# '''XREAD''':
#: 使用 XREAD 以'''阻塞或非阻塞'''方式获取消息列表。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
</syntaxhighlight>
#* count :数量
#* milliseconds :可选,阻塞毫秒数,没有设置就是非阻塞模式
#* key :队列名
#* id :消息 ID
#: <syntaxhighlight lang="java" highlight="">
# 从 Stream 头部读取两条消息
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
  2) 1) 1) 1526984818136-0
        2) 1) "duration"
            2) "1532"
            3) "event-id"
            4) "5"
            5) "user-id"
            6) "7782813"
      2) 1) 1526999352406-0
        2) 1) "duration"
            2) "812"
            3) "event-id"
            4) "9"
            5) "user-id"
            6) "388234"
2) 1) "writers"
  2) 1) 1) 1526985676425-0
        2) 1) "name"
            2) "Virginia"
            3) "surname"
            4) "Woolf"
      2) 1) 1526985685298-0
        2) 1) "name"
            2) "Jane"
            3) "surname"
            4) "Austen"
</syntaxhighlight>
消费者组相关命令:
# '''XGROUP CREATE''':
#: 使用 XGROUP CREATE 创建消费者组。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
</syntaxhighlight>
#* key :队列名称,如果不存在就创建
#* groupname :组名
#* $ : '''表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略'''
#: 从头开始消费:
#: <syntaxhighlight lang="java" highlight="">
XGROUP CREATE mystream consumer-group-name 0-0 
</syntaxhighlight>
#: 从尾开始消费:
#: <syntaxhighlight lang="java" highlight="">
XGROUP CREATE mystream consumer-group-name $
</syntaxhighlight>
# '''XREADGROUP GROUP''':
#: 使用 XREADGROUP GROUP 读取消费组中的消息。语法格式:
#: <syntaxhighlight lang="java" highlight="">
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
</syntaxhighlight>
#* group :消费组名
#* consumer :消费者名
#* count : 读取数量
#* milliseconds : 阻塞毫秒数
#* key : 队列名
#* ID : 消息 ID
#: <syntaxhighlight lang="java" highlight="">
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
</syntaxhighlight>

2021年5月11日 (二) 22:46的版本


什么是发布订阅?

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(publish)发送消息,订阅者(subscribe)接收消息。

  • 发布订阅也叫“生产者消费者”模式,是实现“消息队列”的一种方式;


消息队列的三要素:

  1. 生产者(producer)
  2. 消费者(consumer)
  3. 消息服务(broker)


发布:

Redis发布.png

订阅:

Reids订阅.png

Redis发布和订阅

发布订阅是消息队列的一种方式,基于消息队列的方式,可以实现系统解耦、削峰填谷,顶住流量洪峰;

  • redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试;
  • 常用的流行的消息队列有:“ActiveMQ”、“RabbitMQ”等;

常用命令

Option Name Description Introduced Deprecated
SUBSCRIBE channel [channel ...] 订阅给定的一个或多个频道的信息。
PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合给定模式的频道。
UNSUBSCRIBE [channel [channel ...]] 退订给定的频道。
PUNSUBSCRIBE [pattern [pattern ...]] 退订所有给定模式的频道。
PUBLISH channel message 将信息发送到指定的频道。
PUBSUB subcommand [argument [argument ...]] 查看订阅与发布系统状态。

模拟实现

(以命令行模拟实现):

  1. 开启 4 个 redis 客户端(“./redis-cli”),如上图,3 个客户端作为消息订阅者,1 个为消息发布者:
  2. 让 3 个消息订阅者订阅某个频道主题:“subscribe channelTest”;
    subscribe channel [channel ...]
    
    Redis订阅channelTest.jpg
  3. 让1个消息发布者向频道主题上发布消息:“publish channelTest message123”;
    publish channel message
    
    Redis发布消息到channelTest.jpg



如果是订阅匹配模式的频道主题:“psubscribe chan*

  1. 订阅:
    psubscribe pattern [pattern ...]
    
    Redis订阅(psubscribe).jpg
  2. 发布:
    Redis发布(psubscribe).jpg