Redis Stream
时间:2018/12/10 12:05:59
参考:
Redis Stream#
简介#
Redis Stream 是Redis实现的一种流式存储结构。特性如下:
- 消费者组。使用消费者组的控制消息的发送。一条消息只会发送给消费者组中的一个消费者。消费者组记录组内消费者消费的最新元素。
- 消息确认机制。Redis为每个消费者维护一个消费消息的列表 (Pending List),消费者发送确认之后,消息从列表中删除,但是不会从流中删除。
- 消息重分配。消费者停止服务之后,可以把消费者未确认的消息分配给其他消费者。
- ID自增。每条消息有一个id,可以自己指定也可以系统生成,后面一条消息的id必须大于前面一条消息的Id。因此可以高效的根据ID范围进行查询。
- 消息存入之后不能变更。只能追加和删除消息,不可以插入消息。
- 固定长度。当长度大于指定值时删除最早放入流中的多余的元素。
Redis Stream 操作命令#
命令行特殊字符#
在 Redis Stream 命令中,存在如下几个特殊id:
-
-
最小id0-1
。 -
+
最大id18446744073709551615-18446744073709551615
。 -
$
当前 Stream 中最大的id。 -
>
消费者组最后发布消息的id。 -
*
XADD 时使用,表示需要服务器初始化id。
命令#
-
添加消息:
XADD stream_name id filed1 value1 field2 value2
。示例:
XADD stream_name * name zhangsan age 24
添加成功之后返回消息的id,id传
*
表示自动生成id,生成id的格式时间戳(毫秒)-自增序列号(64位)
。如果服务器时间发生回退,自动生成的id的序列号会在之前最大的序列号的基础上开始增加。手动指定的id必须大于流中最大的id。 -
查看流长度:
XLEN stream_name
。 -
范围查找:
XRANGE stream_name start end [COUNT count]
-
倒序查询:
XREVRANGE stream_name end start [COUNT count]
-
读取流里面的数据:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name id
。返回ID大于指定ID的数据,可以指定返回数据的数量,以及阻塞的时间。阻塞指的是没有数据的时候阻塞,有数据的时候即使数据的数量小于COUNT指定的个数也立即返回。
# 返回id大于 1595563390562-0 的所有元素 XREAD STREAMS maxwell_stream 1595563390562-0 # 返回id大于 0-0 的1条数据 XREAD COUNT 1 STREAMS maxwell_stream 0-0 # 阻塞获取直到有元素 0 表示阻塞知道有数据 $ 表示请求时流中最大的id XREAD BLOCK 0 STREAMS mystream $ # 返回id大于 1595562850569-0 的20条数据,阻塞20秒 XREAD COUNT 20 BLOCK 20000 STREAMS maxwell_stream 1595562850569-0
消费者组#
- 每一条消息只会发送给消费者组中的一个消费者。
- 通过消费者的名字区分消费者。
- 消费者组存储下一个没有消费的Id,因此可以快速的发送消息给消费者。
- 消费者需要处理完成之后需要返回确认信息确认消息已被消费。
- 消费者组记录每个消费者等待处理的消息。
消费者组的数据结构如下:
+----------------------------------------+
| consumer_group_name: mygroup |
| consumer_group_stream: somekey |
| last_delivered_id: 1292309234234-92 |
| |
| consumers: |
| "consumer-1" with pending messages |
| 1292309234234-4 |
| 1292309234232-8 |
| "consumer-42" with pending messages |
| ... (and so forth) |
+----------------------------------------+
消费者组相关命令#
# 创建一个 mystream 的 消费者组,从大于0的id开始消费
XGROUP CREATE mystream group_name 0
# 创建一个 mystream 的 消费者组,如果流不存在则创建
XGROUP CREATE mystream group_name 0 MKSTREAM
# 消费一条未消费的消息 > 表示从未消费的消息中获取,消息会被添加到消费者的正在处理列表
XREADGROUP GROUP group_name consumer_name COUNT 1 STREAMS stream_name >
# 重新消费一条消费者已消费但未确认的消息 0-0 表示从正在处理列表中读取
XREADGROUP GROUP maxwell_group c1 COUNT 1 STREAMS maxwell_stream 0-0
# 确认消费消息,确认之后消息将从消费者正在处理的消息列表(Pending List)中删除,但是不会从流中删除
XACK stream_name group_name 0-0
# 查看正在处理的消息
XPENDING mystream group_name
# 查看消费者正在处理的消息
XPENDING maxwell_stream group_name - + 10 consumer_name
# 改变正在处理消息的拥有者, min-idle-time 在正在处理列表中的时间
XCLAIM stream_name group_name consumer_name min-idle-time id
# 查看流信息(长度、分组、第一个元素、最后一个元素,最后初始化id)
XINFO STREAM stream_name
# 查看流分组信息(消费者个数、最后一个消费消息的Id、正在处理的消息的个数)
XINFO GROUPS stream_name
# 查看流的分组的消费者信息
XINFO CONSUMERS stream_name group_name
# 设置流长度 只保存1000个元素
XTRIM stream_name MAXLEN 1000
# 设置流长度 至少保存1000个元素,比精确长度效率更高
XTRIM stream_name MAXLEN ~ 1000
# 删除元素
XDEL stream_name 1526654999635-0