在之前的文章《Redis(二):Redis消息队列》中已经介绍过Redis中使用List以及发布订阅两种模式实现的消息队列,其中发布订阅模式比List模式功能更强大,但是有很大的缺陷:
- 消息没有持久化,如果消费者断线重连,消息会丢失
- 没有ack机制,无法保证消息被成功消费。
- Redis服务重启后消息会丢失。
Redis Stream是在5.0版本引入的新消息队列,它能持久化消息并且带ACK机制,甚至还有类似Kafka一样的ConsumerGroup(消费者组)协同消费消息,可以说完全弥补了发布订阅模式的所有缺点。
这篇文章单独把Redis Stream拿出来说,是因为我认为到目前为止,Redis Stream是Redis最复杂的类型了。Stream类型不像BitMap或者GEO操作底层存储还是五种基本数据类型之一,Stream类型的key使用type命令得到的类型就是"stream",它是一个独立的类型。
以下内容均基于Redis 6.2.1。
一、Stream基本命令
1、xadd
xadd命令用于向Stream中添加新消息,其完整命令如下所示:
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]
该命令比较复杂,下面一部分一部分来看:
key: 指定要添加消息的 Stream 名称
NOMKSTREAM: 可选参数,如果指定,当 key 不存在时不会自动创建新的 Stream
长度限制参数:
- MAXLEN [=|~] threshold:限制 Stream 的最大长度,超过时会删除旧消息
- = 表示精确限制(性能较低)
- ~ 表示近似限制(性能更高,默认)
- MINID [=|~] threshold:只保留 ID 大于等于指定值的消息(Redis 6.2+)
LIMIT count:与 MAXLEN/MINID 配合使用,限制每次修剪的消息数量
消息ID:
- *:让 Redis 自动生成 ID(格式为<毫秒时间戳>-<序列号>,如1717020000000-5)
- 手动指定 ID:必须大于 Stream 中已有最大 ID,否则会报错
field value [field value ...]:键值对形式的消息内容,可以包含多个字段
举例:向Stream新增一条数据:
127.0.0.1:6379> xadd stream * name zhangsan age 15 sex man
1751425687108-0 #返回消息id
127.0.0.1:6379> type stream
stream
127.0.0.1:6379>
注意,这里添加了多个field value,但是在Stream中还是一个元素。
2、xrange
xrange用于按照消息 ID 的范围查询 Stream 中的消息。其具体命令格式如下:
xrange key start end [COUNT count]
key:要查询的 Stream 名称(必填)
start:起始消息 ID(必填)
- 特殊值
-
表示最小 ID(即 Stream 中的第一条消息) - 可以指定具体的消息 ID(如
1526919030474-55
)
end:结束消息 ID(必填)
- 特殊值
+
表示最大 ID(即 Stream 中的最新消息) - 可以指定具体的消息 ID
[COUNT count]:可选参数,限制返回的消息数量
- 如果不指定,默认返回范围内的所有消息
- 指定后只返回最多 count 条消息
举例:查询stream中的所有消息
127.0.0.1:6379> xrange stream - +
1751425687108-0 #消息id,之后是消息体
name
zhangsan
age
15
sex
man
127.0.0.1:6379>
3、xrevrange
xrevrange命令用于按照消息 ID 的反向顺序(从大到小)查询 Stream 中的消息。它与 xrange命令功能相似但遍历方向相反,特别适合获取最新的消息。其命令格式如下:
xrevrange key end start [COUNT count]
key:要查询的 Stream 名称(必填)
end:结束消息 ID(必填)
- 特殊值
+
表示最大 ID(即 Stream 中的最新消息) - 可以指定具体的消息 ID(如
1526919030474-55
)
start:起始消息 ID(必填)
- 特殊值
-
表示最小 ID(即 Stream 中的第一条消息) - 可以指定具体的消息 ID
[COUNT count]:可选参数,限制返回的消息数量
- 如果不指定,默认返回范围内的所有消息
- 指定后只返回最多 count 条消息
举例:查询stream中的数据
127.0.0.1:6379> xrevrange stream + -
1751425687108-0
name
zhangsan
age
15
sex
man
127.0.0.1:6379>
4、xlen
xlen命令用于查询stream的长度,命令格式如下:
xlen key
xlen命令比较简单,举例:
127.0.0.1:6379> xlen stream
1
127.0.0.1:6379>
5、xread
xread是 Redis Stream 数据结构中用于读取消息的核心命令,支持阻塞和非阻塞模式,能够从一个或多个 Stream 中获取消息。其命令格式如下所示:
xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
[COUNT count]:可选参数,限制从每个 Stream 中返回的消息数量
[BLOCK milliseconds]:可选参数,设置阻塞模式及超时时间(毫秒)
- 设置为0表示无限期阻塞
- 不设置此参数则为非阻塞模式
STREAMS key [key ...]:指定要读取的一个或多个 Stream 名称
ID [ID ...]:指定每个 Stream 的起始消息ID
$
表示只读取最新消息(阻塞模式下常用)0
或0-0
表示从第一条消息开始读取- 指定具体ID则读取该ID之后的消息
举例:非阻塞监听
127.0.0.1:6379> xread streams stream 0-0 #从第一条消息开始读取
stream
1751425687108-0 #第一条消息id
name
zhangsan
age
15
sex
man
1751436694812-0 #第二条消息id
zhangsan
age
127.0.0.1:6379>
举例:阻塞监听
阻塞监听有个问题,它和pub/sub模式不一样的是,pub/sub模式下的subscribe
命令会持续监听,而xread命令监听到一条消息之后命令就结束了。

