“Redis:发布/订阅(pub/sub)”的版本间差异
跳到导航
跳到搜索
无编辑摘要 |
|||
第23行: | 第23行: | ||
* redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试; | * redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试; | ||
* 常用的流行的消息队列有:“'''ActiveMQ'''”、“'''RabbitMQ'''”等; | * 常用的流行的消息队列有:“'''ActiveMQ'''”、“'''RabbitMQ'''”等; | ||
=== 常用命令 === | |||
{| class="wikitable" | |||
! Option Name !! Description !! Introduced !! Deprecated | |||
|- | |||
| SUBSCRIBE channel [channel ...] || '''订阅'''给定的一个或多个频道的信息。 | |||
|- | |||
| PSUBSCRIBE pattern [pattern ...] || '''订阅'''一个或多个符合给定模式的频道。 | |||
|- | |||
| UNSUBSCRIBE [channel [channel ...]] || '''退订'''给定的频道。 | |||
|- | |||
| PUNSUBSCRIBE [pattern [pattern ...]] || '''退订'''所有给定模式的频道。 | |||
|- | |||
| PUBLISH channel message || 将信息发送到指定的频道。 | |||
|- | |||
| PUBSUB subcommand [argument [argument ...]] || 查看订阅与发布系统状态。 | |||
|} | |||
=== 模拟实现 === | === 模拟实现 === | ||
第49行: | 第66行: | ||
# 发布: | # 发布: | ||
#:[[File:redis发布(psubscribe).jpg|800px]] | #:[[File:redis发布(psubscribe).jpg|800px]] | ||
== Redis Stream == | |||
<pre> | |||
Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。 | |||
简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。 | |||
</pre> | |||
Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。 | |||
* Redis Stream 主要用于消息队列(MQ,Message Queue). | |||
* Redis Stream 是 Redis 5.0 版本新增加的数据结构。 | |||
=== 结构 === | |||
Redis Stream 的结构如下所示: | |||
: [[File:Redis Stream结构.png|400px]] | |||
它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容: | |||
* 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。 | |||
=== 命令 === | |||
消息队列相关命令: | |||
{| class="wikitable" | |||
! Option Name !! Description !! Introduced !! Deprecated | |||
|- | |||
| XADD || 添加消息到末尾 | |||
|- | |||
| XDEL || 删除消息 | |||
|- | |||
| XLEN || 获取流包含的元素数量,即消息长度 | |||
|- | |||
| XTRIM || 对流进行修剪,限制长度 | |||
|- | |||
| XRANGE || 获取消息列表,会自动过滤已经删除的消息 | |||
|- | |||
| XREVRANGE || 反向获取消息列表,ID 从大到小 | |||
|- | |||
| XREAD || 以阻塞或非阻塞方式获取消息列表 | |||
|} | |||
消费者组相关命令: | |||
{| class="wikitable" | |||
! Option Name !! Description !! Introduced !! Deprecated | |||
|- | |||
| XGROUP CREATE || 创建消费者组 | |||
|- | |||
| XGROUP DESTROY || 删除消费者组 | |||
|- | |||
| XGROUP DELCONSUMER || 删除消费者 | |||
|- | |||
| XREADGROUP GROUP || 读取消费者组中的消息 | |||
|- | |||
| XGROUP SETID || 为消费者组设置新的最后递送消息ID | |||
|- | |||
| XPENDING || 显示待处理消息的相关信息 | |||
|- | |||
| XACK || 将消息标记为"已处理" | |||
|- | |||
| XCLAIM || 转移消息的归属权 | |||
|- | |||
| XINFO || 查看流和消费者组的相关信息; | |||
|- | |||
| XINFO GROUPS || 打印消费者组的信息; | |||
|- | |||
| XINFO STREAM || 打印流信息 | |||
|} | |||
==== 示例 ==== | |||
# '''XADD''': | |||
#: 使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。XADD 语法格式: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
XADD key ID field value [field value ...] | |||
</syntaxhighlight> | |||
#* key :'''队列名称''',如果不存在就创建。 | |||
#* ID :'''消息 id'''。用 * 表示由 redis 生成;可以自定义,但是要自己保证递增性。 | |||
#* field value : '''记录'''。 | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
redis> XADD mystream * name Sara surname OConnor | |||
"1601372323627-0" | |||
redis> XADD mystream * field1 value1 field2 value2 field3 value3 | |||
"1601372323627-1" | |||
redis> XLEN mystream | |||
(integer) 2 | |||
redis> XRANGE mystream - + | |||
1) 1) "1601372323627-0" | |||
2) 1) "name" | |||
2) "Sara" | |||
3) "surname" | |||
4) "OConnor" | |||
2) 1) "1601372323627-1" | |||
2) 1) "field1" | |||
2) "value1" | |||
3) "field2" | |||
4) "value2" | |||
5) "field3" | |||
6) "value3" | |||
redis> | |||
</syntaxhighlight> | |||
# '''XTRIM''':【???】 | |||
#: 使用 XTRIM 对流进行修剪,限制长度。语法格式: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
XTRIM key MAXLEN [~] count | |||
</syntaxhighlight> | |||
#* key :队列名称 | |||
#* MAXLEN :长度 | |||
#* count :数量 | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D | |||
"1601372434568-0" | |||
127.0.0.1:6379> XTRIM mystream MAXLEN 2 | |||
(integer) 0 | |||
127.0.0.1:6379> XRANGE mystream - + | |||
1) 1) "1601372434568-0" | |||
2) 1) "field1" | |||
2) "A" | |||
3) "field2" | |||
4) "B" | |||
5) "field3" | |||
6) "C" | |||
7) "field4" | |||
8) "D" | |||
127.0.0.1:6379> | |||
redis> | |||
</syntaxhighlight> | |||
# '''XDEL''': | |||
#: 使用 XDEL 删除消息。语法格式: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
XDEL key ID [ID ...] | |||
</syntaxhighlight> | |||
#* key:队列名称 | |||
#* ID :消息 ID | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
> XADD mystream * a 1 | |||
1538561698944-0 | |||
> XADD mystream * b 2 | |||
1538561700640-0 | |||
> XADD mystream * c 3 | |||
1538561701744-0 | |||
> XDEL mystream 1538561700640-0 | |||
(integer) 1 | |||
127.0.0.1:6379> XRANGE mystream - + | |||
1) 1) 1538561698944-0 | |||
2) 1) "a" | |||
2) "1" | |||
2) 1) 1538561701744-0 | |||
2) 1) "c" | |||
2) "3" | |||
</syntaxhighlight> | |||
# '''XLEN''': | |||
#: 使用 XLEN 获取流包含的元素数量,即消息长度。语法格式: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
XLEN key | |||
</syntaxhighlight> | |||
#* key:队列名称 | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
redis> XADD mystream * item 1 | |||
"1601372563177-0" | |||
redis> XADD mystream * item 2 | |||
"1601372563178-0" | |||
redis> XADD mystream * item 3 | |||
"1601372563178-1" | |||
redis> XLEN mystream | |||
(integer) 3 | |||
redis> | |||
</syntaxhighlight> | |||
# '''XRANGE''':【???】 | |||
#: 使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
XRANGE key start end [COUNT count] | |||
</syntaxhighlight> | |||
#* key :队列名 | |||
#* start :开始值, - 表示最小值 | |||
#* end :结束值, + 表示最大值 | |||
#* count :数量 | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
redis> XADD writers * name Virginia surname Woolf | |||
"1601372577811-0" | |||
redis> XADD writers * name Jane surname Austen | |||
"1601372577811-1" | |||
redis> XADD writers * name Toni surname Morrison | |||
"1601372577811-2" | |||
redis> XADD writers * name Agatha surname Christie | |||
"1601372577812-0" | |||
redis> XADD writers * name Ngozi surname Adichie | |||
"1601372577812-1" | |||
redis> XLEN writers | |||
(integer) 5 | |||
redis> XRANGE writers - + COUNT 2 | |||
1) 1) "1601372577811-0" | |||
2) 1) "name" | |||
2) "Virginia" | |||
3) "surname" | |||
4) "Woolf" | |||
2) 1) "1601372577811-1" | |||
2) 1) "name" | |||
2) "Jane" | |||
3) "surname" | |||
4) "Austen" | |||
redis> | |||
</syntaxhighlight> | |||
# '''XREVRANGE''': | |||
#: 使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
#* | |||
#* | |||
#* | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
# '''eeeeeeeeeeeeeeeee''': | |||
#: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
#* | |||
#* | |||
#* | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
# '''eeeeeeeeeeeeeeeee''': | |||
#: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
#* | |||
#* | |||
#* | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
# '''eeeeeeeeeeeeeeeee''': | |||
#: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
#* | |||
#* | |||
#* | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
# '''eeeeeeeeeeeeeeeee''': | |||
#: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
#* | |||
#* | |||
#* | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
# '''eeeeeeeeeeeeeeeee''': | |||
#: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
#* | |||
#* | |||
#* | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
# '''eeeeeeeeeeeeeeeee''': | |||
#: | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> | |||
#* | |||
#* | |||
#* | |||
#: <syntaxhighlight lang="java" highlight=""> | |||
</syntaxhighlight> |
2021年5月10日 (一) 15:01的版本
什么是发布订阅?
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(publish)发送消息,订阅者(subscribe)接收消息。
- 发布订阅也叫“生产者消费者”模式,是实现“消息队列”的一种方式;
消息队列的三要素:
- 生产者(producer)
- 消费者(consumer)
- 消息服务(broker)
发布:
订阅:
Redis发布和订阅
发布订阅是消息队列的一种方式,基于消息队列的方式,可以实现系统解耦、削峰填谷,顶住流量洪峰;
- redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试;
- 常用的流行的消息队列有:“ActiveMQ”、“RabbitMQ”等;
常用命令
Option Name | Description | Introduced | Deprecated |
---|---|---|---|
SUBSCRIBE channel [channel ...] | 订阅给定的一个或多个频道的信息。 | ||
PSUBSCRIBE pattern [pattern ...] | 订阅一个或多个符合给定模式的频道。 | ||
UNSUBSCRIBE [channel [channel ...]] | 退订给定的频道。 | ||
PUNSUBSCRIBE [pattern [pattern ...]] | 退订所有给定模式的频道。 | ||
PUBLISH channel message | 将信息发送到指定的频道。 | ||
PUBSUB subcommand [argument [argument ...]] | 查看订阅与发布系统状态。 |
模拟实现
(以命令行模拟实现):
- 开启 4 个 redis 客户端(“./redis-cli”),如上图,3 个客户端作为消息订阅者,1 个为消息发布者:
- 让 3 个消息订阅者订阅某个频道主题:“
subscribe channelTest
”; - 让1个消息发布者向频道主题上发布消息:“
publish channelTest message123
”;
如果是订阅匹配模式的频道主题:“psubscribe chan*
”
Redis Stream
Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。 简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。
Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
- Redis Stream 主要用于消息队列(MQ,Message Queue).
- Redis Stream 是 Redis 5.0 版本新增加的数据结构。
结构
Redis Stream 的结构如下所示:
它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:
- 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
命令
消息队列相关命令:
Option Name | Description | Introduced | Deprecated |
---|---|---|---|
XADD | 添加消息到末尾 | ||
XDEL | 删除消息 | ||
XLEN | 获取流包含的元素数量,即消息长度 | ||
XTRIM | 对流进行修剪,限制长度 | ||
XRANGE | 获取消息列表,会自动过滤已经删除的消息 | ||
XREVRANGE | 反向获取消息列表,ID 从大到小 | ||
XREAD | 以阻塞或非阻塞方式获取消息列表 |
消费者组相关命令:
Option Name | Description | Introduced | Deprecated |
---|---|---|---|
XGROUP CREATE | 创建消费者组 | ||
XGROUP DESTROY | 删除消费者组 | ||
XGROUP DELCONSUMER | 删除消费者 | ||
XREADGROUP GROUP | 读取消费者组中的消息 | ||
XGROUP SETID | 为消费者组设置新的最后递送消息ID | ||
XPENDING | 显示待处理消息的相关信息 | ||
XACK | 将消息标记为"已处理" | ||
XCLAIM | 转移消息的归属权 | ||
XINFO | 查看流和消费者组的相关信息; | ||
XINFO GROUPS | 打印消费者组的信息; | ||
XINFO STREAM | 打印流信息 |
示例
- XADD:
- 使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。XADD 语法格式:
XADD key ID field value [field value ...]
- key :队列名称,如果不存在就创建。
- ID :消息 id。用 * 表示由 redis 生成;可以自定义,但是要自己保证递增性。
- field value : 记录。
redis> XADD mystream * name Sara surname OConnor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) "1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "OConnor" 2) 1) "1601372323627-1" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" 5) "field3" 6) "value3" redis>
- XTRIM:【???】
- 使用 XTRIM 对流进行修剪,限制长度。语法格式:
XTRIM key MAXLEN [~] count
- key :队列名称
- MAXLEN :长度
- count :数量
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D "1601372434568-0" 127.0.0.1:6379> XTRIM mystream MAXLEN 2 (integer) 0 127.0.0.1:6379> XRANGE mystream - + 1) 1) "1601372434568-0" 2) 1) "field1" 2) "A" 3) "field2" 4) "B" 5) "field3" 6) "C" 7) "field4" 8) "D" 127.0.0.1:6379> redis>
- XDEL:
- 使用 XDEL 删除消息。语法格式:
XDEL key ID [ID ...]
- key:队列名称
- ID :消息 ID
> XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-0 2) 1) "a" 2) "1" 2) 1) 1538561701744-0 2) 1) "c" 2) "3"
- XLEN:
- 使用 XLEN 获取流包含的元素数量,即消息长度。语法格式:
XLEN key
- key:队列名称
redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis>
- XRANGE:【???】
- 使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
XRANGE key start end [COUNT count]
- key :队列名
- start :开始值, - 表示最小值
- end :结束值, + 表示最大值
- count :数量
redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) "1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) "1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" redis>
- XREVRANGE:
- 使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式:
- eeeeeeeeeeeeeeeee:
- eeeeeeeeeeeeeeeee:
- eeeeeeeeeeeeeeeee:
- eeeeeeeeeeeeeeeee:
- eeeeeeeeeeeeeeeee:
- eeeeeeeeeeeeeeeee: