lua实现分布式限流器

这一部分主要是整理一下使用redis实现限流器的功能。因为我们限流功能可能会涉及到多个redis语句使用,虽然redis是单线程的,但是在分布式环境下还是会出问题。所以这里我们使用了lua脚本执行redis语句保证操作的原子性。

固定窗口限流器

固定窗口限流器是最简单粗暴的一种限流器实现方法,其实就是在固定时间点之间只允许固定的访问次数,但是在临界时间点会出问题,在临界一秒内可能发送两倍固定次数的问题。

image.png

代码实现

主要思想就是使用incr key来一步步增加对应key的值,只要超过了那就超过访问次数了。如果是第一次访问,也就是incr key的时候返回1,这个时候我们就需要设置一下这个键的过期时间。需要注意,我们读取文件需要隔绝空字符,不然的话lua脚本会报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class LimitRequest {
@Autowired
JedisPool jedisPool;

public boolean limit(String keys) throws Exception {
Jedis jedis = jedisPool.getResource();
String lua = getLuaScript(jedis);
if((Long) jedis.eval(lua,1,keys,"5","10") == 1)
return false;

return true;

}

public String getLuaScript(Jedis jedis) throws Exception {
File lua = null;
FileInputStream inputStream = null;
String res = "";
try{
lua = ResourceUtils.getFile("classpath:script/limit.lua");
inputStream = new FileInputStream(lua);

byte[] buffer = new byte[1024 * 5];
int len = 0;
while((len = inputStream.read(buffer)) != -1){
res += new String(buffer,0,len);
}

}catch (Exception e){
e.printStackTrace();
}finally {
inputStream.close();
jedis.close();
}
return res;
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
local key_name = KEYS[1];
local times = tonumber(ARGV[1]); -- 将次数转换为数字类型
local expiration = tonumber(ARGV[2]); -- 将过期时间转换为数字类型

local res = redis.call("incr", key_name);

if res == 1 then
redis.call('expire', key_name, expiration);
return 1;
end

if res > times then
return 0;
end

return 1;

滑动窗口限流器

滑动窗口算法通常用于限流,即在给定时间窗口内限制事件(例如API请求)的数量。使用 Redis 实现滑动窗口算法是一种高效的解决方案,因为 Redis 提供了高性能的数据操作和内置的过期机制。但是滑动窗口限流器的缺点就是不能平滑的处理请求,如果某一时间请求过多,那么之后的请求都需要等待。

下面是一个使用 Redis 和 Lua 脚本实现的滑动窗口限流算法的基本解决方案:

解决方案的步骤

  1. 确定时间窗口大小(例如,60秒)和请求限制数量(例如,100个请求)。
  2. 使用 Redis 的有序集合(sorted set)存储每个请求的时间戳作为分数(score)。
  3. 当一个新的请求到来时,使用当前时间作为时间戳,添加到有序集合中。
  4. 移除时间窗口之前的所有请求记录(即,移除比当前时间戳早的记录)。
  5. 计算时间窗口内的请求数量,如果数量超过限制,则拒绝请求。
  6. 如果请求被接受,可以选择设置一个合理的过期时间来自动清理旧的数据,以防止集合无限增长。

Lua 脚本代码

以下是实现上述逻辑的 Lua 脚本示例,该脚本可以在 Redis 服务器上执行。这个脚本做了几件事情:清理旧的请求记录、添加新的请求、检查当前请求量是否超过限制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-- Lua script for Redis to implement sliding window rate limiting
local key = KEYS[1] -- Redis key for the sorted set
local limit = tonumber(ARGV[1]) -- Request limit for the window
local windowSize = tonumber(ARGV[2]) -- Window size in seconds
local currentTime = tonumber(ARGV[3]) -- Current timestamp in milliseconds
local clearBefore = currentTime - windowSize * 1000 -- Calculate the timestamp to clear

-- Remove timestamps outside the current window
redis.call('ZREMRANGEBYSCORE', key, 0, clearBefore)

-- Add the current request timestamp to the sorted set with the current time as the score
redis.call('ZADD', key, currentTime, currentTime)

-- Count the number of requests in the current window
local count = redis.call('ZCOUNT', key, clearBefore, currentTime)

-- Set the sorted set to expire to avoid it growing indefinitely
redis.call('EXPIRE', key, windowSize)

-- Return whether the current request is allowed (1) or rate-limited (0)
if count <= limit then
return 1
else
return 0
end

java如何使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class LimitRequestByWindows {

@Autowired
JedisPool jedisPool;
public boolean limit(String keys) throws Exception {
Jedis jedis = jedisPool.getResource();
String lua = getLuaScript(jedis);
if((Long) jedis.eval(lua,1,keys,"5","10", String.valueOf(System.currentTimeMillis())) == 1)
return false;

return true;

}

public String getLuaScript(Jedis jedis) throws Exception {
File lua = null;
FileInputStream inputStream = null;
String res = "";
try{
lua = ResourceUtils.getFile("classpath:script/limit_windows.lua");
inputStream = new FileInputStream(lua);

byte[] buffer = new byte[1024 * 5];
int len = 0;
while((len = inputStream.read(buffer)) != -1){
res += new String(buffer,0,len);
}

}catch (Exception e){
e.printStackTrace();
}finally {
inputStream.close();
jedis.close();
}
return res;
}

}

漏斗限流器

漏斗容量有限,当流水的的速度小于灌水的速度,漏斗就会水满溢出,利用这个原理我们可以设计限流代码!漏斗的剩余的空间就代表着当前行为(请求)可以持续进行的数量,漏斗的流水速率代表系统允许行为(请求)发生的最大频率,通常安装系统的处理能力权衡后进行设值。特点是可以针对突发请求缓解服务器压力,但是不可能很好的处理图发请求

实现逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
-- 漏斗名 对应每个请求源
local funnel_key = KEYS[1]
-- 漏斗容量 最多多少请求
local capacity = tonumber(ARGV[1])
-- 漏斗流速 每毫秒能流出多少请求
local leak_rate = tonumber(ARGV[2])
-- 当前时间戳
local current_time = tonumber(ARGV[3])

-- 获取上一次流出的时间和上一次的水位
local funnel = redis.call('HMGET', funnel_key, 'last_time', 'water_height')
local last_time = tonumber(funnel[1])
local water_height = tonumber(funnel[2])

-- 如果是第一次请求那么就初始化
if last_time == nil then
last_time = current_time
water_height = 0
end

-- 计算时间差
local delta_time = current_time - last_time
-- 根据时间差计算上一次到现在能流出多少请求出来
local leaked_volume = delta_time * leak_rate
water_height = math.max(0, water_height - leaked_volume) -- Ensure water height does not go negative

-- 更新上一次流出时间
last_time = current_time

-- 更新桶 在这里是默认一个请求一个请求进入的
if water_height < capacity then
water_height = water_height + 1
-- 再次检查是否超出容量 这是当我们每次进入的请求如果不是1的情况下
if water_height <= capacity then
-- 更新hash
redis.call('HMSET', funnel_key, 'last_time', last_time, 'water_height', water_height)
return 1
end
end

-- 否则限流
return 0

这个脚本的逻辑是:

  1. 获取当前漏斗状态(上次更新时间和当前水高度)。
  2. 根据时间差和漏水速率计算自上次以来漏出的水量,更新当前水高度。
  3. 如果添加当前请求后漏斗的水高度不超过容量,请求被接受并更新漏斗状态;否则,请求被限流。

这样,当漏斗“容量”已满时,即水高度达到或超过容量时,新的请求将会被限流。这直接体现了漏斗算法中“超过漏斗容量则限流”的原则。

请注意,这个示例假设每个请求增加的水量(或占用的容量)是固定的,这里简化为1个单位。根据实际需求,这个增加量可以根据请求的类型或其他因素进行调整。

令牌桶限流器

实现逻辑

先有一个桶,容量是固定的,是用来放令牌的。

以固定速率向桶放令牌,如果桶满了就不放令牌了。

处理请求是先从桶拿令牌,先拿到令牌再处理请求,拿不到令牌同样也被限流了。

接下来的实现其实参考了这篇文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
--[[
1. key - 令牌桶的 key
2. intervalPerTokens - 生成令牌的间隔(ms)
3. curTime - 当前时间
4. initTokens - 令牌桶初始化的令牌数
5. bucketMaxTokens - 令牌桶的上限
6. resetBucketInterval - 重置桶内令牌的时间间隔
7. currentTokens - 当前桶内令牌数
8. bucket - 当前 key 的令牌桶对象
]] --

local key = KEYS[1]
local intervalPerTokens = tonumber(ARGV[1])
local curTime = tonumber(ARGV[2])
local initTokens = tonumber(ARGV[3])
local bucketMaxTokens = tonumber(ARGV[4])
local resetBucketInterval = tonumber(ARGV[5])

local bucket = redis.call('hgetall', key)
local currentTokens

-- 若当前桶未初始化,先初始化令牌桶
if table.maxn(bucket) == 0 then
-- 初始桶内令牌
currentTokens = initTokens
-- 设置桶最近的填充时间是当前
redis.call('hset', key, 'lastRefillTime', curTime)
-- 初始化令牌桶的过期时间, 设置为间隔的 1.5 倍
redis.call('pexpire', key, resetBucketInterval * 1.5)

-- 若桶已初始化,开始计算桶内令牌
-- 为什么等于 4 ? 因为有两对 field, 加起来长度是 4
-- { "lastRefillTime(上一次更新时间)","curTime(更新时间值)","tokensRemaining(当前保留的令牌)","令牌数" }
elseif table.maxn(bucket) == 4 then

-- 上次填充时间
local lastRefillTime = tonumber(bucket[2])
-- 剩余的令牌数
local tokensRemaining = tonumber(bucket[4])

-- 当前时间大于上次填充时间
if curTime > lastRefillTime then

-- 拿到当前时间与上次填充时间的时间间隔
-- 举例理解: curTime = 2620 , lastRefillTime = 2000, intervalSinceLast = 620
local intervalSinceLast = curTime - lastRefillTime

-- 如果当前时间间隔 大于 令牌的生成间隔
-- 举例理解: intervalSinceLast = 620, resetBucketInterval = 1000
if intervalSinceLast > resetBucketInterval then

-- 将当前令牌填充满
currentTokens = initTokens

-- 更新重新填充时间
redis.call('hset', key, 'lastRefillTime', curTime)

-- 如果当前时间间隔 小于 令牌的生成间隔
else

-- 可授予的令牌 = 向下取整数( 上次填充时间与当前时间的时间间隔 / 两个令牌许可之间的时间间隔 )
-- 举例理解 : intervalPerTokens = 200 ms , 令牌间隔时间为 200ms
-- intervalSinceLast = 620 ms , 当前距离上一个填充时间差为 620ms
-- grantedTokens = 620/200 = 3.1 = 3
local grantedTokens = math.floor(intervalSinceLast / intervalPerTokens)

-- 可授予的令牌 > 0 时
-- 举例理解 : grantedTokens = 620/200 = 3.1 = 3
if grantedTokens > 0 then

-- 生成的令牌 = 上次填充时间与当前时间的时间间隔 % 两个令牌许可之间的时间间隔
-- 举例理解 : padMillis = 620%200 = 20
-- curTime = 2620
-- curTime - padMillis = 2600
local padMillis = math.fmod(intervalSinceLast, intervalPerTokens)

-- 将当前令牌桶更新到上一次生成时间
redis.call('hset', key, 'lastRefillTime', curTime - padMillis)
end

-- 更新当前令牌桶中的令牌数
-- Math.min(根据时间生成的令牌数 + 剩下的令牌数, 桶的限制) => 超出桶最大令牌的就丢弃
currentTokens = math.min(grantedTokens + tokensRemaining, bucketMaxTokens)
end
else
-- 如果当前时间小于或等于上次更新的时间, 说明刚刚初始化, 当前令牌数量等于桶内令牌数
-- 不需要重新填充
currentTokens = tokensRemaining
end
end

-- 如果当前桶内令牌小于 0,抛出异常
assert(currentTokens >= 0)

-- 如果当前令牌 == 0 ,更新桶内令牌, 返回 0
if currentTokens == 0 then
redis.call('hset', key, 'tokensRemaining', currentTokens)
return 0
else
-- 如果当前令牌 大于 0, 更新当前桶内的令牌 -1 , 再返回当前桶内令牌数
redis.call('hset', key, 'tokensRemaining', currentTokens - 1)
return currentTokens
end

这个Lua脚本实现了一个令牌桶限流算法,主要用于控制对某些资源的访问速率,避免因过度请求而导致的系统过载。脚本通过Redis存储和更新令牌桶的状态,包括最后一次令牌填充时间和当前剩余令牌数。下面是这个脚本的详细流程解释:

脚本参数

  • key: Redis中用于存储令牌桶状态的键。
  • intervalPerTokens: 每个令牌生成的时间间隔(毫秒)。
  • curTime: 当前时间(通常以毫秒为单位)。
  • initTokens: 令牌桶初始化时的令牌数量。
  • bucketMaxTokens: 令牌桶的最大令牌数。
  • resetBucketInterval: 重置令牌桶内令牌数量的时间间隔。

脚本流程

  1. 初始化令牌桶:如果发现令牌桶(key)在Redis中未初始化(即table.maxn(bucket) == 0),脚本会设置初始的令牌数,并记录当前时间为最后一次填充令牌的时间。同时,设置令牌桶的过期时间为resetBucketInterval * 1.5,以自动清理长时间未使用的令牌桶。

  2. 计算新令牌:如果令牌桶已存在,脚本将计算自上次填充令牌以来经过的时间,并基于此时间和每个令牌生成的时间间隔(intervalPerTokens)计算应该添加到桶中的新令牌数量。

    • 如果当前时间距上次填充时间的间隔大于resetBucketInterval,则将令牌桶填满(即设置令牌数为initTokens)。
    • 否则,根据时间间隔计算应生成的令牌数,更新令牌桶中的令牌总数,但不超过桶的最大容量(bucketMaxTokens)。
  3. 令牌消耗:每次脚本执行都尝试消耗一个令牌。如果当前令牌桶中的令牌数大于0,则从桶中移除一个令牌,并允许请求通过(脚本返回当前剩余令牌数)。如果没有足够的令牌(即currentTokens == 0),则拒绝请求。

  4. 异常处理:脚本包含一个断言(assert(currentTokens >= 0)),确保令牌数不会变成负数。这是一个基本的健壮性检查。

  5. 令牌桶更新:在每次脚本执行的最后,更新Redis中的令牌桶状态,包括最后一次填充令牌的时间和剩余的令牌数。

漏斗限流器和令牌桶限流器的总结

漏斗算法(Leaky Bucket)和令牌桶算法(Token Bucket)都是常用的流量控制或限流算法,它们各有优势和劣势,适用于不同的场景。以下是对这两种算法的优缺点总结:

漏斗算法(Leaky Bucket)

优点:

  • 平滑流量:漏斗算法可以平滑网络流量,即使在输入流量不稳定的情况下,也能保证以恒定的速率流出,适合于需要稳定输出速率的场景。
  • 简单易实现:算法逻辑简单,容易理解和实现。

缺点:

  • 灵活性较低:由于其恒定的流出速率,漏斗算法对于需要应对突发流量的场景不够灵活。
  • 可能导致资源浪费:在流入速率低于流出速率的情况下,可能不会充分利用可用资源。

令牌桶算法(Token Bucket)

优点:

  • 允许突发流量:令牌桶算法允许一定程度的突发流量,因为当桶中有足够的令牌时,可以立即处理多个请求,适合于流量具有突发性的场景。
  • 高度灵活:通过调整令牌的生成速率和桶的容量,可以灵活控制允许的流量强度和突发量,适应不同的流量需求。
  • 支持不同优先级的流量控制:可以通过为不同类型的流量分配不同数量的令牌来实现优先级控制。

缺点:

  • 实现相对复杂:与漏斗算法相比,令牌桶算法在实现上可能更复杂,尤其是在需要考虑令牌的生成和消耗、桶的容量管理等方面。
  • 可能会有较短的流量突增:尽管令牌桶算法可以控制突发流量,但如果大量请求在令牌积累时同时到达,系统短时间内仍可能面临较高的负载。

总结

  • 如果需要处理具有较大突发性的流量或者需要对不同类型的流量实施优先级控制,令牌桶算法可能是更好的选择。
  • 如果场景要求流量输出稳定,或者实现简单性是首要考虑的因素,漏斗算法可能更为合适。

在选择适合的算法时,需要根据实际的应用场景和需求综合考虑。