本文目录
- 一、限流
- 二、漏桶
- 三、令牌桶算法
- 四、Gin框架中实现令牌桶限流
一、限流
限流又称为流量控制,也就是流控,通常是指限制到达系统的并发请求数。
限流虽然会影响部分用户的使用体验,但是能一定程度上保证系统的稳定性,不至于崩溃。
常见的各种厂商的公开API服务通常也会限制用户的请求次数,比如百度地图的API来限制请求数等。
二、漏桶
漏桶是一种比较常见的限流策略。
一句话来概括漏洞的核心就是:数据以任意速率进入漏桶,但是漏桶以固定速率(通常是预先设定的速率)将数据输出。
有点像削峰填谷,但是它缺点也很明显,就是不能很好处理有大量突发请求的场景。
import ("fmt""time""go.uber.org/ratelimit"
)func main() {rl := ratelimit.New(100) // per secondprev := time.Now()for i := 0; i < 10; i++ {now := rl.Take()fmt.Println(i, now.Sub(prev))prev = now}// Output:// 0 0// 1 10ms// 2 10ms// 3 10ms// 4 10ms// 5 10ms// 6 10ms// 7 10ms// 8 10ms// 9 10ms
}
上面你的代码是uber
团队开源的漏桶库ratelimit
,这个库比较简单。
ratelimit.New(100)
创建了一个速率限制器(ratelimit.Limiter),限制频率为每秒 100 次。这里的 100 表示每秒最多可以获取 100 次令牌(tokens),即每秒最多可以执行 100 次操作。
prev
是一个 time.Time
类型的变量,用于记录上一次操作的时间。初始值为当前时间。
rl.Take()
这是 ratelimit.Limiter
的一个方法,用于获取一个令牌。如果当前时间距离上一次获取令牌的时间小于 1/100 秒(因为每秒 100 次),Take() 方法会阻塞,直到下一个令牌可用。now 是获取令牌时的时间点(time.Time 类型)。
now.Sub(prev)
计算当前时间点 now 和上一次时间点 prev 之间的时间差(返回 time.Duration
类型)。这个时间差表示两次操作之间的时间间隔。
所以这段代码的目的是通过 ratelimit.New(100) 创建一个每秒最多执行 100 次操作的速率限制器,并在循环中模拟 10 次操作。每次操作之间的时间间隔会被打印出来,用于观察速率限制器是否按预期工作。
我们来看看这个限流器的实现源码。
限制器是一个接口类型,其要求实现一个Take()方法:
type Limiter interface {// Take方法应该阻塞已确保满足 RPSTake() time.Time
}
实现限制器接口的结构体定义如下,这里可以重点留意下maxSlack
字段,它在后面的Take()方法中的处理。
type limiter struct {sync.Mutex // 锁last time.Time // 上一次的时刻sleepFor time.Duration // 需要等待的时间perRequest time.Duration // 每次的时间间隔maxSlack time.Duration // 最大的富余量clock Clock // 时钟
}
limiter
结构体实现Limiter
接口的Take()
方法内容如下:
// Take 会阻塞确保两次请求之间的时间走完
// Take 调用平均数为 time.Second/rate.
func (t *limiter) Take() time.Time {t.Lock()defer t.Unlock()now := t.clock.Now()// 如果是第一次请求就直接放行if t.last.IsZero() {t.last = nowreturn t.last}// sleepFor 根据 perRequest 和上一次请求的时刻计算应该sleep的时间// 由于每次请求间隔的时间可能会超过perRequest, 所以这个数字可能为负数,并在多个请求之间累加t.sleepFor += t.perRequest - now.Sub(t.last)// 我们不应该让sleepFor负的太多,因为这意味着一个服务在短时间内慢了很多随后会得到更高的RPS。if t.sleepFor < t.maxSlack {t.sleepFor = t.maxSlack}// 如果 sleepFor 是正值那么就 sleepif t.sleepFor > 0 {t.clock.Sleep(t.sleepFor)t.last = now.Add(t.sleepFor)t.sleepFor = 0} else {t.last = now}return t.last
}
上面的代码根据记录每次请求的间隔时间和上一次请求的时刻来计算当次请求需要阻塞的时间——sleepFor
,这里需要留意的是sleepFor
的值可能为负,在经过间隔时间长的两次访问之后会导致随后大量的请求被放行,所以代码中针对这个场景有专门的优化处理。创建限制器的New()
函数中会为maxSlack
设置初始值,也可以通过WithoutSlack
这个Option
取消这个默认值。
func New(rate int, opts ...Option) Limiter {l := &limiter{perRequest: time.Second / time.Duration(rate),maxSlack: -10 * time.Second / time.Duration(rate),}for _, opt := range opts {opt(l)}if l.clock == nil {l.clock = clock.New()}return l
}
三、令牌桶算法
令牌桶按固定的速率往桶里放入令牌,并且只要能从桶里取出令牌就能通过,令牌桶支持突发流量的快速处理。
如果放令牌的速率很快,那么令牌桶里边有很多令牌了,这个时候当有大量的客户端请求过来了,那么就可以直接取出令牌处理请求。
也就是令牌以固定的速率(例如每秒生成若干个令牌)被添加到桶中,桶有一个最大容量,当桶满时,多余的令牌会被丢弃。
次发送数据(如网络请求或数据包)时,需要从桶中取出一定数量的令牌。如果桶中有足够的令牌,则允许发送数据,并从桶中扣除相应的令牌;如果令牌不足,则请求被拒绝或排队等待。
由于桶中的令牌可以积累,因此在短时间内允许突发流量(bursty traffic),只要平均速率符合限制。
对于从桶里取不到令牌的场景,我们可以选择等待也可以直接拒绝并返回。这个需要根据业务场景来处理,如果业务场景不同,那么相对应的处理也就需要不一样。
可以参照github上的开源库:github.com/juju/ratelimit库。
创建令牌桶的方法如下:
// 创建指定填充速率和容量大小的令牌桶
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket// 创建指定填充速率、容量大小和每次填充的令牌数的令牌桶
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket// 创建填充速度为指定速率和容量大小的令牌桶
// NewBucketWithRate(0.1, 200) 表示每秒填充20个令牌
func NewBucketWithRate(rate float64, capacity int64) *Bucket
取出令牌的方法如下:
// 取token(非阻塞)
func (tb *Bucket) Take(count int64) time.Duration
func (tb *Bucket) TakeAvailable(count int64) int64// 最多等maxWait时间取token
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)// 取token(阻塞)
func (tb *Bucket) Wait(count int64)
func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
(注意这个令牌桶跟JWT算法中的Token并不是一个概念。)
虽说是令牌桶但没有必要真的去生成令牌放到桶里,我们只需要每次来取令牌的时候计算一下,当前是否有足够的令牌就可以了,具体的计算方式可以总结为下面的公式:
当前令牌数 = 上一次剩余的令牌数 + (本次取令牌的时刻-上一次取令牌的时刻)/放置令牌的时间间隔 * 每次放置的令牌数
四、Gin框架中实现令牌桶限流
这里我们可以将限流组件定义成中间件,因为中间件是在所有请求的必经之路上。按照需要限流的策略将中间件添加到不同的地方,如果要对全站进行限流就可以添加成全局中间件。如果某一组路由需要限流,就添加到对应的路由组就行。
func RateLimitMiddleware(fillInterval time.Duration, cap int64) func(c *gin.Context) {bucket := ratelimit.NewBucket(fillInterval, cap)return func(c *gin.Context) {// 如果取不到令牌就中断本次请求返回 rate limit...if bucket.TakeAvailable(1) == 0 {c.String(http.StatusOK, "rate limit...")c.Abort()return}// 取到令牌就放行c.Next()}
}
cap
是填充速率,比如一秒钟填一个等。
然后我们在路由中生成即可,这样全网站就限速了。