Redis(五):Redis Stream

Published on 2025-07-03 14:00 in 分类: 博客 with 狂盗一枝梅
分类: 博客

在之前的文章《Redis(二):Redis消息队列》中已经介绍过Redis中使用List以及发布订阅两种模式实现的消息队列,其中发布订阅模式比List模式功能更强大,但是有很大的缺陷:

  1. 消息没有持久化,如果消费者断线重连,消息会丢失
  2. 没有ack机制,无法保证消息被成功消费。
  3. Redis服务重启后消息会丢失。

Redis Stream是在5.0版本引入的新消息队列,它能持久化消息并且带ACK机制,甚至还有类似Kafka一样的ConsumerGroup(消费者组)协同消费消息,可以说完全弥补了发布订阅模式的所有缺点。

这篇文章单独把Redis Stream拿出来说,是因为我认为到目前为止,Redis Stream是Redis最复杂的类型了。Stream类型不像BitMap或者GEO操作底层存储还是五种基本数据类型之一,Stream类型的key使用type命令得到的类型就是"stream",它是一个独立的类型。

以下内容均基于Redis 6.2.1。

一、Stream基本命令

1、xadd

xadd命令用于向Stream中添加新消息,其完整命令如下所示:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value ...]

该命令比较复杂,下面一部分一部分来看:

key: 指定要添加消息的 Stream 名称

NOMKSTREAM: 可选参数,如果指定,当 key 不存在时不会自动创建新的 Stream

长度限制参数:

  • MAXLEN [=|~] threshold:限制 Stream 的最大长度,超过时会删除旧消息
    • = 表示精确限制(性能较低)
    • ~ 表示近似限制(性能更高,默认)
  • MINID [=|~] threshold:只保留 ID 大于等于指定值的消息(Redis 6.2+)

LIMIT count:与 MAXLEN/MINID 配合使用,限制每次修剪的消息数量

消息ID

  • *:让 Redis 自动生成 ID(格式为<毫秒时间戳>-<序列号>,如1717020000000-5)
  • 手动指定 ID:必须大于 Stream 中已有最大 ID,否则会报错

field value [field value ...]:键值对形式的消息内容,可以包含多个字段

举例:向Stream新增一条数据:

127.0.0.1:6379> xadd stream * name zhangsan age 15 sex man
1751425687108-0  #返回消息id
127.0.0.1:6379> type stream
stream
127.0.0.1:6379> 

注意,这里添加了多个field value,但是在Stream中还是一个元素。

2、xrange

xrange用于按照消息 ID 的范围查询 Stream 中的消息。其具体命令格式如下:

xrange key start end [COUNT count]

key:要查询的 Stream 名称(必填)

