分布式限流器之令牌桶实现

分布式限流器实现一般有计数器、滑动窗口、漏桶、令牌桶等实现算法。以下是 Go 的一些实现:

令牌桶

ratelimit-令牌桶

令牌桶按照一定速率将令牌放入桶中,桶有最大容量,桶满时,不再新增令牌。如果桶中令牌少于申请的令牌数量,则会被限流,否则申请令牌成功。令牌桶限制的是平均流入速率(允许突发请求,只要有令牌就可以处理,支持一次拿 N 个令牌),并允许一定程度突发流量。

Golang 实现

这里使用 Go 简单实现说明令牌桶原理:

type RateLimiter struct {
	Capacity int64
	LeakingRate float64
	Tokens int64
	LastLeakTime int64
}

func (rl *RateLimiter) Allow() bool {
	now := time.Now().Unix()

	supplyToken = rl.Tokens + (now - rl.LastLeakTime) * rl.LeakingRate
	rl.Tokens = min(rl.Capacity, supplyToken)
	rl.LastLeakTime = now

	if rl.Tokens > 1 {
			rl.Tokens--
			return true
	}
	return false
}

Redis Lua 实现

在实际应用中,由于限流服务可能为多点服务,此时需要实现分布式的限流功能,仅用 Go 实现可能实现会非常复杂,那么需要选用外部组件将实现复杂性降低,一般会选用 Redis 或 ETCD 来将限流服务实现原子化。

以下为使用 Redis + Lua 实现限流器的示例,使用 Lua 脚本的目的是保持操作的原子性。另外,还有这样一种情况,需要限流的对象可能还会有二级分类,比如需要对服务限流,每个服务需要限制总体速率,服务中的接口也需定制单独的速率。

ratelimit-service-api

实现方案是在原始模型中增加子桶配置(本文假设所有接口拥有同样的最大令牌数和速率),每个单独子桶用额外的 Hash 存储当前令牌数量与上次取得令牌时间。

--- 获取令牌桶
local funnel = redis.call("HMGET", KEYS[1], 'capacity', 'leaking_rate', 'tokens', 'subfunnel_capacity', 'subfunnel_rate', 'last_leak_time')

--- 没有拿到限流配置,放行
if funnel[1] == nil then
	return 0
end

local capacity = tonumber(funnel[1])            ---  capacity 容量
local leaking_rate = tonumber(funnel[2])        ---  leaking_rate 令牌桶添加令牌速率
local tokens = tonumber(funnel[3])              ---  tokens 剩余容量
local subfunnel_capacity = tonumber(funnel[4])  ---  subfunnel_capacity 子桶容量
local subfunnel_rate = tonumber(funnel[5])      ---  subfunnel_rate 子桶添加令牌速率
local last_leak_time = tonumber(funnel[6])      ---  last_leak_time 最后访问时间戳

--- 容量小于等于0,放行
if (capacity <= 0) then
	return 0
end

local now = tonumber(ARGV[1])              --- 获取时间戳
local subfunnel_name = ARGV[2]             --- 获取限流子桶Name
local req_token = tonumber(ARGV[3])        --- 请求获取令牌数量

--- 计算需要补充的令牌
local supply_token = (now - last_leak_time) * leaking_rate
if (supply_token > 0) then
	last_leak_time = now
	tokens = math.min(supply_token + tokens, capacity)
end

local result = 1 --- 默认不能取得令牌

--- 判断总桶
if (tokens >= req_token) then
	result = 0

--- 判断子桶
	if (string.len(subfunnel_name) > 0 and subfunnel_capacity > 0) then
		local subkey = KEYS[1]..":subfunnel"
		local free_field = subfunnel_name..':tokens'
		local last_field = subfunnel_name..':last_leak_time'

		local subfunnel = redis.pcall('HMGET', subkey, free_field, last_field)
		local sub_free, sub_last = subfunnel_capacity, now

		if (subfunnel[1]) then
			sub_free = tonumber(subfunnel[1])
		end
		if (subfunnel[2]) then
			sub_last = tonumber(subfunnel[2])
		end
		
		local sub_supply = (now - sub_last) * subfunnel_rate  --- 计算子桶补充令牌数
		if (sub_supply > 0) then
			sub_last = now
			sub_free = math.min(sub_supply + sub_free, subfunnel_capacity)
		end

		if (sub_free >= req_token) then
			sub_free = sub_free - req_token
			tokens = tokens - req_token
		else
			result = 2
		end

		redis.pcall('HMSET', subkey, free_field, sub_free, last_field, sub_last)  --- 更新子桶
	else
		tokens = tokens - req_token
	end
end

redis.call('HMSET', KEYS[1], "last_leak_time", last_leak_time, "tokens", tokens)  --- 更新桶

return last_leak_time * 10 + result

返回值设置为 time * 10 + (enum return code),可同时返回请求时间和请求结果。为了更有效利用空间,可以设置 expire 释放长时间没有请求的桶。

以 Go 调用 Redis Lua Script 为例,需预先加载设定好的 Lua 脚本(NewScript and Load),然后在申请令牌时调用(据此实现 AllowAllowN 方法),返回请求时间和请求结果。

func CheckFunnel(client *redis.Client, key string, subFunnel string, count int) (reqTime int64, isLimit bool, err error) {
	reqTime = MakeTimestamp()
	// applyToken is *redis.Script
	res, err := applyToken.Run(client, []string{key}, reqTime, subFunnel, count).Int64()

	if err != nil {
		return reqTime, false, err
	}

	retCode := res % 10
	isLimit = bool(retCode == 0)

	switch retCode {
	// case retCode and set err
	case 0:
		err = nil
	}

	return
}

func Allow(client *redis.Client, key string, subFunnel string) (isLimit bool) {
	_, isLimit, _ = CheckFunnel(client, key, subFunnel, 1)
	return
}
速率

初始化令牌桶需要将各项参数:capacityleaking_rate 以 Hash 形式存储在 Redis 内。根据 rate = capacity / interval 计算出统一单位的速率:MakeRate(100, time.Second)

func MakeRate(capacity int64, duration time.Duration) float64 {
	return float64(capacity) * float64(time.Millisecond) / float64(duration) // to req/ms
}

请求令牌时需传入当前时间到 Lua 脚本,当前时间的表示方式也应该与速率的单位保持一致,如与上面的例子统一为 millisecond。

func MakeTimestamp() int64 {
	return time.Now().UnixNano() / int64(time.Millisecond)
}
误差

网上其他相关令牌桶的实现,有的会使用了 math.floor 将需补充的令牌数量 supply_token 取整,这在一定场景下面将会有限流误差。比如时间单位为一个 interval,假设发生了四次请求,每次请求都发生在 (now + 1/4 * interval),那么根据上面的算式,取整后可能会使补充的令牌为零,但理论上四次请求后,补充的令牌应该为一个单位 token。要消除这个误差,不应该使用 math.floor 取整,虽然 tokens 的值可能会为非整数,但由于判断条件只会判断 tokens 是否大于请求令牌数 N 时才会发放令牌,所以结果是合理的。

supply_token = 1/4 * interval * rate = 1/4 * token

More

在实际 Qos 应用中,还有两种更高级的桶算法:srTCM(Single Rate Three Color Marker, 单速率三颜色标记, RFC2697)和 trTCM(Two Rate Three Color Marker, 双速率三颜色标记, RFC2698)。