Redis:发布/订阅(pub/sub)

来自Wikioe
Eijux讨论 | 贡献2021年5月10日 (一) 15:01的版本 →‎结构
跳到导航 跳到搜索


什么是发布订阅?

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(publish)发送消息,订阅者(subscribe)接收消息。

  • 发布订阅也叫“生产者消费者”模式,是实现“消息队列”的一种方式;


消息队列的三要素:

  1. 生产者(producer)
  2. 消费者(consumer)
  3. 消息服务(broker)


发布:

Redis发布.png

订阅:

Reids订阅.png

Redis发布和订阅

发布订阅是消息队列的一种方式,基于消息队列的方式,可以实现系统解耦、削峰填谷,顶住流量洪峰;

  • redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试;
  • 常用的流行的消息队列有:“ActiveMQ”、“RabbitMQ”等;

常用命令

Option Name Description Introduced Deprecated
SUBSCRIBE channel [channel ...] 订阅给定的一个或多个频道的信息。
PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合给定模式的频道。
UNSUBSCRIBE [channel [channel ...]] 退订给定的频道。
PUNSUBSCRIBE [pattern [pattern ...]] 退订所有给定模式的频道。
PUBLISH channel message 将信息发送到指定的频道。
PUBSUB subcommand [argument [argument ...]] 查看订阅与发布系统状态。

模拟实现

(以命令行模拟实现):

  1. 开启 4 个 redis 客户端(“./redis-cli”),如上图,3 个客户端作为消息订阅者,1 个为消息发布者:
  2. 让 3 个消息订阅者订阅某个频道主题:“subscribe channelTest”;
    subscribe channel [channel ...]
    
    Redis订阅channelTest.jpg
  3. 让1个消息发布者向频道主题上发布消息:“publish channelTest message123”;
    publish channel message
    
    Redis发布消息到channelTest.jpg



如果是订阅匹配模式的频道主题:“psubscribe chan*

  1. 订阅:
    psubscribe pattern [pattern ...]
    
    Redis订阅(psubscribe).jpg
  2. 发布:
    Redis发布(psubscribe).jpg

Redis Stream

Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

  • Redis Stream 主要用于消息队列(MQ,Message Queue).
  • Redis Stream 是 Redis 5.0 版本新增加的数据结构。

结构

Redis Stream 的结构如下所示:

Redis Stream结构.png

它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

  • 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。

命令

消息队列相关命令:

Option Name Description Introduced Deprecated
XADD 添加消息到末尾
XDEL 删除消息
XLEN 获取流包含的元素数量,即消息长度
XTRIM 对流进行修剪,限制长度
XRANGE 获取消息列表,会自动过滤已经删除的消息
XREVRANGE 反向获取消息列表,ID 从大到小
XREAD 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

Option Name Description Introduced Deprecated
XGROUP CREATE 创建消费者组
XGROUP DESTROY 删除消费者组
XGROUP DELCONSUMER 删除消费者
XREADGROUP GROUP 读取消费者组中的消息
XGROUP SETID 为消费者组设置新的最后递送消息ID
XPENDING 显示待处理消息的相关信息
XACK 将消息标记为"已处理"
XCLAIM 转移消息的归属权
XINFO 查看流和消费者组的相关信息;
XINFO GROUPS 打印消费者组的信息;
XINFO STREAM 打印流信息

示例

  1. XADD
    使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。XADD 语法格式:
    XADD key ID field value [field value ...]
    
    • key :队列名称,如果不存在就创建。
    • ID :消息 id。用 * 表示由 redis 生成;可以自定义,但是要自己保证递增性。
    • field value : 记录
    redis> XADD mystream * name Sara surname OConnor
    "1601372323627-0"
    redis> XADD mystream * field1 value1 field2 value2 field3 value3
    "1601372323627-1"
    redis> XLEN mystream
    (integer) 2
    redis> XRANGE mystream - +
    1) 1) "1601372323627-0"
       2) 1) "name"
          2) "Sara"
          3) "surname"
          4) "OConnor"
    2) 1) "1601372323627-1"
       2) 1) "field1"
          2) "value1"
          3) "field2"
          4) "value2"
          5) "field3"
          6) "value3"
    redis>
    
  2. XTRIM:【???】
    使用 XTRIM 对流进行修剪,限制长度。语法格式:
    XTRIM key MAXLEN [~] count
    
    • key :队列名称
    • MAXLEN :长度
    • count :数量
    127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
    "1601372434568-0"
    127.0.0.1:6379> XTRIM mystream MAXLEN 2
    (integer) 0
    127.0.0.1:6379> XRANGE mystream - +
    1) 1) "1601372434568-0"
       2) 1) "field1"
          2) "A"
          3) "field2"
          4) "B"
          5) "field3"
          6) "C"
          7) "field4"
          8) "D"
    127.0.0.1:6379>
    
    redis>
    
  3. XDEL
    使用 XDEL 删除消息。语法格式:
    XDEL key ID [ID ...]
    
    • key:队列名称
    • ID :消息 ID
    > XADD mystream * a 1
    1538561698944-0
    > XADD mystream * b 2
    1538561700640-0
    > XADD mystream * c 3
    1538561701744-0
    > XDEL mystream 1538561700640-0
    (integer) 1
    127.0.0.1:6379> XRANGE mystream - +
    1) 1) 1538561698944-0
       2) 1) "a"
          2) "1"
    2) 1) 1538561701744-0
       2) 1) "c"
          2) "3"
    
  4. XLEN
    使用 XLEN 获取流包含的元素数量,即消息长度。语法格式:
    XLEN key
    
    • key:队列名称
    redis> XADD mystream * item 1
    "1601372563177-0"
    redis> XADD mystream * item 2
    "1601372563178-0"
    redis> XADD mystream * item 3
    "1601372563178-1"
    redis> XLEN mystream
    (integer) 3
    redis>
    
  5. XRANGE:【???】
    使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
    XRANGE key start end [COUNT count]
    
    • key :队列名
    • start :开始值, - 表示最小值
    • end :结束值, + 表示最大值
    • count :数量
    redis> XADD writers * name Virginia surname Woolf
    "1601372577811-0"
    redis> XADD writers * name Jane surname Austen
    "1601372577811-1"
    redis> XADD writers * name Toni surname Morrison
    "1601372577811-2"
    redis> XADD writers * name Agatha surname Christie
    "1601372577812-0"
    redis> XADD writers * name Ngozi surname Adichie
    "1601372577812-1"
    redis> XLEN writers
    (integer) 5
    redis> XRANGE writers - + COUNT 2
    1) 1) "1601372577811-0"
       2) 1) "name"
          2) "Virginia"
          3) "surname"
          4) "Woolf"
    2) 1) "1601372577811-1"
       2) 1) "name"
          2) "Jane"
          3) "surname"
          4) "Austen"
    redis>
    
  6. XREVRANGE
    使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
  7. eeeeeeeeeeeeeeeee
  8. eeeeeeeeeeeeeeeee
  9. eeeeeeeeeeeeeeeee
  10. eeeeeeeeeeeeeeeee
  11. eeeeeeeeeeeeeeeee
  12. eeeeeeeeeeeeeeeee