Redisson分布式限流器
- 一、使用
- 1.1、方法
- 1.2、示例
- 二、原理
- 2.1、设置限流器
- 2.2、获取令牌
- 三、总结
最近有需求在做分布式限流,调研的限流框架大概有:
1、spring cloud gateway集成redis限流,但属于网关层限流
2、阿里Sentinel,功能强大、带监控平台
3、srping cloud hystrix,属于接口层限流,提供线程池与信号量两种方式
4、其他:redission、手撸代码
一、使用
1.1、方法
使用很简单、主要代码如下:
// 1、 声明一个限流器
RRateLimiter rateLimiter = redissonClient.getRateLimiter(key);
// 2、 设置速率,5秒中产生3个令牌
rateLimiter.trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);
// 3、 试图获取一个令牌,获取到返回true
rateLimiter.tryAcquire(1)
- 创建限流器
/*** Returns rate limiter instance by name* * @param name of rate limiter* @return RateLimiter object*/
RRateLimiter getRateLimiter(String name);
- 设置限流参数
/*** Initializes RateLimiter's state and stores config to Redis server.* * @param mode - rate mode* @param rate - rate* @param rateInterval - rate time interval* @param rateIntervalUnit - rate time interval unit* @return true if rate was set and false otherwise*/
boolean trySetRate(RateType mode, long rate, long rateInterval, RateIntervalUnit rateIntervalUnit);
trySetRate
用于设置限流参数。其中 RateType 包含 OVERALL
和 PER_CLIENT
两个枚举常量,分别表示全局限流和单机限流。后面三个参数表明了令牌的生成速率,即每 rateInterval
生成 rate
个令牌,rateIntervalUnit
为 rateInterval
的时间单位。
- 获取令牌
/*** Acquires a specified permits from this RateLimiter, * blocking until one is available.** Acquires the given number of permits, if they are available * and returns immediately, reducing the number of available permits * by the given amount.* * @param permits the number of permits to acquire*/
void acquire(long permits);/*** Acquires the given number of permits only if all are available* within the given waiting time.** Acquires the given number of permits, if all are available and returns immediately,* with the value true, reducing the number of available permits by one.** If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* the specified waiting time elapses.** If a permits is acquired then the value true is returned.** If the specified waiting time elapses then the value false* is returned. If the time is less than or equal to zero, the method* will not wait at all.** @param permits amount* @param timeout the maximum time to wait for a permit* @param unit the time unit of the timeout argument* @return true if a permit was acquired and false* if the waiting time elapsed before a permit was acquired*/
boolean tryAcquire(long permits, long timeout, TimeUnit unit);
acquire
和 tryAcquire
均可用于获取指定数量的令牌,不过 acquire
会阻塞等待,而 tryAcquire
会等待 timeout
时间,如果仍然没有获得指定数量的令牌直接返回 false
。
1.2、示例
<!-- 添加Redisson依赖 -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.XX.X</version> <!-- 替换为你使用的Redisson版本 -->
</dependency>
import org.redisson.Redisson;
import org.redisson.api.RRateLimiter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;public class RedissonRateLimiterExample {public static void main(String[] args) {// 1. 配置RedissonClientConfig config = new Config();config.useSingleServer().setAddress("redis://127.0.0.1:6379");RedissonClient redisson = Redisson.create(config);// 2. 获取RRateLimiter对象RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter");// 3. 设置限流策略,例如每秒钟不超过10个请求rateLimiter.trySetRate(RateType.OVERALL, 10, 1, RateIntervalUnit.SECONDS);// 4. 尝试获取许可证,超时时间为500毫秒if (rateLimiter.tryAcquire(500, TimeUnit.MILLISECONDS)) {// 如果获取到许可证,执行业务逻辑System.out.println("许可证获取成功,执行业务逻辑");} else {// 如果未获取到许可证,执行拒绝策略System.out.println("许可证获取失败,业务逻辑拒绝执行");}// 5. 关闭Redisson客户端redisson.shutdown();}
}
在这个例子中,我们配置了一个名为myRateLimiter
的限流器,允许每秒钟10个请求的通过率。通过tryAcquire
方法尝试获取许可证,如果在指定的时间内获取到许可证,则执行业务逻辑,否则执行拒绝策略。最后,不要忘了关闭Redisson
客户端以释放资源。
二、原理
Redisson 的 RRateLimiter
基于令牌桶实现,令牌桶的主要特点如下:
- 令牌以固定速率生成。
- 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行。
- 如果桶空了,那么尝试取令牌的请求会被直接丢弃。
2.1、设置限流器
trySetRate()
方法,通过下面 Lua 脚本设置限流器的相关参数,底层源码如下:
@Overridepublic RFuture<Boolean> trySetRateAsync(RateType type, long rate, long rateInterval, RateIntervalUnit unit) {return commandExecutor.evalWriteNoRetryAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"redis.call('hsetnx', KEYS[1], 'rate', ARGV[1]);"+ "redis.call('hsetnx', KEYS[1], 'interval', ARGV[2]);"+ "return redis.call('hsetnx', KEYS[1], 'type', ARGV[3]);",Collections.singletonList(getRawName()), rate, unit.toMillis(rateInterval), type.ordinal());}
举个例子,更容易理解:
比如下面这段代码,5秒中产生3个令牌,并且所有实例共享(RateType.OVERALL所有实例共享、RateType.CLIENT单实例端共享)
trySetRate(RateType.OVERALL, 3, 5, RateIntervalUnit.SECONDS);
那么redis中就会设置3个参数:
hsetnx,key,rate,3hsetnx,key,interval,5hsetnx,key,type,0
2.2、获取令牌
tryAcquire(1)
方法,获取令牌则是通过以下的 Lua 脚本实现的,底层源码如下:
-- 速率
local rate = redis.call("hget", KEYS[1], "rate")
-- 时间区间(ms)
local interval = redis.call("hget", KEYS[1], "interval")
local type = redis.call("hget", KEYS[1], "type")
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")-- {name}:value 分析后面的代码,这个key记录的是当前令牌桶中的令牌数
local valueName = KEYS[2]-- {name}:permits 这个key是一个zset,记录了请求的令牌数,score则为请求的时间戳
local permitsName = KEYS[4]-- 单机限流才会用到,集群模式不用关注
if type == "1" thenvalueName = KEYS[3]permitsName = KEYS[5]
end-- 原版本有bug(https://github.com/redisson/redisson/issues/3197),最新版将这行代码提前了
-- rate为1 arg1这里是 请求的令牌数量(默认是1)。rate必须比请求的令牌数大
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")-- 第一次执行这里应该是null,会进到else分支
-- 第二次执行到这里由于else分支中已经放了valueName的值进去,所以第二次会进if分支
local currentValue = redis.call("get", valueName)
if currentValue ~= false then-- 从第一次设的zset中取数据,范围是0 ~ (第二次请求时间戳 - 令牌生产的时间)-- 可以看到,如果第二次请求时间距离第一次请求时间很短(小于令牌产生的时间),那么这个差值将小于上一次请求的时间,取出来的将会是空列表。反之,能取出之前的请求信息-- 这里作者将这个取出来的数据命名为expiredValues,可认为指的是过期的数据local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)local released = 0-- lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌for i, v in ipairs(expiredValues) dolocal random, permits = struct.unpack("fI", v)released = released + permitsend-- 没有过期请求的话,released还是0,这个if不会进,有过期请求才会进if released > 0 then-- 移除zset中所有元素,重置周期redis.call("zrem", permitsName, unpack(expiredValues))currentValue = tonumber(currentValue) + releasedredis.call("set", valueName, currentValue)end-- 这里简单分析下上面这段代码:-- 1. 只有超过了1个令牌生产周期后的请求,expiredValues才会有值。-- 2. 以rate为3举例,如果之前发生了两个请求那么现在released为2,currentValue为1 + 2 = 3-- 以此可以看到,redisson的令牌桶放令牌操作是通过请求时间窗来做的,如果距离上一个请求的时间已经超过了一个令牌生产周期时间,那么令牌桶中的令牌应该得到重置,表示生产rate数量的令牌。-- 如果当前令牌数 < 请求的令牌数if tonumber(currentValue) < tonumber(ARGV[1]) then-- 从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1); local random, permits = struct.unpack("fI", nearest[1])-- 返回 上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) 这个值表示还需要多久才能生产出足够的令牌return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)else-- 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,更新zsetredis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))-- valueName存的是当前总令牌数,-1表示取走一个redis.call("decrby", valueName, ARGV[1])return nilend
else-- set一个key-value数据 记录当前限流器的令牌数redis.call("set", valueName, rate)-- 建了一个以当前限流器名称相关的zset,并存入 以score为当前时间戳,以lua格式化字符串{当前时间戳为种子的随机数、请求的令牌数}为value的值。-- struct.pack第一个参数表示格式字符串,f是浮点数、I是长整数。所以这个格式字符串表示的是把一个浮点数和长整数拼起来的结构体。我的理解就是往zset里记录了最后一次请求的时间戳和请求的令牌数redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))-- 从总共的令牌数 减去 请求的令牌数。redis.call("decrby", valueName, ARGV[1])return nil
end
总结一下,redisson用了zset
来记录请求的信息,这样可以非常巧妙的通过比较score,也就是请求的时间戳,来判断当前请求距离上一个请求有没有超过一个令牌生产周期。如果超过了,则说明令牌桶中的令牌需要生产,之前用掉了多少个就生产多少个,而之前用掉了多少个令牌的信息也在zset中保存了。
然后比较当前令牌桶中令牌的数量,如果足够多就返回了,如果不够多则返回到下一个令牌生产还需要多少时间。这个返回值特别重要。
三、总结
redission
分布式限流采用令牌桶思想和固定时间窗口,trySetRate
方法设置桶的大小,利用redis key
过期机制达到时间窗口目的,控制固定时间窗口内允许通过的请求量。
随着互联网的快速发展,高并发场景下的性能问题愈发凸显。特别是在处理大量数据的定时任务中,如何防止多机并发对下游服务造成过大压力成为了一个亟待解决的问题。传统的单机限流方法虽然能够在一定程度上控制请求量,但在分布式环境下往往无法达到预期效果。这时,Redisson
分布式限流的出现为我们提供了一种新的解决方案。
Redisson
是一个在Redis
能力上构建的开发库,它不仅支持Redis
的基础操作,还封装了布隆过滤器、分布式锁、限流器等工具。其中,RRateLimiter
是Redisson
实现分布式限流的核心组件。它基于Redis
的分布式特性,能够在多台机器之间协同工作,实现对请求量的精确控制。
那么,Redisson分布式限流的实现原理是什么呢?
首先,我们需要了解Redisson
分布式限流的核心思想:通过Redis
的原子操作来保证多个节点之间的限流策略一致。RRateLimiter
内部使用了Redis
的Lua
脚本功能,确保了在执行限流操作时的原子性。
其次,RRateLimiter
采用了滑动窗口算法来计算请求速率。在滑动窗口内,RRateLimiter
会记录通过的请求数量,并根据窗口大小和时间间隔来判断是否超出限流阈值。当请求速率超过阈值时,后续请求将被拒绝,从而实现限流效果。
此外,Redisson
还提供了丰富的限流策略,如固定窗口限流、漏桶算法限流等。这些策略可以根据具体场景进行灵活选择,以满足不同业务需求。
在实际应用中我们可以通过以下步骤来使用Redisson
实现分布式限流:
- 引入
Redisson
依赖:首先,我们需要在项目中引入Redisson
的依赖,以便能够使用其提供的分布式限流功能。 - 创建
Redisson
客户端:然后,我们需要创建一个Redisson
客户端实例,用于与Redis
服务器进行通信。 - 配置
RRateLimiter
:接下来,我们需要配置RRateLimiter
的参数,包括限流阈值、时间窗口大小等。这些参数将决定限流策略的具体实施。 - 使用
RRateLimiter
进行限流:最后,在业务代码中,我们可以使用RRateLimiter
实例来进行限流操作。当接收到请求时,先通过RRateLimiter
判断是否超出限流阈值,若超出则拒绝请求,否则继续处理。
通过以上步骤,我们可以轻松地在分布式环境下实现高效的限流功能。Redisson
分布式限流不仅解决了多机并发对下游服务造成过大压力的问题,还提供了灵活的限流策略和丰富的功能支持。在实际应用中,我们可以根据具体需求选择合适的限流策略,并结合Redisson
的其他功能来提升系统的性能和稳定性。
总之,Redisson
分布式限流是一种高效、灵活且易于实现的解决方案,对于处理高并发场景下的性能瓶颈问题具有重要意义。通过深入了解其实现原理和应用方法,我们可以更好地利用Redisson
库来优化系统性能,提升用户体验。
参考地址:
https://www.cnblogs.com/guoziyi/p/18266627
https://github.com/oneone1995/blog/issues/13
https://blog.csdn.net/qq_43686863/article/details/135634098