咱们目前在作业中遇到一个功用问题,咱们有个定时使命需要处理大量的数据,为了提高吞吐量,所以布置了很多台机器,但这个使命在运转前需要从别的服务那拉取大量的数据,随着数据量的增大,假如一起多台机器并发拉取数据,会对下游服务产生非常大的压力。之前现已增加了单机限流,但无法解决问题,由于这个数据使命运转中只有不到10%的时刻拉取数据,假如单机限流限制太狠,虽然集群总的恳求量控制住了,但使命吞吐量又降下来。假如限流阈值太高,多机并发的时分,仍是有或许压垮下游。 所以目前仅有可行的解决方案便是分布式限流。
我目前是挑选直接运用Redisson库中的RRateLimiter完成了分布式限流,关于Redission或许很多人都有所耳闻,它其实是在Redis能力上构建的开发库,除了支持Redis的根底操作外,还封装了布隆过滤器、分布式锁、限流器……等东西。今天要说的RRateLimiter及时其完成的限流器。接下来本文将具体介绍下RRateLimiter的具体运用办法、完成原理还有一些注意事项,最终简略谈谈我对分布式限流底层原理的了解。
RRateLimiter运用
RRateLimiter的运用办法反常的简略,参数也不多。只要创建出RedissonClient,就能够从client中获取到RRateLimiter目标,直接看代码示例。
RedissonClient redissonClient = Redisson.create();
RRateLimiter rateLimiter = redissonClient.getRateLimiter("xindoo.limiter");
rateLimiter.trySetRate(RateType.OVERALL, 100, 1, RateIntervalUnit.HOURS);
rateLimiter.trySetRate便是设置限流参数,RateType有两种,OVERALL是大限制流 ,PER_CLIENT是单Client限流(能够以为便是单机限流),这里咱们只讨论大局模式。而后边三个参数的作用便是设置在多长时刻窗口内(rateInterval+IntervalUnit),答应总量不超过多少(rate),上面代码中我设置的值便是1小时内总答应数不超过100个。然后调用rateLimiter的tryAcquire()或许acquire()办法即可获取答应。
rateLimiter.acquire(1); // 申请1份答应,直到成功
boolean res = rateLimiter.tryAcquire(1, 5, TimeUnit.SECONDS); // 申请1份答应,假如5s内未申请到就抛弃
运用起来仍是很简略的嘛,以上代码中的两种办法都是同步调用,但Redisson还相同供给了异步办法acquireAsync()和tryAcquireAsync(),运用其回来的RFuture就能够异步获取答应。
RRateLimiter的完成
接下来咱们顺着tryAcquire()办法来看下它的完成办法,在RedissonRateLimiter类中,咱们能够看到最底层的tryAcquireAsync()办法。
private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
byte[] random = new byte[8];
ThreadLocalRandom.current().nextBytes(random);
return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"——————————————————————————————————————"
+ "这里是一大段lua代码"
+ "____________________________________",
Arrays.asList(getRawName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName()),
value, System.currentTimeMillis(), random);
}
映入眼帘的便是一大段lua代码,其实这段Lua代码便是限流完成的核心,我把这段lua代码摘出来,并加了一些注释,咱们来具体看下。
local rate = redis.call("hget", KEYS[1], "rate") # 100
local interval = redis.call("hget", KEYS[1], "interval") # 3600000
local type = redis.call("hget", KEYS[1], "type") # 0
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")
local valueName = KEYS[2] # {xindoo.limiter}:value 用来存储剩下答应数量
local permitsName = KEYS[4] # {xindoo.limiter}:permits 记载了一切答应宣布的时刻戳
# 假如是单实例模式,name信息后边就需要拼接上clientId来区分出来了
if type == "1" then
valueName = KEYS[3] # {xindoo.limiter}:value:b474c7d5-862c-4be2-9656-f4011c269d54
permitsName = KEYS[5] # {xindoo.limiter}:permits:b474c7d5-862c-4be2-9656-f4011c269d54
end
# 对参数校验
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")
# 获取当时还有多少答应
local currentValue = redis.call("get", valueName)
local res
# 假如有记载当时还剩下多少答应
if currentValue ~= false then
# 收回已过期的答应数量
local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
local released = 0
for i, v in ipairs(expiredValues) do
local random, permits = struct.unpack("Bc0I", v)
released = released + permits
end
# 清理已过期的答应记载
if released > 0 then
redis.call("zremrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
if tonumber(currentValue) + released > tonumber(rate) then
currentValue = tonumber(rate) - redis.call("zcard", permitsName)
else
currentValue = tonumber(currentValue) + released
end
redis.call("set", valueName, currentValue)
end
# ARGV permit timestamp random, random是一个随机的8字节
# 假如剩下答应不够,需要在res中回来下个答应需要等待多长时刻
if tonumber(currentValue) < tonumber(ARGV[1]) then
local firstValue = redis.call("zrange", permitsName, 0, 0, "withscores")
res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]))
else
redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))
# 减小可用答应量
redis.call("decrby", valueName, ARGV[1])
res = nil
end
else # 反之,记载到还有多少答应,说明是初次运用或许之前已记载的信息现已过期了,就将装备rate写进去,并减少答应数
redis.call("set", valueName, rate)
redis.call("zadd", permitsName, ARGV[2], struct.pack("Bc0I", string.len(ARGV[3]), ARGV[3], ARGV[1]))
redis.call("decrby", valueName, ARGV[1])
res = nil
end
local ttl = redis.call("pttl", KEYS[1])
# 重置
if ttl > 0 then
redis.call("pexpire", valueName, ttl)
redis.call("pexpire", permitsName, ttl)
end
return res
即便是加了注释,相信你仍是很难一下子看懂这段代码的,接下来我就以其在Redis中的数据存储形式,然辅以流程图让咱们彻底了解其完成完成原理。
首先用RRateLimiter有个name,在我代码中便是xindoo.limiter
,用这个作为KEY你就能够在Redis中找到一个map,里面存储了limiter的作业模式(type)、可数量(rate)、时刻窗口巨细(interval),这些都是在limiter创建时写入到的redis中的,在上面的lua代码中也运用到了。
其次还俩很重要的key,valueName和permitsName,其间在我的代码完成中valueName是{xindoo.limiter}:value
,它存储的是当时可用的答应数量。我代码中permitsName的具体值是{xindoo.limiter}:permits
,它是一个zset,其间存储了当时一切的答应授权记载(含有答应授权时刻戳),其间SCORE直接运用了时刻戳,而VALUE中包含了8字节的随机值和答应的数量,如下图:
{xindoo.limiter}:permits这个zset中存储了一切的历史授权记载,知道了这些信息,相信你也就了解了RRateLimiter的完成原理,咱们仍是将上面的那大段Lua代码的流程图绘制出来,整个执行的流程会更直观。
看到这咱们应该能了解这段Lua代码的逻辑了,能够看到Redis用了多个字段来存储限流的信息,也有各式各样的操作,那Redis是如何确保在分布式下这些限流信息数据的一致性的?答案是不需要确保,在这个场景下,信息天然便是一致性的。原因是Redis的单进程数据处理模型,在同一个Key下,一切的eval恳求都是串行的,一切不需要考虑数据并发操作的问题。在这里,Redisson也运用了HashTag,确保一切的限流信息都存储在同一个Redis实例上。
RRateLimiter运用时注意事项
了解了RRateLimiter的底层原理,再结合Redis自身的特性,我想到了RRateLimiter运用的几个限制点(问题点)。
RRateLimiter对错公正限流器
这个是我查阅材料得知,并且在自己代码实践的过程中也得到了验证,具体表现便是假如多个实例(机器)取竞争这些答应,很或许某些实例会获取到大部分,而另外一些实例可怜巴巴仅获取到少数的答应,也便是说容易呈现旱的旱死 涝的涝死的状况。在运用过程中,你就必须考虑你能否承受这种状况,假如不能承受就得考虑用某些办法尽或许让其变公正。
Rate不要设置太大
从RRateLimiter的完成原理你也看出了,它采用的是滑动窗口的模式来限流的,并且记载了一切的答应授权信息,所以假如你设置的Rate值过大,在Redis中存储的信息(permitsName对应的zset)也就越多,每次执行那段lua脚本的功用也就越差,这对Redis实例也是一种压力。个人主张假如你是想设置较大的限流阈值,倾向于小Rate+小时刻窗口的办法,并且这种设置办法恳求也会更均匀一些。
限流的上限取决于Redis单实例的功用
从原理上看,RRateLimiter在Redis上所存储的信息都必须在一个Redis实例上,所以它的限流QPS的上限便是Redis单实例的上限,比方你Redis实例便是1w QPS,你想用RRateLimiter完成一个2w QPS的限流器,必然完成不了。 那有没有突破Redis单实例功用上限的办法?单限流器肯定是完成不了的,咱们能够拆分多个限流器,比方我搞10个限流器,名词用不一样的,然后每台机器随机运用一个限流器限流,实践的流量不就被分散到不同的限流器上了吗,总的限流上线不也就上来了。
分布式限流的实质
分布式限流的实质实践上便是协同,协同的实质便是信息交流,信息交流最重要的的便是信息的准确性和一致性。 更简略粗暴了解,分布式限流的实质原理其实仍是分布式数据一致性的原理,而限流只是数据成果的一种决策。所以只要以任何办法能让信息同步,且确保信息的正确性就能够完成一个分布式限流器了,这便是我了解的实质思路。
其实从上面的RRateLimiter的完成原理也能够看出来,它不便是存储了一些信息吗! 那我不必Redis,而是运用mysql行不行。实践肯定是能够的,只要将的上面Lua代码中的一切操作都放到一个事务里,且事务的级别改成串行化,依旧能完成RRateLimiter相同的功用。假如你具有Mysql相关常识的话,肯定也能根据Mysql将RRateLimiter的API封装出来,但是封装出来的限流器,其限流的上限就取决于Mysql实例的功用上限。
最近chatGPT比较火,我也问了下它对分布式限流实质原理的了解,下面是它的回答,咱们觉得怎么样?
分布式限流的实质原理是经过在分布式体系中同享限流状况来限制体系中单位时刻内的恳求数量,然后避免体系因流量过大而崩溃。
这是经过运用一些同享的存储组件,如数据库,缓存,分布式锁等来完成的。在每次恳求时,体系会检查当时的恳求数量是否超过了预先设定的限制,假如超过了限制,恳求就会被回绝;假如未超过限制,恳求就会被允许。
经过运用分布式限流技能,体系能够在高并发状况下保持稳定的功用,并避免因流量过大而导致的体系崩溃。