start:起始消息 ID(必填)

  • 特殊值 - 表示最小 ID(即 Stream 中的第一条消息)
  • 可以指定具体的消息 ID(如 1526919030474-55

end:结束消息 ID(必填)

  • 特殊值 + 表示最大 ID(即 Stream 中的最新消息)
  • 可以指定具体的消息 ID

[COUNT count]:可选参数,限制返回的消息数量

  • 如果不指定,默认返回范围内的所有消息
  • 指定后只返回最多 count 条消息

举例:查询stream中的所有消息

127.0.0.1:6379> xrange stream - +
1751425687108-0   #消息id,之后是消息体
name
zhangsan
age
15
sex
man
127.0.0.1:6379> 

3、xrevrange

xrevrange命令用于按照消息 ID 的反向顺序(从大到小)查询 Stream 中的消息。它与 xrange命令功能相似但遍历方向相反,特别适合获取最新的消息。其命令格式如下:

xrevrange key end start [COUNT count]

key:要查询的 Stream 名称(必填)

end:结束消息 ID(必填)

  • 特殊值 + 表示最大 ID(即 Stream 中的最新消息)
  • 可以指定具体的消息 ID(如 1526919030474-55

start:起始消息 ID(必填)

  • 特殊值 - 表示最小 ID(即 Stream 中的第一条消息)
  • 可以指定具体的消息 ID

[COUNT count]:可选参数,限制返回的消息数量

  • 如果不指定,默认返回范围内的所有消息
  • 指定后只返回最多 count 条消息

举例:查询stream中的数据

127.0.0.1:6379> xrevrange stream + -
1751425687108-0
name
zhangsan
age
15
sex
man
127.0.0.1:6379> 

4、xlen

xlen命令用于查询stream的长度,命令格式如下:

xlen key

xlen命令比较简单,举例:

127.0.0.1:6379> xlen stream
1
127.0.0.1:6379>

5、xread

xread是 Redis Stream 数据结构中用于读取消息的核心命令,支持阻塞和非阻塞模式,能够从一个或多个 Stream 中获取消息。其命令格式如下所示:

xread [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

[COUNT count]:可选参数,限制从每个 Stream 中返回的消息数量

[BLOCK milliseconds]:可选参数,设置阻塞模式及超时时间(毫秒)

  • 设置为0表示无限期阻塞
  • 不设置此参数则为非阻塞模式

STREAMS key [key ...]:指定要读取的一个或多个 Stream 名称

ID [ID ...]:指定每个 Stream 的起始消息ID

  • $ 表示只读取最新消息(阻塞模式下常用)
  • 00-0 表示从第一条消息开始读取
  • 指定具体ID则读取该ID之后的消息

举例:非阻塞监听

127.0.0.1:6379> xread streams stream 0-0  #从第一条消息开始读取
stream
1751425687108-0 #第一条消息id
name
zhangsan
age
15
sex
man
1751436694812-0 #第二条消息id
zhangsan
age
127.0.0.1:6379> 

举例:阻塞监听

阻塞监听有个问题,它和pub/sub模式不一样的是,pub/sub模式下的subscribe命令会持续监听,而xread命令监听到一条消息之后命令就结束了。

redis的xread命令阻塞式案例

在实际使用中,为了能持续监听消费消息,而且不漏掉消息,需要记住消费的消息id,伪代码如下所示:

String lastId = "0-0"; // 初始从第一条消息开始
while(true) {
    // 从上次消费的位置继续读取
    Object msg = redis.execute("XREAD COUNT 10 BLOCK 5000 STREAMS users " + lastId);
    if (msg == null) continue;
    // 更新最后消费的ID
    lastId = getLastIdFrom(msg); 
    handleMessage(msg);
}

6、xtrim

xtrim是 Redis Stream 数据结构中用于修剪(trim)流长度的命令,它可以限制 Stream 中保存的消息数量或基于消息ID进行裁剪。Stream的使用过程中,其消息长度会越来越大,应当定期使用xtrim命令对其进行修剪。

xtrim命令的完整格式如下所示:

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]

key:Stream 的名称(必填)

策略选择

  • MAXLEN:基于消息数量进行修剪
  • MINID:基于消息ID进行修剪(Redis 6.2+)

[=|~]:精确或近似修剪模式

  • =:精确修剪(默认)
  • ~:近似修剪(提高性能)

threshold:阈值

  • 对于 MAXLEN:保留的最大消息数量
  • 对于 MINID:保留的最小消息ID(所有小于此ID的消息将被删除)

[LIMIT count]:每次迭代删除的消息数量限制(Redis 6.2+)

7、xdel

XDEL 是 Redis Stream 数据结构中用于删除指定消息的命令。其语法结构如下所示:

XDEL key ID [ID ...]

XDEL 命令会从指定的 Stream 中删除给定的消息ID,删除的消息会从内存中释放,如果被删除的消息存在于任何消费者组的 Pending Entries List (PEL) 中,它也会从这些PEL中移除,但不会影响消费者组的 last_delivered_id。

二、消费者组

前面介绍了xread命令已经能够从消息队列中读取消息了,但是xread命令有几个很大的缺陷:

  1. 无法记住已经消费的消息。它每次消费必须指定id;若是指定$,则实际上还是和发布订阅模式有一样的问题,若是客户端连接中断则有可能漏掉部分消息。
  2. 无ACK机制。

使用消费者组则能解决以上问题。消费者组消费消息的示意图如下所示:

Redis消费者组协同消费消息示意图

每个消费者组都有若干个消费者,这些消费者协同消费消息。若是开启ACK,每个消费者接收到消息之后会将消息id放入自己维护的pending_ids列表中,表示还未ACK,待ACK后就将处理完的消息从pending_ids列表中移除。另外,每个消费者组还维护了last_deliverred_id表示最后一个投递给消费者的id,下次再消费则从此处开始。

如果某个消费者的pending_ids列表中有消息长时间没有ack,则可以将该消息重新转移给别的消费者让其重试解决。

1、xgroup

xgroup是 Redis Stream 中用于管理消费者组(Consumer Group)的核心命令,它提供了创建、配置和销毁消费者组的功能。其完整命令如下所示:

XGROUP 
[CREATE key groupname ID|$ [MKSTREAM]] 
[SETID key groupname ID|$] 
[DESTROY key groupname] 
[CREATECONSUMER key groupname consumername] 
[DELCONSUMER key groupname consumername]

注意,上述命令中的ID|$实际上是id-or-$

create子命令

create子命令用于创建消费者组,完整命令如下:

XGROUP [CREATE key groupname ID|$ [MKSTREAM]] 

key: Stream 名称(必填)

groupname: 消费者组名称(必填)

id-or-$: 起始消息ID(必填)

  • $ 表示只消费新消息(创建后添加的消息,当前Stream中的消息会全部忽略)
  • 00-0 表示从第一条消息开始消费
  • 指定具体ID则从该ID之后开始消费

[MKSTREAM]: 可选参数,如果Stream不存在则自动创建

127.0.0.1:6379> xgroup create stream consumer_group 0-0 #创建consumer_group消费者组从头开始消费stream的消息
OK
127.0.0.1:6379>

setid子命令

setid子命令用于重置消费者组读取位置,实际上是修改了消费者组的 last_delivered_id,完整命令如下:

XGROUP [SETID key groupname ID|$] 

key: Stream 名称(必填)

groupname: 消费者组名称(必填)

id-or-$: 起始消息ID(必填)

  • $ 表示只消费新消息(创建后添加的消息)
  • 00-0 表示从第一条消息开始消费
  • 指定具体ID则从该ID之后开始消费

destroy子命令

destroy子命令用于删除消费者组,同时组内所有消费者也会被删除。完整命令如下所示:

XGROUP [DESTROY key groupname] 

key: Stream 名称(必填)

groupname: 消费者组名称(必填)

createconsumer子命令

createconsumer子命令用于在现有消费者组中显式创建一个消费者,通常是不需要使用该命令显式创建消费者的,使用XREADGROUP命令会自动创建消费者。createconsumer子命令如下所示:

XGROUP [CREATECONSUMER key groupname consumername] 

key: Stream 名称(必填)

groupname: 消费者组名称(必填)

consumername:消费者名称

delconsumer子命令

delconsumer子命令用于从消费者组中删除指定的消费者。delconsumer子命令如下所示:

XGROUP [DELCONSUMER key groupname consumername]

key: Stream 名称(必填)

groupname: 消费者组名称(必填)

consumername:消费者名称

2、xreadgroup

xreadgroup是 Redis Stream 中用于消费者组(Consumer Group)模式读取消息的核心命令, 也是构建可靠消息系统的关键命令,通过消费者组模式实现了消息分发、负载均衡和消息确认机制,是Redis Stream最强大的特性之一。

该命令会自动过滤已经消费过的消息。

完整命令如下所示:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

GROUP group consumer:指定消费者组名称和消费者名称

  • group:消费者组名称
  • consumer:消费者名称(自动创建不存在的消费者)

[COUNT count]:限制从每个 Stream 中返回的消息数量

[BLOCK milliseconds]:设置阻塞模式及超时时间(毫秒)

  • 设置为0表示无限期阻塞
  • 不设置此参数则为非阻塞模式

[NOACK]:不将消息添加到 PEL (Pending Entries List)

  • 使用此参数后消息会被视为已确认,无需后续 XACK
  • 适用于不需要消息确认的场景

STREAMS key [key ...]:指定要读取的一个或多个 Stream 名称

ID [ID ...]:指定每个 Stream 的起始消息ID

  • >:特殊ID,表示只读取从未分发给该消费者的新消息
  • 00-0:从消费者组的 PEL (Pending Entries List) 中重新读取未确认的消息
  • 具体ID:从指定ID之后的消息开始读取
127.0.0.1:6379> xreadgroup group consumer_group kdyzm_consumer_1 block 0 streams stream >
stream
1751425687108-0
name
zhangsan
age
15
sex
man
1751436694812-0
zhangsan
age
1751438226276-0
name
zhangsan
1751438232352-0
name
zhangsan
1751438371472-0
name
zhangsan
127.0.0.1:6379>

3、xpending

xpending是 Redis Stream 数据结构中用于查询消费者组(Consumer Group)内待处理消息(Pending Entries List, PEL)的关键命令。其完整命令格式如下所示:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

key: Stream 名称(必填)

group: 消费者组名称(必填)

[IDLE min-idle-time]: 筛选空闲时间超过指定毫秒数的消息

start end: 消息ID范围,支持特殊符号:

  • - 表示最小ID
  • + 表示最大ID

count: 限制返回的消息数量

[consumer]: 指定只查看某个消费者的待处理消息

先看一个简单查询:

127.0.0.1:6379> xpending stream consumer_group 
5 #共有五条消息未被确认
1751425687108-0   # 最小消息id
1751438371472-0		# 最大消息id
kdyzm_consumer_1	#具体消费者
5					#kdyzm_consumer_1共有五条消息未确认

简单查询结果是一个简化的统计结果,如果我们想查询所有的未ACK的消息列表,可以使用如下命令:

127.0.0.1:6379> xpending stream consumer_group - + 10
1751425687108-0     #消息id
kdyzm_consumer_1    #消费者名称:当前持有该消息但未确认的消费者
1304133             #空闲时间(毫秒):自该消息最后一次被交付给消费者后经过的时间(约 21.7 分钟)
1                   #投递次数:该消息被读取的次数。若大于 1,说明消息曾被多次重试或转移
1751436694812-0
kdyzm_consumer_1
1304133
1
1751438226276-0
kdyzm_consumer_1
1304133
1
1751438232352-0
kdyzm_consumer_1
1304133
1
1751438371472-0
kdyzm_consumer_1
1304133
1

4、xack

xack是 Redis Stream 数据结构中用于确认消息处理完成的关键命令,它属于消费者组(Consumer Group)功能的一部分,在非消费者组消费消息的场景下无法使用该命令。该命令完整格式如下所示:

xack key group ID [ID ...]

key:Stream 的名称(必填)

group:消费者组名称(必填)

ID [ID ...]:一个或多个需要确认的消息ID(必填)

xack命令返回值是一个整数,表示返回成功确认的消息数量。

xack工作原理: 当消费者通过 XREADGROUP 读取消息后,这些消息会被添加到 Pending Entries List (PEL) 中,表示"已读取但未确认";XACK 命令会从消费者组的 PEL 中移除指定的消息ID,表示这些消息已被成功处理;确认后的消息会从 PEL 中删除,释放相关内存。

127.0.0.1:6379> xpending stream consumer_group - + 10  #查询出五条消息在PEL中
1751425687108-0
kdyzm_consumer_1
1671250
1
1751436694812-0
kdyzm_consumer_1
1671250
1
1751438226276-0
kdyzm_consumer_1
1671250
1
1751438232352-0
kdyzm_consumer_1
1671250
1
1751438371472-0
kdyzm_consumer_1
1671250
1
127.0.0.1:6379> 
127.0.0.1:6379> 
127.0.0.1:6379> xack stream consumer_group 1751425687108-0 1751436694812-0 1751438226276-0 #手动ACK三条
3
127.0.0.1:6379> 
127.0.0.1:6379> 
127.0.0.1:6379> xpending stream consumer_group - + 10 #再查还有两条消息在PEL中
1751438232352-0
kdyzm_consumer_1
1713880
1
1751438371472-0
kdyzm_consumer_1
1713880
1
127.0.0.1:6379> 

5、xclaim

xclaim是 Redis Stream 消费者组功能中的一个重要命令,用于将待处理消息(Pending Entries List, PEL)的所有权从一个消费者转移到另一个消费者。该命令用于某些消息长时间未ACK需要消息重投的场景。命令的完整格式如下:

XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]

key:Stream 的名称(必填)

group:消费者组名称(必填)

consumer:新的消费者名称(必填)

min-idle-time:消息最小空闲时间(毫秒),只有空闲时间超过此值的消息才会被转移

ID [ID ...]:一个或多个需要转移的消息ID(必填)

[IDLE ms]:设置消息的新空闲时间(毫秒),默认使用当前时间减去消息最后交付时间

[TIME ms-unix-time]:与IDLE相同,但使用绝对Unix时间戳(毫秒)

[RETRYCOUNT count]:设置消息的重试计数器,通常用于监控长时间未处理的消息

[force]:强制创建PEL条目,即使指定的ID尚未分配给任何消费者

[justid]:只返回成功转移的消息ID,不返回消息内容

127.0.0.1:6379> xpending stream consumer_group - + 10 #查询出两条在PEL中的消息都被kdyzm_consumer_1持有
1751438232352-0
kdyzm_consumer_1
1885915
1
1751438371472-0
kdyzm_consumer_1
1885915
1
127.0.0.1:6379> xclaim stream consumer_group kdyzm_consumer_2 0 1751438232352-0 #将第一条消息转移给kdyzm_consumer_2
1751438232352-0
name
zhangsan
127.0.0.1:6379> xpending stream consumer_group - + 10 #再次查询
1751438232352-0
kdyzm_consumer_2 #第一条消息被转移到了kdyzm_consuer_2
30169			#空闲时间被重置,从0开始重新计算
2				#投递次数变成了2
1751438371472-0
kdyzm_consumer_1
1984128
1

6、xinfo

xinfo是 Redis Stream 中用于获取流和消费者组相关信息的命令,它提供了多种子命令来查询 Stream 的内部状态。该命令的完整格式如下所示:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

consumers子命令

consumers子命令用于查询指定消费者组中所有消费者的详细信息。

XINFO CONSUMERS <key> <groupname>

key: Stream 名称(必填)

groupname: 消费者组名称(必填)

127.0.0.1:6379> xinfo consumers stream consumer_group

127.0.0.1:6379> 

groups子命令

groups子命令用于查询指定Stream的所有消费者组信息。

XINFO GROUPS <key>

key: Stream 名称(必填)

127.0.0.1:6379> xinfo groups stream
name
consumer_group
consumers
0
pending
0
last-delivered-id
0-0
127.0.0.1:6379> 

stream子命令

stream子命令用于查询指定Stream的信息,包括长度、最后生成id等。

XINFO STREAM <key> [FULL [COUNT <count>]

key: Stream 名称(必填)

127.0.0.1:6379> xinfo stream stream
length
5
radix-tree-keys
1
radix-tree-nodes
2
last-generated-id
1751438371472-0
groups
1
first-entry
1751425687108-0
name
zhangsan
age
15
sex
man
last-entry
1751438371472-0
name
zhangsan
127.0.0.1:6379> 

help子命令

help子命令用于查询xinfo命令的使用帮助信息。其输出如下:

127.0.0.1:6379> xinfo help
XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
CONSUMERS <key> <groupname>
    Show consumers of <groupname>.
GROUPS <key>
    Show the stream consumer groups.
STREAM <key> [FULL [COUNT <count>]
    Show information about the stream.
HELP
    Prints this help.
127.0.0.1:6379> 

7、消费者组综合练习

使用xadd命令创建一个Stream:kdyzm_stream并添加一条消息:

127.0.0.1:6379> xadd kdyzm_stream * name kdyzm
1751469707635-0

查看stream的长度:

127.0.0.1:6379> xlen kdyzm_stream 
1

创建一个消费者组kdyzm_consumer_group监听kdyzm_stream消息

127.0.0.1:6379> xgroup create kdyzm_stream kdyzm_consumer_group 0-0
OK

监听消息

127.0.0.1:6379> xreadgroup group kdyzm_consumer_group kdyzm_consumer_1 block 0 streams kdyzm_stream >
kdyzm_stream
1751469707635-0
name
kdyzm

使用>符号首次监听会接收到第一条消息。

使用xpending命令查看PEL中的数据

127.0.0.1:6379> xpending kdyzm_stream kdyzm_consumer_group - + 10
1751469707635-0
kdyzm_consumer_1
278229
3

使用xack命令确认下

127.0.0.1:6379> xack kdyzm_stream kdyzm_consumer_group 1751469707635-0
1

再次使用xpending命令查看PEL中的数据就已经看不到数据了。

接下来再创建一个消费者,共同监听kdyzm_stream

xreadgroup group kdyzm_consumer_group kdyzm_consumer_2 block 0 streams kdyzm_stream >

可以看到由于kdyzm_consume_1已经消费过第一条数据了,所以kdyzm_consumer_2就没有再重复消费,而是进行了一个阻塞行为。

接下来开三个窗口,两个消费者窗口,一个发送消息的窗口,演示Stream负载均衡发送和接受消息的场景。

第一个窗口运行命令:

xreadgroup group kdyzm_consumer_group kdyzm_consumer_1 block 0 streams kdyzm_stream >

使用kdyzm_consumer_group下的kdyzm_consumer_1消费者阻塞式监听kdyzm_stream

第二个窗口运行相似的命令,只是使用消费者kdyzm_consumer_2:

xreadgroup group kdyzm_consumer_group kdyzm_consumer_2 block 0 streams kdyzm_stream >

运行完两个命令,两个命令都会阻塞式监听,等待接收消息了。

第三个窗口发送命令发送消息数据

xadd kdyzm_stream * name zhangsan

然后查看消费者的行为:

redis Stream负载均衡发送消息

可以看到Redis负载均衡将四次消息发送分别发送了给两个消费者各两次。

通过xpending命令能更直观的看到各个消费者未ack的情况:

127.0.0.1:6379> xpending kdyzm_stream kdyzm_consumer_group
4
1751521545678-0
1751521563585-0
kdyzm_consumer_1
2
kdyzm_consumer_2
2
127.0.0.1:6379> xpending kdyzm_stream kdyzm_consumer_group - + 100
1751521545678-0
kdyzm_consumer_1
677247
1
1751521551737-0
kdyzm_consumer_2
671188
1
1751521558157-0
kdyzm_consumer_1
664768
1
1751521563585-0
kdyzm_consumer_2
659340
1

接下来将这四个消息都ack掉:

127.0.0.1:6379> xack kdyzm_stream kdyzm_consumer_group 1751521545678-0 1751521551737-0 1751521558157-0 1751521563585-0
4

整个过程基本上就结束了。



END.
#redis
复制 复制成功