查看“Redis:发布/订阅(pub/sub)”的源代码
←
Redis:发布/订阅(pub/sub)
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:Redis]] __TOC__ == 什么是发布订阅? == Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(publish)发送消息,订阅者(subscribe)接收消息。 * 发布订阅也叫“'''生产者消费者'''”模式,是实现“'''消息队列'''”的一种方式; 消息队列的三要素: # 生产者(producer) # 消费者(consumer) # 消息服务(broker) 发布: :[[File:redis发布.png|200px]] 订阅: :[[File:reids订阅.png|200px]] == Redis发布和订阅 == 发布订阅是'''消息队列'''的一种方式,基于消息队列的方式,可以实现系统解耦、削峰填谷,顶住流量洪峰; * redis 的主业目前是基于键值对的数据存储、缓存等,消息队列可能是 redis 的一种尝试; * 常用的流行的消息队列有:“'''ActiveMQ'''”、“'''RabbitMQ'''”等; === 常用命令 === {| class="wikitable" ! Option Name !! Description !! Introduced !! Deprecated |- | SUBSCRIBE channel [channel ...] || '''订阅'''给定的一个或多个频道的信息。 |- | PSUBSCRIBE pattern [pattern ...] || '''订阅'''一个或多个符合给定模式的频道。 |- | UNSUBSCRIBE [channel [channel ...]] || '''退订'''给定的频道。 |- | PUNSUBSCRIBE [pattern [pattern ...]] || '''退订'''所有给定模式的频道。 |- | PUBLISH channel message || 将信息发送到指定的频道。 |- | PUBSUB subcommand [argument [argument ...]] || 查看订阅与发布系统状态。 |} === 模拟实现 === (以命令行模拟实现): # 开启 4 个 redis 客户端(“./redis-cli”),如上图,3 个客户端作为消息订阅者,1 个为消息发布者: # 让 3 个消息订阅者订阅某个频道主题:“'''<code>subscribe channelTest</code>'''”; #: <syntaxhighlight lang="java" highlight=""> subscribe channel [channel ...] </syntaxhighlight> #: [[File:redis订阅channelTest.jpg|800px]] # 让1个消息发布者向频道主题上发布消息:“'''<code>publish channelTest message123</code>'''”; #: <syntaxhighlight lang="java" highlight=""> publish channel message </syntaxhighlight> #: [[File:redis发布消息到channelTest.jpg|800px]] 如果是订阅匹配模式的频道主题:“'''<code>psubscribe chan*</code>'''” # 订阅: #: <syntaxhighlight lang="java" highlight=""> psubscribe pattern [pattern ...] </syntaxhighlight> #:[[File:redis订阅(psubscribe).jpg|800px]] # 发布: #:[[File:redis发布(psubscribe).jpg|800px]] == Redis Stream == <pre> Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。 简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。 </pre> Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。 * Redis Stream 主要用于消息队列(MQ,Message Queue). * Redis Stream 是 Redis 5.0 版本新增加的数据结构。 === 结构 === Redis Stream 的结构如下所示: : [[File:Redis Stream结构.png|600px]] 它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容: * 每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。 === 命令 === 消息队列相关命令: {| class="wikitable" ! Option Name !! Description !! Introduced !! Deprecated |- | XADD || 添加消息到末尾 |- | XDEL || 删除消息 |- | XLEN || 获取流包含的元素数量,即消息长度 |- | XTRIM || 对流进行修剪,限制长度 |- | XRANGE || 获取消息列表,会自动过滤已经删除的消息 |- | XREVRANGE || 反向获取消息列表,ID 从大到小 |- | XREAD || 以阻塞或非阻塞方式获取消息列表 |} 消费者组相关命令: {| class="wikitable" ! Option Name !! Description !! Introduced !! Deprecated |- | XGROUP CREATE || 创建消费者组 |- | XGROUP DESTROY || 删除消费者组 |- | XGROUP DELCONSUMER || 删除消费者 |- | XREADGROUP GROUP || 读取消费者组中的消息 |- | XGROUP SETID || 为消费者组设置新的最后递送消息ID |- | XPENDING || 显示待处理消息的相关信息 |- | XACK || 将消息标记为"已处理" |- | XCLAIM || 转移消息的归属权 |- | XINFO || 查看流和消费者组的相关信息; |- | XINFO GROUPS || 打印消费者组的信息; |- | XINFO STREAM || 打印流信息 |} ==== 示例 ==== # '''XADD''': #: 使用 XADD 向队列添加消息,如果指定的队列不存在,则创建一个队列。XADD 语法格式: #: <syntaxhighlight lang="java" highlight=""> XADD key ID field value [field value ...] </syntaxhighlight> #* key :'''队列名称''',如果不存在就创建。 #* ID :'''消息 id'''。用 * 表示由 redis 生成;可以自定义,但是要自己保证递增性。 #* field value : '''记录'''。 #: <syntaxhighlight lang="java" highlight=""> redis> XADD mystream * name Sara surname OConnor "1601372323627-0" redis> XADD mystream * field1 value1 field2 value2 field3 value3 "1601372323627-1" redis> XLEN mystream (integer) 2 redis> XRANGE mystream - + 1) 1) "1601372323627-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "OConnor" 2) 1) "1601372323627-1" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" 5) "field3" 6) "value3" redis> </syntaxhighlight> # '''XTRIM''':【???】 #: 使用 XTRIM 对流进行修剪,限制长度。语法格式: #: <syntaxhighlight lang="java" highlight=""> XTRIM key MAXLEN [~] count </syntaxhighlight> #* key :队列名称 #* MAXLEN :长度 #* count :数量 #: <syntaxhighlight lang="java" highlight=""> 127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D "1601372434568-0" 127.0.0.1:6379> XTRIM mystream MAXLEN 2 (integer) 0 127.0.0.1:6379> XRANGE mystream - + 1) 1) "1601372434568-0" 2) 1) "field1" 2) "A" 3) "field2" 4) "B" 5) "field3" 6) "C" 7) "field4" 8) "D" 127.0.0.1:6379> redis> </syntaxhighlight> # '''XDEL''': #: 使用 XDEL 删除消息。语法格式: #: <syntaxhighlight lang="java" highlight=""> XDEL key ID [ID ...] </syntaxhighlight> #* key:队列名称 #* ID :消息 ID #: <syntaxhighlight lang="java" highlight=""> > XADD mystream * a 1 1538561698944-0 > XADD mystream * b 2 1538561700640-0 > XADD mystream * c 3 1538561701744-0 > XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379> XRANGE mystream - + 1) 1) 1538561698944-0 2) 1) "a" 2) "1" 2) 1) 1538561701744-0 2) 1) "c" 2) "3" </syntaxhighlight> # '''XLEN''': #: 使用 XLEN 获取流包含的元素数量,即消息长度。语法格式: #: <syntaxhighlight lang="java" highlight=""> XLEN key </syntaxhighlight> #* key:队列名称 #: <syntaxhighlight lang="java" highlight=""> redis> XADD mystream * item 1 "1601372563177-0" redis> XADD mystream * item 2 "1601372563178-0" redis> XADD mystream * item 3 "1601372563178-1" redis> XLEN mystream (integer) 3 redis> </syntaxhighlight> # '''XRANGE''':【???】 #: 使用 XRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式: #: <syntaxhighlight lang="java" highlight=""> XRANGE key start end [COUNT count] </syntaxhighlight> #* key :队列名 #* start :开始值, - 表示最小值 #* end :结束值, + 表示最大值 #* count :数量 #: <syntaxhighlight lang="java" highlight=""> redis> XADD writers * name Virginia surname Woolf "1601372577811-0" redis> XADD writers * name Jane surname Austen "1601372577811-1" redis> XADD writers * name Toni surname Morrison "1601372577811-2" redis> XADD writers * name Agatha surname Christie "1601372577812-0" redis> XADD writers * name Ngozi surname Adichie "1601372577812-1" redis> XLEN writers (integer) 5 redis> XRANGE writers - + COUNT 2 1) 1) "1601372577811-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) "1601372577811-1" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen" redis> </syntaxhighlight> # '''XREVRANGE''': #: 使用 XREVRANGE 获取消息列表,会自动过滤已经删除的消息。语法格式: #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> #* #* #* #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> # '''eeeeeeeeeeeeeeeee''': #: #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> #* #* #* #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> # '''eeeeeeeeeeeeeeeee''': #: #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> #* #* #* #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> # '''eeeeeeeeeeeeeeeee''': #: #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> #* #* #* #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> # '''eeeeeeeeeeeeeeeee''': #: #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> #* #* #* #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> # '''eeeeeeeeeeeeeeeee''': #: #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> #* #* #* #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> # '''eeeeeeeeeeeeeeeee''': #: #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> #* #* #* #: <syntaxhighlight lang="java" highlight=""> </syntaxhighlight>
返回至“
Redis:发布/订阅(pub/sub)
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息