分布式限流器实现一般有计数器、滑动窗口、漏桶、令牌桶等实现算法。以下是 Go 的一些实现:
令牌桶
令牌桶按照一定速率将令牌放入桶中,桶有最大容量,桶满时,不再新增令牌。如果桶中令牌少于申请的令牌数量,则会被限流,否则申请令牌成功。令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿 N 个令牌),并允许一定程度突发流量。
Golang 实现
这里使用 Go 简单实现说明令牌桶原理:
type RateLimiter struct {
Capacity int64
LeakingRate float64
Tokens int64
LastLeakTime int64
}
func (rl *RateLimiter) Allow() bool {
now := time.Now().Unix()
supplyToken = rl.Tokens + (now - rl.LastLeakTime) * rl.LeakingRate
rl.Tokens = min(rl.Capacity, supplyToken)
rl.LastLeakTime = now
if rl.Tokens > 1 {
rl.Tokens--
return true
}
return false
}
Redis Lua 实现
在实际应用中,由于限流服务可能为多点服务,此时需要实现分布式的限流功能,仅用 Go 实现可能实现会非常复杂,那么需要选用外部组件将实现复杂性降低,一般会选用 Redis 或 ETCD 来将限流服务实现原子化。
以下为使用 Redis + Lua 实现限流器的示例,使用 Lua 脚本的目的是保持操作的原子性。另外,还有这样一种情况,需要限流的对象可能还会有二级分类,比如需要对服务限流,每个服务需要限制总体速率,服务中的接口也需定制单独的速率。
实现方案是在原始模型中增加子桶配置(本文假设所有接口拥有同样的最大令牌数和速率),每个单独子桶用额外的 Hash 存储当前令牌数量与上次取得令牌时间。
--- 获取令牌桶
local funnel = redis.call("HMGET", KEYS[1], 'capacity', 'leaking_rate', 'tokens', 'subfunnel_capacity', 'subfunnel_rate', 'last_leak_time')
--- 没有拿到限流配置,放行
if funnel[1] == nil then
return 0
end
local capacity = tonumber(funnel[1]) --- capacity 容量
local leaking_rate = tonumber(funnel[2]) --- leaking_rate 令牌桶添加令牌速率
local tokens = tonumber(funnel[3]) --- tokens 剩余容量
local subfunnel_capacity = tonumber(funnel[4]) --- subfunnel_capacity 子桶容量
local subfunnel_rate = tonumber(funnel[5]) --- subfunnel_rate 子桶添加令牌速率
local last_leak_time = tonumber(funnel[6]) --- last_leak_time 最后访问时间戳
--- 容量小于等于0,放行
if (capacity <= 0) then
return 0
end
local now = tonumber(ARGV[1]) --- 获取时间戳
local subfunnel_name = ARGV[2] --- 获取限流子桶Name
local req_token = tonumber(ARGV[3]) --- 请求获取令牌数量
--- 计算需要补充的令牌
local supply_token = (now - last_leak_time) * leaking_rate
if (supply_token > 0) then
last_leak_time = now
tokens = math.min(supply_token + tokens, capacity)
end
local result = 1 --- 默认不能取得令牌
--- 判断总桶
if (tokens >= req_token) then
result = 0
--- 判断子桶
if (string.len(subfunnel_name) > 0 and subfunnel_capacity > 0) then
local subkey = KEYS[1]..":subfunnel"
local free_field = subfunnel_name..':tokens'
local last_field = subfunnel_name..':last_leak_time'
local subfunnel = redis.pcall('HMGET', subkey, free_field, last_field)
local sub_free, sub_last = subfunnel_capacity, now
if (subfunnel[1]) then
sub_free = tonumber(subfunnel[1])
end
if (subfunnel[2]) then
sub_last = tonumber(subfunnel[2])
end
local sub_supply = (now - sub_last) * subfunnel_rate --- 计算子桶补充令牌数
if (sub_supply > 0) then
sub_last = now
sub_free = math.min(sub_supply + sub_free, subfunnel_capacity)
end
if (sub_free >= req_token) then
sub_free = sub_free - req_token
tokens = tokens - req_token
else
result = 2
end
redis.pcall('HMSET', subkey, free_field, sub_free, last_field, sub_last) --- 更新子桶
else
tokens = tokens - req_token
end
end
redis.call('HMSET', KEYS[1], "last_leak_time", last_leak_time, "tokens", tokens) --- 更新桶
return last_leak_time * 10 + result
返回值设置为 time * 10 + (enum return code)
,可同时返回请求时间和请求结果。为了更有效利用空间,可以设置 expire
释放长时间没有请求的桶。
以 Go 调用 Redis Lua Script 为例,需预先加载设定好的 Lua 脚本(NewScript
and Load
),然后在申请令牌时调用(据此实现 Allow
或 AllowN
方法),返回请求时间和请求结果。
func CheckFunnel(client *redis.Client, key string, subFunnel string, count int) (reqTime int64, isLimit bool, err error) {
reqTime = MakeTimestamp()
// applyToken is *redis.Script
res, err := applyToken.Run(client, []string{key}, reqTime, subFunnel, count).Int64()
if err != nil {
return reqTime, false, err
}
retCode := res % 10
isLimit = bool(retCode == 0)
switch retCode {
// case retCode and set err
case 0:
err = nil
}
return
}
func Allow(client *redis.Client, key string, subFunnel string) (isLimit bool) {
_, isLimit, _ = CheckFunnel(client, key, subFunnel, 1)
return
}
速率
初始化令牌桶需要将各项参数:capacity
、leaking_rate
以 Hash 形式存储在 Redis 内。根据 rate = capacity / interval
计算出统一单位的速率:MakeRate(100, time.Second)
。
func MakeRate(capacity int64, duration time.Duration) float64 {
return float64(capacity) * float64(time.Millisecond) / float64(duration) // to req/ms
}
请求令牌时需传入当前时间到 Lua 脚本,当前时间的表示方式也应该与速率的单位保持一致,如与上面的例子统一为 millisecond。
func MakeTimestamp() int64 {
return time.Now().UnixNano() / int64(time.Millisecond)
}
误差
网上其他相关令牌桶的实现,有的会使用了 math.floor
将需补充的令牌数量 supply_token
取整,这在一定场景下面将会有限流误差。比如时间单位为一个 interval
,假设发生了四次请求,每次请求都发生在 (now + 1/4 * interval)
,那么根据上面的算式,取整后可能会使补充的令牌为零,但理论上四次请求后,补充的令牌应该为一个单位 token。要消除这个误差,不应该使用 math.floor
取整,虽然 tokens 的值可能会为非整数,但由于判断条件只会判断 tokens 是否大于请求令牌数 N
时才会发放令牌,所以结果是合理的。
supply_token = 1/4 * interval * rate = 1/4 * token
More
在实际 Qos 应用中,还有两种更高级的桶算法:srTCM
(Single Rate Three Color Marker, 单速率三颜色标记, RFC2697)和 trTCM
(Two Rate Three Color Marker, 双速率三颜色标记, RFC2698)。