在实际使用中,为了能持续监听消费消息,而且不漏掉消息,需要记住消费的消息id,伪代码如下所示:
String lastId = "0-0"; // 初始从第一条消息开始
while(true) {
// 从上次消费的位置继续读取
Object msg = redis.execute("XREAD COUNT 10 BLOCK 5000 STREAMS users " + lastId);
if (msg == null) continue;
// 更新最后消费的ID
lastId = getLastIdFrom(msg);
handleMessage(msg);
}
6、xtrim
xtrim是 Redis Stream 数据结构中用于修剪(trim)流长度的命令,它可以限制 Stream 中保存的消息数量或基于消息ID进行裁剪。Stream的使用过程中,其消息长度会越来越大,应当定期使用xtrim命令对其进行修剪。
xtrim命令的完整格式如下所示:
XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
key:Stream 的名称(必填)
策略选择:
- MAXLEN:基于消息数量进行修剪
- MINID:基于消息ID进行修剪(Redis 6.2+)
[=|~]:精确或近似修剪模式
=
:精确修剪(默认)~
:近似修剪(提高性能)
threshold:阈值
- 对于 MAXLEN:保留的最大消息数量
- 对于 MINID:保留的最小消息ID(所有小于此ID的消息将被删除)
[LIMIT count]:每次迭代删除的消息数量限制(Redis 6.2+)
7、xdel
XDEL 是 Redis Stream 数据结构中用于删除指定消息的命令。其语法结构如下所示:
XDEL key ID [ID ...]
XDEL 命令会从指定的 Stream 中删除给定的消息ID,删除的消息会从内存中释放,如果被删除的消息存在于任何消费者组的 Pending Entries List (PEL) 中,它也会从这些PEL中移除,但不会影响消费者组的 last_delivered_id。
二、消费者组
前面介绍了xread命令已经能够从消息队列中读取消息了,但是xread命令有几个很大的缺陷:
- 无法记住已经消费的消息。它每次消费必须指定id;若是指定$,则实际上还是和发布订阅模式有一样的问题,若是客户端连接中断则有可能漏掉部分消息。
- 无ACK机制。
使用消费者组则能解决以上问题。消费者组消费消息的示意图如下所示:
每个消费者组都有若干个消费者,这些消费者协同消费消息。若是开启ACK,每个消费者接收到消息之后会将消息id放入自己维护的pending_ids列表中,表示还未ACK,待ACK后就将处理完的消息从pending_ids列表中移除。另外,每个消费者组还维护了last_deliverred_id表示最后一个投递给消费者的id,下次再消费则从此处开始。
如果某个消费者的pending_ids列表中有消息长时间没有ack,则可以将该消息重新转移给别的消费者让其重试解决。
1、xgroup
xgroup是 Redis Stream 中用于管理消费者组(Consumer Group)的核心命令,它提供了创建、配置和销毁消费者组的功能。其完整命令如下所示:
XGROUP
[CREATE key groupname ID|$ [MKSTREAM]]
[SETID key groupname ID|$]
[DESTROY key groupname]
[CREATECONSUMER key groupname consumername]
[DELCONSUMER key groupname consumername]
注意,上述命令中的ID|$
实际上是id-or-$
。
create子命令
create子命令用于创建消费者组,完整命令如下:
XGROUP [CREATE key groupname ID|$ [MKSTREAM]]
key: Stream 名称(必填)
groupname: 消费者组名称(必填)
id-or-$: 起始消息ID(必填)
$
表示只消费新消息(创建后添加的消息,当前Stream中的消息会全部忽略)0
或0-0
表示从第一条消息开始消费- 指定具体ID则从该ID之后开始消费
[MKSTREAM]: 可选参数,如果Stream不存在则自动创建
127.0.0.1:6379> xgroup create stream consumer_group 0-0 #创建consumer_group消费者组从头开始消费stream的消息
OK
127.0.0.1:6379>
setid子命令
setid子命令用于重置消费者组读取位置,实际上是修改了消费者组的 last_delivered_id
,完整命令如下:
XGROUP [SETID key groupname ID|$]
key: Stream 名称(必填)
groupname: 消费者组名称(必填)
id-or-$: 起始消息ID(必填)
$
表示只消费新消息(创建后添加的消息)0
或0-0
表示从第一条消息开始消费- 指定具体ID则从该ID之后开始消费
destroy子命令
destroy子命令用于删除消费者组,同时组内所有消费者也会被删除。完整命令如下所示:
XGROUP [DESTROY key groupname]
key: Stream 名称(必填)
groupname: 消费者组名称(必填)
createconsumer子命令
createconsumer子命令用于在现有消费者组中显式创建一个消费者,通常是不需要使用该命令显式创建消费者的,使用XREADGROUP
命令会自动创建消费者。createconsumer子命令如下所示:
XGROUP [CREATECONSUMER key groupname consumername]
key: Stream 名称(必填)
groupname: 消费者组名称(必填)
consumername:消费者名称
delconsumer子命令
delconsumer子命令用于从消费者组中删除指定的消费者。delconsumer子命令如下所示:
XGROUP [DELCONSUMER key groupname consumername]
key: Stream 名称(必填)
groupname: 消费者组名称(必填)
consumername:消费者名称
2、xreadgroup
xreadgroup是 Redis Stream 中用于消费者组(Consumer Group)模式读取消息的核心命令, 也是构建可靠消息系统的关键命令,通过消费者组模式实现了消息分发、负载均衡和消息确认机制,是Redis Stream最强大的特性之一。
该命令会自动过滤已经消费过的消息。
完整命令如下所示:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
GROUP group consumer:指定消费者组名称和消费者名称
group
:消费者组名称consumer
:消费者名称(自动创建不存在的消费者)
[COUNT count]:限制从每个 Stream 中返回的消息数量
[BLOCK milliseconds]:设置阻塞模式及超时时间(毫秒)
- 设置为0表示无限期阻塞
- 不设置此参数则为非阻塞模式
[NOACK]:不将消息添加到 PEL (Pending Entries List)
- 使用此参数后消息会被视为已确认,无需后续 XACK
- 适用于不需要消息确认的场景
STREAMS key [key ...]:指定要读取的一个或多个 Stream 名称
ID [ID ...]:指定每个 Stream 的起始消息ID
>
:特殊ID,表示只读取从未分发给该消费者的新消息0
或0-0
:从消费者组的 PEL (Pending Entries List) 中重新读取未确认的消息- 具体ID:从指定ID之后的消息开始读取
127.0.0.1:6379> xreadgroup group consumer_group kdyzm_consumer_1 block 0 streams stream >
stream
1751425687108-0
name
zhangsan
age
15
sex
man
1751436694812-0
zhangsan
age
1751438226276-0
name
zhangsan
1751438232352-0
name
zhangsan
1751438371472-0
name
zhangsan
127.0.0.1:6379>
3、xpending
xpending是 Redis Stream 数据结构中用于查询消费者组(Consumer Group)内待处理消息(Pending Entries List, PEL)的关键命令。其完整命令格式如下所示:
XPENDING key group [[IDLE min-idle-time] start end count [consumer]]
key: Stream 名称(必填)
group: 消费者组名称(必填)
[IDLE min-idle-time]: 筛选空闲时间超过指定毫秒数的消息
start end: 消息ID范围,支持特殊符号:
-
表示最小ID+
表示最大ID
count: 限制返回的消息数量
[consumer]: 指定只查看某个消费者的待处理消息
先看一个简单查询:
127.0.0.1:6379> xpending stream consumer_group
5 #共有五条消息未被确认
1751425687108-0 # 最小消息id
1751438371472-0 # 最大消息id
kdyzm_consumer_1 #具体消费者
5 #kdyzm_consumer_1共有五条消息未确认
简单查询结果是一个简化的统计结果,如果我们想查询所有的未ACK的消息列表,可以使用如下命令:
127.0.0.1:6379> xpending stream consumer_group - + 10
1751425687108-0 #消息id
kdyzm_consumer_1 #消费者名称:当前持有该消息但未确认的消费者
1304133 #空闲时间(毫秒):自该消息最后一次被交付给消费者后经过的时间(约 21.7 分钟)
1 #投递次数:该消息被读取的次数。若大于 1,说明消息曾被多次重试或转移
1751436694812-0
kdyzm_consumer_1
1304133
1
1751438226276-0
kdyzm_consumer_1
1304133
1
1751438232352-0
kdyzm_consumer_1
1304133
1
1751438371472-0
kdyzm_consumer_1
1304133
1
4、xack
xack是 Redis Stream 数据结构中用于确认消息处理完成的关键命令,它属于消费者组(Consumer Group)功能的一部分,在非消费者组消费消息的场景下无法使用该命令。该命令完整格式如下所示:
xack key group ID [ID ...]
key:Stream 的名称(必填)
group:消费者组名称(必填)
ID [ID ...]:一个或多个需要确认的消息ID(必填)
xack命令返回值是一个整数,表示返回成功确认的消息数量。
xack工作原理: 当消费者通过 XREADGROUP 读取消息后,这些消息会被添加到 Pending Entries List (PEL) 中,表示"已读取但未确认";XACK 命令会从消费者组的 PEL 中移除指定的消息ID,表示这些消息已被成功处理;确认后的消息会从 PEL 中删除,释放相关内存。
127.0.0.1:6379> xpending stream consumer_group - + 10 #查询出五条消息在PEL中
1751425687108-0
kdyzm_consumer_1
1671250
1
1751436694812-0
kdyzm_consumer_1
1671250
1
1751438226276-0
kdyzm_consumer_1
1671250
1
1751438232352-0
kdyzm_consumer_1
1671250
1
1751438371472-0
kdyzm_consumer_1
1671250
1
127.0.0.1:6379>
127.0.0.1:6379>
127.0.0.1:6379> xack stream consumer_group 1751425687108-0 1751436694812-0 1751438226276-0 #手动ACK三条
3
127.0.0.1:6379>
127.0.0.1:6379>
127.0.0.1:6379> xpending stream consumer_group - + 10 #再查还有两条消息在PEL中
1751438232352-0
kdyzm_consumer_1
1713880
1
1751438371472-0
kdyzm_consumer_1
1713880
1
127.0.0.1:6379>
5、xclaim
xclaim是 Redis Stream 消费者组功能中的一个重要命令,用于将待处理消息(Pending Entries List, PEL)的所有权从一个消费者转移到另一个消费者。该命令用于某些消息长时间未ACK需要消息重投的场景。命令的完整格式如下:
XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]
key:Stream 的名称(必填)
group:消费者组名称(必填)
consumer:新的消费者名称(必填)
min-idle-time:消息最小空闲时间(毫秒),只有空闲时间超过此值的消息才会被转移
ID [ID ...]:一个或多个需要转移的消息ID(必填)
[IDLE ms]:设置消息的新空闲时间(毫秒),默认使用当前时间减去消息最后交付时间
[TIME ms-unix-time]:与IDLE相同,但使用绝对Unix时间戳(毫秒)
[RETRYCOUNT count]:设置消息的重试计数器,通常用于监控长时间未处理的消息
[force]:强制创建PEL条目,即使指定的ID尚未分配给任何消费者
[justid]:只返回成功转移的消息ID,不返回消息内容
127.0.0.1:6379> xpending stream consumer_group - + 10 #查询出两条在PEL中的消息都被kdyzm_consumer_1持有
1751438232352-0
kdyzm_consumer_1
1885915
1
1751438371472-0
kdyzm_consumer_1
1885915
1
127.0.0.1:6379> xclaim stream consumer_group kdyzm_consumer_2 0 1751438232352-0 #将第一条消息转移给kdyzm_consumer_2
1751438232352-0
name
zhangsan
127.0.0.1:6379> xpending stream consumer_group - + 10 #再次查询
1751438232352-0
kdyzm_consumer_2 #第一条消息被转移到了kdyzm_consuer_2
30169 #空闲时间被重置,从0开始重新计算
2 #投递次数变成了2
1751438371472-0
kdyzm_consumer_1
1984128
1
6、xinfo
xinfo是 Redis Stream 中用于获取流和消费者组相关信息的命令,它提供了多种子命令来查询 Stream 的内部状态。该命令的完整格式如下所示:
XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]
consumers子命令
consumers子命令用于查询指定消费者组中所有消费者的详细信息。
XINFO CONSUMERS <key> <groupname>
key: Stream 名称(必填)
groupname: 消费者组名称(必填)
127.0.0.1:6379> xinfo consumers stream consumer_group
127.0.0.1:6379>
groups子命令
groups子命令用于查询指定Stream的所有消费者组信息。
XINFO GROUPS <key>
key: Stream 名称(必填)
127.0.0.1:6379> xinfo groups stream
name
consumer_group
consumers
0
pending
0
last-delivered-id
0-0
127.0.0.1:6379>
stream子命令
stream子命令用于查询指定Stream的信息,包括长度、最后生成id等。
XINFO STREAM <key> [FULL [COUNT <count>]
key: Stream 名称(必填)
127.0.0.1:6379> xinfo stream stream
length
5
radix-tree-keys
1
radix-tree-nodes
2
last-generated-id
1751438371472-0
groups
1
first-entry
1751425687108-0
name
zhangsan
age
15
sex
man
last-entry
1751438371472-0
name
zhangsan
127.0.0.1:6379>
help子命令
help子命令用于查询xinfo命令的使用帮助信息。其输出如下:
127.0.0.1:6379> xinfo help
XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
CONSUMERS <key> <groupname>
Show consumers of <groupname>.
GROUPS <key>
Show the stream consumer groups.
STREAM <key> [FULL [COUNT <count>]
Show information about the stream.
HELP
Prints this help.
127.0.0.1:6379>
7、消费者组综合练习
使用xadd命令创建一个Stream:kdyzm_stream并添加一条消息:
127.0.0.1:6379> xadd kdyzm_stream * name kdyzm
1751469707635-0
查看stream的长度:
127.0.0.1:6379> xlen kdyzm_stream
1
创建一个消费者组kdyzm_consumer_group监听kdyzm_stream消息
127.0.0.1:6379> xgroup create kdyzm_stream kdyzm_consumer_group 0-0
OK
监听消息
127.0.0.1:6379> xreadgroup group kdyzm_consumer_group kdyzm_consumer_1 block 0 streams kdyzm_stream >
kdyzm_stream
1751469707635-0
name
kdyzm
使用>符号首次监听会接收到第一条消息。
使用xpending命令查看PEL中的数据
127.0.0.1:6379> xpending kdyzm_stream kdyzm_consumer_group - + 10
1751469707635-0
kdyzm_consumer_1
278229
3
使用xack命令确认下
127.0.0.1:6379> xack kdyzm_stream kdyzm_consumer_group 1751469707635-0
1
再次使用xpending命令查看PEL中的数据就已经看不到数据了。
接下来再创建一个消费者,共同监听kdyzm_stream
xreadgroup group kdyzm_consumer_group kdyzm_consumer_2 block 0 streams kdyzm_stream >
可以看到由于kdyzm_consume_1已经消费过第一条数据了,所以kdyzm_consumer_2就没有再重复消费,而是进行了一个阻塞行为。
接下来开三个窗口,两个消费者窗口,一个发送消息的窗口,演示Stream负载均衡发送和接受消息的场景。
第一个窗口运行命令:
xreadgroup group kdyzm_consumer_group kdyzm_consumer_1 block 0 streams kdyzm_stream >
使用kdyzm_consumer_group下的kdyzm_consumer_1消费者阻塞式监听kdyzm_stream
第二个窗口运行相似的命令,只是使用消费者kdyzm_consumer_2:
xreadgroup group kdyzm_consumer_group kdyzm_consumer_2 block 0 streams kdyzm_stream >
运行完两个命令,两个命令都会阻塞式监听,等待接收消息了。
第三个窗口发送命令发送消息数据
xadd kdyzm_stream * name zhangsan
然后查看消费者的行为:

可以看到Redis负载均衡将四次消息发送分别发送了给两个消费者各两次。
通过xpending命令能更直观的看到各个消费者未ack的情况:
127.0.0.1:6379> xpending kdyzm_stream kdyzm_consumer_group
4
1751521545678-0
1751521563585-0
kdyzm_consumer_1
2
kdyzm_consumer_2
2
127.0.0.1:6379> xpending kdyzm_stream kdyzm_consumer_group - + 100
1751521545678-0
kdyzm_consumer_1
677247
1
1751521551737-0
kdyzm_consumer_2
671188
1
1751521558157-0
kdyzm_consumer_1
664768
1
1751521563585-0
kdyzm_consumer_2
659340
1
接下来将这四个消息都ack掉:
127.0.0.1:6379> xack kdyzm_stream kdyzm_consumer_group 1751521545678-0 1751521551737-0 1751521558157-0 1751521563585-0
4
整个过程基本上就结束了。
END.
注意:本文归作者所有,未经作者允许,不得转载