Redis 的消息队列方案
参考:
# 1. 消息队列的考验:Redis有哪些解决方案?
消息队列是分布式系统的基础软件,要能支持组件通信消息的快速读写。Redis 本身支持数据的高速访问,因此很多人关心一个问题:Redis 适合做 MQ 吗?
这个问题的背后,隐含着两方面的核心问题:
- 消息队列的消息存取需求是什么?
- Redis 如何实现消息队列的需求?
理解了 MQ 的特征和 Redis 提供的 MQ 方案,才能根据实际需求来选择出适合的 Redis 消息队列方案。
# 1.1 MQ 的消息存取需求
先介绍一下消息队列存取消息的过程。在分布式系统中,当两个组件要基于消息队列进行通信时,一个组件会把要处理的数据以消息的形式传递给消息队列,然后,这个组件就可以继续执行其他操作了;远端的另一个组件从消息队列中把消息读取出来,再在本地进行处理。
一个通用的 MQ 的架构模型为:
消息队列中发送消息的组件称为生产者,接收消息的组件称为消费者。
在使用消息队列时,消费者可以异步读取生产者消息,然后再进行处理。这样一来,即使生产者发送消息的速度远远超过了消费者处理消息的速度,生产者已经发送的消息也可以缓存在消息队列中,避免阻塞生产者,这是消息队列作为分布式组件通信的一大优势。
MQ 在存取消息时,必须要满足三个需求:
- 消息保序
- 处理重复的消息(幂等要求)
- 保证消息可靠性
# 需求一:消息保序
虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。
我们来看一个更新商品库存的场景:假设生产者负责接收库存更新请求,消费者负责实际更新库存,现有库存量是10。生产者先后发送了消息1和消息2,消息1要把商品X的库存记录更新为5,消息2是把商品X库存更新为3。如果消息1和2在消息队列中无法保序,出现消息2早于消息1被处理的情况,那么,很显然,库存更新就出错了。这是业务应用无法接受的。
# 需求二:重复消息处理(幂等要求)
消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。
比如可能出现重复扣款的问题。
# 需求三:消息可靠性保证
消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。
Redis 的 List 和 Streams 两种数据类型,就可以满足消息队列的这三个需求。下面来看这两种方案。
# 1.2 基于 List 的 MQ 解决方案
# 1.2.1 解决消息保序问题
List 本身就是按先进先出的顺序对数据进行存取的,所以如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了:
- 生产者使用 LPUSH 命令把消息写入 List
- 消费者使用 RPOP 命令读取消息
不过这样的话 List 不会主动告知消费者有新消息的写入,这样就需要消费者一直调用 RPOP 来监听消息(比如使用一个while(1)循环),这会带来不必要的性能损失。
为了解决这个问题,Redis 提供了 BRPOP 命令:阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。这种方式相比消费者不停调用 RPOP 而言,更加节省 CPU 开销。
# 1.2.2 幂等性要求(处理重复消息)
解决消息重复问题,其实有一个要求:消费者程序本身能对重复消息进行判断:
- 一方面,消息队列要能给每一个消息提供全局唯一的 ID 号;
- 另一方面,消费者程序要把已经处理过的消息的 ID 号记录下来。
幂等性就是指,对于同一条消息,消费者收到一次的处理结果和收到多次的处理结果是一致的。
因为 List 本身不会为消息生成 ID 号,因此需要生产者在发送消息时附带上全局唯一 ID。例如,我们执行以下命令,就把一条全局 ID 为 101030001、库存量为 5 的消息插入了消息队列:
LPUSH mq "101030001:stock:5"
(integer) 1
2
# 1.2.3 消息可靠性保证
当消费者程序从 List 中读取一条消息后,List 就不会再留存这条消息了。所以,如果消费者程序在处理消息的过程出现了故障或宕机,就会导致消息没有处理完成,那么,消费者程序再次启动后,就没法再次从 List 中读取消息了。
为了留存消息,List 类型提供了 BRPOPLPUSH 命令,这个命令的作用是让消费者程序从一个 List 中读取消息,同时,Redis 会把这个消息再插入到另一个 List(可以叫作备份List)留存。这样一来,如果消费者程序读了消息但没能正常处理,等它重启后,就可以从备份 List 中重新读取消息并进行处理了。下图是使用 BRPOPLPUSH 命令留存消息,以及消费者再次读取消息的过程:
好了,到这里你可以看到,基于 List 类型,我们可以满足分布式组件对消息队列的三大需求。但是,在用 List 做消息队列时,我们还可能遇到过一个问题:生产者消息发送很快,而消费者处理消息的速度比较慢,这就导致 List 中的消息越积越多,给 Redis 的内存带来很大压力。这个时候,我们希望启动多个消费者程序组成一个消费组,一起分担处理 List 中的消息。但是,List 类型并不支持消费组的实现。那么,还有没有更合适的解决方案呢?这就要说到 Redis 从 5.0 版本开始提供的 Streams 数据类型了。
和 List 相比,Streams 同样能够满足消息队列的三大需求。而且,它还支持消费组形式的消息读取。接下来,我们就来了解下 Streams 的使用方法。
# 1.3 基于 Streams 的消息队列解决方案
Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令:
- XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
- XREAD:用于读取消息,可以按 ID 读取数据;
- XREADGROUP:按消费组形式读取消息;
- XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。
# 1)XADD 命令
XADD 命令可以往消息队列中插入新消息,消息的格式是键-值对形式。对于插入的每一条消息,Streams 可以自动为其生成一个全局唯一的 ID。
比如说,我们执行下面的命令,就可以往名称为 mqstream 的消息队列中插入一条消息,消息的键是repo,值是5。其中,消息队列名称后面的 *
,表示让Redis为插入的数据自动生成一个全局唯一的ID,例如“1599203861727-0”。当然,我们也可以不用 *
,直接在消息队列名称后自行设定一个ID号,只要保证这个ID号是全局唯一的就行。不过,相比自行设定ID号,使用 *
会更加方便高效。
XADD mqstream * repo 5
"1599203861727-0"
2
可以看到,消息的全局唯一ID由两部分组成,第一部分“1599203861727”是数据插入时,以毫秒为单位计算的当前服务器时间,第二部分表示插入消息在当前毫秒内的消息序号,这是从0开始编号的。例如,“1599203861727-0”就表示在“1599203861727”毫秒内的第1条消息。
# 2)XREAD 命令
当消费者需要读取消息时,可以直接使用XREAD命令从消息队列中读取。XREAD 在读取消息时,可以指定一个消息 ID,并从这个消息 ID 的下一条消息开始进行读取。
例如,我们可以执行下面的命令,从 ID 号为 1599203861727-0 的消息开始,读取后续的所有消息(示例中一共 3 条):
XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
2) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
3) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
2
3
4
5
6
7
8
9
10
11
另外,消费者也可以在调用 XRAED 时设定 block 配置项,实现类似于 BRPOP 的阻塞读取操作。当消息队列中没有消息时,一旦设置了 block 配置项,XREAD 就会阻塞,阻塞的时长可以在 block 配置项进行设置。
举个例子,我们来看一下下面的命令,其中,命令最后的“$”符号表示读取最新的消息,同时,我们设置了block 10000的配置项,10000的单位是毫秒,表明XREAD在读取最新消息时,如果没有消息到来,XREAD将阻塞10000毫秒(即10秒),然后再返回。下面命令中的XREAD执行后,消息队列mqstream中一直没有消息,所以,XREAD在10秒后返回空值(nil):
XREAD block 10000 streams mqstream $
(nil)
(10.00s)
2
3
刚刚讲到的这些操作是 List 也支持的,接下来,我们再来学习下 Streams 特有的功能。
# 3)XGROUP 与 XREADGROUP
Streams 本身可以使用 XGROUP 创建消费组,创建消费组之后,Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息。
例如,我们执行下面的命令,创建一个名为 group1 的消费组,这个消费组消费的消息队列是 mqstream:
XGROUP create mqstream group1 0
OK
2
然后,我们再执行一段命令,让group1消费组里的消费者consumer1从mqstream中读取所有消息,其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。因为在consumer1读取消息前,group1中没有其他消费者读取过消息,所以,consumer1就得到mqstream消息队列中的所有消息了(一共4条):
XREADGROUP group group1 consumer1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
2) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
3) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
4) 1) "1599274927910-0"
2) 1) "repo"
2) "1"
2
3
4
5
6
7
8
9
10
11
12
13
14
需要注意的是,消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了。比如说,我们执行完刚才的XREADGROUP命令后,再执行下面的命令,让group1内的consumer2读取消息时,consumer2读到的就是空值,因为消息已经被consumer1读取完了,如下所示:
XREADGROUP group group1 consumer2 streams mqstream 0
1) 1) "mqstream"
2) (empty list or set)
2
3
使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。例如,我们执行下列命令,让 group2 中的 consumer1、2、3 各自读取一条消息。
XREADGROUP group group2 consumer1 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599203861727-0"
2) 1) "repo"
2) "5"
XREADGROUP group group2 consumer2 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274912765-0"
2) 1) "repo"
2) "3"
XREADGROUP group group2 consumer3 count 1 streams mqstream >
1) 1) "mqstream"
2) 1) 1) "1599274925823-0"
2) 1) "repo"
2) "2"
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 4)XPENDING 和 XACK
为了保证消费者在发生故障或宕机再次重启后,仍然可以读取未处理完的消息,Streams 会自动使用内部队列(也称为 PENDING List)留存消费组里每个消费者读取的消息,直到消费者使用 XACK 命令通知Streams“消息已经处理完成”。如果消费者没有成功处理消息,它就不会给 Streams 发送 XACK 命令,消息仍然会留存。此时,消费者可以在重启后,用 XPENDING 命令查看已读取、但尚未确认处理完成的消息。
例如,我们来查看一下group2中各个消费者已读取、但尚未确认的消息个数。其中,XPENDING返回结果的第二、三行分别表示group2中所有消费者读取的消息最小ID和最大ID:
XPENDING mqstream group2
1) (integer) 3
2) "1599203861727-0"
3) "1599274925823-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"
3) 1) "consumer3"
2) "1"
2
3
4
5
6
7
8
9
10
如果我们还需要进一步查看某个消费者具体读取了哪些数据,可以执行下面的命令:
XPENDING mqstream group2 - + 10 consumer2
1) 1) "1599274912765-0"
2) "consumer2"
3) (integer) 513336
4) (integer) 1
2
3
4
5
可以看到,consumer2 已读取的消息的 ID 是 1599274912765-0。
一旦消息 1599274912765-0 被 consumer2 处理了,consumer2 就可以使用 XACK 命令通知 Streams,然后这条消息就会被删除。当我们再使用 XPENDING 命令查看时,就可以看到,consumer2 已经没有已读取、但尚未确认处理的消息了。
XACK mqstream group2 1599274912765-0
(integer) 1
XPENDING mqstream group2 - + 10 consumer2
(empty list or set)
2
3
4
现在,我们就知道了用 Streams 实现消息队列的方法,我还想再强调下,Streams 是 Redis 5.0 专门针对消息队列场景设计的数据类型,如果你的 Redis 是5.0及5.0以后的版本,就可以考虑把 Streams 用作消息队列了。
# 1.4 小结
我们学习了分布式系统组件使用消息队列时的三大需求:消息保序、重复消息处理和消息可靠性保证,这三大需求可以进一步转换为对消息队列的三大要求:消息数据有序存取,消息数据具有全局唯一编号,以及消息数据在消费完成后被删除。
下图汇总了 List 和 Streams 实现消息队列的特点和区别:
其实,关于Redis是否适合做消息队列,业界一直是有争论的。很多人认为,要使用消息队列,就应该采用 Kafka、RabbitMQ 这些专门面向消息队列场景的软件,而 Redis 更加适合做缓存。
根据这些年做Redis研发工作的经验,我的看法是:Redis是一个非常轻量级的键值数据库,部署一个Redis实例就是启动一个进程,部署Redis集群,也就是部署多个Redis实例。而Kafka、RabbitMQ部署时,涉及额外的组件,例如Kafka的运行就需要再部署ZooKeeper。相比Redis来说,Kafka和RabbitMQ一般被认为是重量级的消息队列。
所以,关于是否用Redis做消息队列的问题,不能一概而论,我们需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。如果分布式系统中的组件消息通信量不大,那么,Redis只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。
本节问题:如果一个生产者发送给消息队列的消息,需要被多个消费者进行读取和处理(例如,一个消息是一条从业务系统采集的数据,既要被消费者1读取并进行实时计算,也要被消费者2读取并留存到分布式文件系统HDFS中,以便后续进行历史查询),你会使用Redis的什么数据类型来解决这个问题呢?
可以使用Streams数据类型的消费组,同时消费生产者的数据,这是可以的。但是,需要注意,如果只是使用一个消费组的话,消费组内的多个消费者在消费消息时是互斥的,换句话说,在一个消费组内,一个消息只能被一个消费者消费。我们希望消息既要被消费者1读取,也要被消费者2读取,是一个多消费者的需求。所以,如果使用消费组模式,需要让消费者1和消费者2属于不同的消费组,这样它们就能同时消费了。
另外,Redis基于字典和链表数据结构,实现了发布和订阅功能,这个功能可以实现一个消息被多个消费者消费使用,可以满足问题中的场景需求。