List
使用List这个数据类型,底层就是一个列表,头部/尾部操作元素的时间复杂度都是O(1)
,
生产者使用LPUSH发布消息,消费者使用RPOP获取消息
当然,使用RPUSH和LPOP也是可以的,只要保证生产者和消费者不会同时操作一个方向的元素即可
这里的问题是:如果队列中已经没有消息了,消费者在执行RPOP操作的时候,会返回NULL
而一般消费者的逻辑是不断地从队列中获取消息进行处理,如果此时队列为空,消费者依旧会频繁获取消息,造成CPU空转,不仅仅浪费CPU资源,还给Redis造成压力
for {result, err := redisCli.RPop(context.Background(), key).Result()if err == redis.Nil {continue}//处理业务逻辑}
解决方案:
- 当队列为空的时候,休眠一会,再尝试拉取消息
for {result, err := redisCli.RPop(context.Background(), key).Result()if err == redis.Nil {time.Sleep(time.Minute)continue}//处理业务逻辑}
优点是:简单的解决了CPU空转问题,提高了CPU的使用效率
缺点是:如果休眠的时候有消息需要处理,该消息必须要等待,会造成消息的延迟处理。对于实时性要求不强的场景,可以和业务方沟通获取到消息延迟和CPU空转的平衡点
2. 能否在阻塞等待的同时,还能立刻接收到新消息呢?可以使用Redis的BRPOP命令
for {result, err := redisCli.BRPop(context.Background(), time.Minute, key).Result()//处理业务逻辑}
BRPOP表示阻塞式拉取新消息,可以传入一个超时时间,如果设置为0,表示不设置超时,直到有新消息才返回;否则,超时时间后返回NULL
还有一个要注意的点就是:如果设置的超时时间太长,这个链接太久没有活跃,会被Redis服务器判定为无效链接,强制把这个客户端踢下线。所以,客户端需要准备重连机制
总结一下,这种队列模型的缺点如下:
-
不支持重复消费:当某个消费者获取到消息以后,这条消息就被删除了,无法被其他消费者消费
-
消息丢失:消费者获取到消息以后,如果宕机或是执行业务逻辑出错,这条消息就丢失了
发布/订阅模型:Pub/Sub
可以解决刚刚的第一个问题:不支持重复消费
发布/订阅模型是多组生产者/消费者的模式,使用Publish/Subscribe
命令
假如想开启多个消费者,同时消费同一批消息,可以使用该模型。
具体如下:
-
使用
Subscribe
命令,启动多个消费者,订阅同一个队列 -
此时,这些消费者都会被阻塞住,等待新消息的到来
-
启动一个生产者,发布一条消息
-
这些消费者会解除阻塞,收到生产者的新消息
而且,消费者还可以根据一定的规则去订阅生产者,即支持通配符匹配 ,对应命令PSubscribe
该模型最大的缺点就是会丢数据,如果发生消费者下线/Redis宕机/消息堆积,都会导致数据丢失。
这和实现方式有关系,Pub/Sub在实现的时候很简单,没有基于任何的数据类型,也没有做任何的数据存储,它只是为生产者和消费者建立了数据转发通道。当消费者订阅了指定的队列以后,Redis会记录一个从队列到消费者的映射关系,当生产者往这个队列里发布消息的时候,Redis就从映射关系里找到对应的消费者把消息转发过去。
-
如果消费者下线,在这期间生产者发布的消息,因为找不到消费者而被丢弃掉,消费者上线以后也不会重新发送
-
因为Pub/Sub没有对数据进行存储,也就不具备数据持久化的能力,不会被写入到AOF或RDB里,当Redis宕机以后,也就无法恢复数据
-
每个消费者订阅了一个队列以后,都会被分配一个缓冲区,生产者会把数据写入到该缓冲区上,消费者不断从缓冲区里读取消息。这里的缓冲区其实也就类似一块内存,内存的大小是有限的,如果消费者读取消息的速度过慢导致消息有积压,内存持续增长,如果超过了缓冲区配置的上限,Redis就会强制把这个消费者下线。具体的配置信息为
client-output-buffer-limit pubsub 32mb 8mb 60
,含义为 缓冲区一旦超过32MB,或是 超过8MB且持续60秒,直接强制把消费者下线
对比List而言,Pub/Sub相当于推模型,而List相当于拉模型
哨兵集群和Redis实例通信的时候,采用了Pub/Sub的方案,也就是即时通讯的业务场景
Stream
通过刚刚的两种模型可以总结出理想中的消息队列的特点
-
支持阻塞等待拉取消息
-
支持发布/订阅模式
-
可以重新消费
-
数据可持久化
-
消息可以堆积
Redis作者开发了Stream这个新的类型来支持以上功能
Stream通过XADD/XREAD
完成最简单的生产者-消费者模型,其中XADD
表示发布消息,XREAD
表示读取消息
生产者发布1条消息,其中 * 表示自动生成唯一的消息ID,格式是时间戳-自增序号
127.0.0.1:6379> XADD queue * name zs
"1729244432000-0"
消费者拉取消息,从开头读取5条消息,0-0表示从开头读取
XREAD COUNT 5 STREAMS queue 0-0
如果想要继续拉取消息,需要传入上一条消息的ID:
XREAD COUNT 5 STREAMS queue 1729244432000-0
没有消息的话,Redis会返回NULL
阻塞式
Stream也同样支持阻塞式拉取消息,增加BLOCK参数即可,这里的0表示超时时间,和BRPOP的参数同样含义
XREAD COUNT 5 BLOCK 0 STREAMS queue 1729244432000-0
发布订阅
Stream通过XGROUP/XREADGROUP
完成发布/订阅模式,其中XGROUP
表示创建消费者组,XREADGROUP
表示在指定消费组下,开启消费者拉取消息
重新消费
当一组消费者处理完消息后,需要用XACK
命令告知Redis把这条消息标记为已完成
如果消费者宕机,则不会发送XACK
,Redis依旧会保留这条消息;当消费者重新上线后,Redis会把之前没有处理成功的数据,重新发给这个消费者;这样即使消费者异常,也不会丢失数据了。
持久化
和其他数据类型一样,每个写操作,都会写入到RDB和AOF里
消息堆积
消息队列对消息堆积的解决方案一般有2个:
-
生产者限流
-
中间件丢弃旧消息,只保留最新消息
Stream采用的第二种,发布消息的时候可以使用MAXLEN
参数指定队列的最大长度
XADD queue MAXLEN 10000 * name zs
专业的消息队列的优点
一个专业的消息队列,必须要具备消息不丢和消息可堆积这两个模块
从消息队列来分析下怎么做才能做到消息不丢,消息队列无非三部分:生产者、中间件和消费者
-
生产者:发布消息的时候可能会发生的异常情况有
-
消息没发出去,因为网络故障或其他原因,中间件会直接返回失败
-
不确定是否发送成功,读取响应结果超时了,为了避免丢失,生产者也会继续重试
所以在消费者这里,必须要设计幂等逻辑,保证业务的正确性
-
-
消费者:消费者拿到消息后,还没处理完成,就异常宕机了。解决方法就是消费者处理完消息后通知中间件,也就是Kafka的
ACK
或Stream的XACK
-
中间件:Redis其实还是有可能丢消息
-
AOF持久化配置写盘的时机是每秒,但是写盘是异步的过程,Redis宕机时还是会数据丢失
-
主从复制也是异步的,主从切换的时候,如果从库还未完成主库发来的数据就被提成主库,那也会丢数据
专业的消息队列中间件通过部署集群来解决这个问题
如果Redis的数据都存储在内存里,一旦发生积压,就会导致Redis内存持续增长,超过机器上线后还会有OOM的风险,而Kafka等消息队列的数据都存储在磁盘上
-
总结一下:
如果业务场景简单,能够接受消息丢失,而且消息积压概率更小的情况下,可以采用Redis;否则,最好是用专业的消息队列