在本文中,我们将介绍一些基于Python和Redis的速率限制算法,从最简单的实现到高级通用细胞速率算法(GCRA)。我们将使用redis-py
与Redis(
pip install redis
)进行交互。我建议克隆我的存储库以尝试查询约束。
时限
限制每个周期的请求数的第一种方法是使用限时算法:对于每个限制键(rate-key,唯一的东西,例如昵称或IP地址),计数器(最初设置限制值)和有效期分别存储(期间),随着收到请求而减少。
使用Python和Redis,可以按以下方式实现此算法:
- 检查约束键的存在。
- 如果存在,则使用限制值(Redis SETNX)和到期日期(Redis EXPIRE)对其进行初始化。
- 我们为每个后续请求(Redis DECRBY)减小此值。
- 仅当值小于零时才停止查询。
- 在指定的时间段后,密钥将自动删除。
from datetime import timedelta
from redis import Redis
def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
if r.setnx(key, limit):
r.expire(key, int(period.total_seconds()))
bucket_val = r.get(key)
if bucket_val and int(bucket_val) > 0:
r.decrby(key, 1)
return False
return True
您可以在30秒内模拟20个请求的限制时看到此代码的工作方式(为了更清楚地说明,我将该函数放在了模块中)。
import redis
from datetime import timedelta
from ratelimit import time_bucketed
r = redis.Redis(host='localhost', port=6379, db=0)
requests = 25
for i in range(requests):
if time_bucketed.request_is_limited(r, 'admin', 20, timedelta(seconds=30)):
print ('Request is limited')
else:
print ('Request is allowed')
仅前20个请求将不受限制,在它们之后,您必须等待30秒,以便可以再次发送新请求。
这种方法的缺点是它不限制频率,而是限制用户在一定时间内发出的请求数。如果整个限制都被立即用尽,您将不得不等待一段时间的结束。
漏桶算法
有一种方法可以非常仔细地限制频率:这是当前的Bucket算法。其工作原理与真正的漏水桶相同:我们将水(要求)倒入桶中的最边缘,而一部分水则不断地从底部的孔中流出(通常使用后台过程实现)。如果倒入水桶的水量大于倒出的水量,则将水桶装满,当水桶装满水时,不再接受新的请求。
算法的工作原理
您可以跳过此方法,以获得更优雅的解决方案,该解决方案不需要单独的过程来模拟泄漏:通用信元速率算法。
广义信元速率控制算法
GCRA是在电信行业创建的,被称为异步传输模式(ATM)。 ATM网络管理员使用它来延迟或丢弃到达速率高于给定限制的信元(固定大小的小数据包)。
GCRA使用每个请求的理论到达时间(TAT)跟踪剩余限制:
tat = current_time + period
如果到达时间小于当前的TAT,则限制下一个请求。如果将频率分为1个请求/周期,则此方法效果很好。但实际上,频率通常被计算为极限/周期。例如,如果频率为10个请求/ 60秒,则用户可以在前6秒内发出10个请求。并且以1次请求/ 6秒的频率,他将不得不在两次请求之间等待6秒。
为了能够在短时间内发送一组请求并在限制> 1的期间内保持其数量限制,每个请求必须除以期间/限制比率,然后下一个理论到达时间(
new_tat
)的计算将有所不同。让我们将请求的到达时间表示为t
:
new_tat = tat + period / limit
如果请求已分组(t <= tat
)new_tat = t + period / limit
如果请求未分组(t > tat
)
因此:
new_tat = max(tat, t) + period / limit
如果请求
new_tat
大于当前时间和期间的总和,则该请求将受到限制new_tat > t + period
。当new_tat = tat + period / limit
我们得到tat + period / limit > t + period
。因此,只需要在时限制查询tat - t > period - period / limit
。
period — period / limit
<----------------------->
--|----------------------------|--->
t TAT
在这种情况下,我们不需要更新TAT,因为有限的请求没有理论上的到达时间。
现在,让我们构建代码的最终版本!
from datetime import timedelta
from redis import Redis
def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
period_in_seconds = int(period.total_seconds())
t = r.time()[0]
separation = round(period_in_seconds / limit)
r.setnx(key, 0)
tat = max(int(r.get(key)), t)
if tat - t <= period_in_seconds - separation:
new_tat = max(tat, t) + separation
r.set(key, new_tat)
return False
return True
我们使用Redis TIME是因为GCRA与时间有关,因此我们需要确保当前时间在多个部署中保持一致(不同机器之间的时钟差异可能导致误报)。
此代码演示了10个请求/ 60秒时的GCRA性能。
import redis
from datetime import timedelta
from ratelimit import gcra
r = redis.Redis(host='localhost', port=6379, db=0)
requests = 10
for i in range(requests):
if gcra.request_is_limited(r, 'admin', 10, timedelta(minutes=1)):
print ('Request is limited')
else:
print ('Request is allowed')
该算法不限制前10个请求,但您必须等待至少6秒钟才能发出下一个请求。尝试在一段时间后运行脚本并更改限制和期限的值(例如
limit = 1
和period=timedelta(seconds=6)
)。
为了保持实现的清洁,我没有在接收和发送呼叫之间添加锁。但是对于具有相同密钥且同时具有多个请求的请求,它是必需的。使用Lua,您可以将锁定添加为上下文管理器。
def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
period_in_seconds = int(period.total_seconds())
t = r.time()[0]
separation = round(period_in_seconds / limit)
r.setnx(key, 0)
try:
with r.lock('lock:' + key, blocking_timeout=5) as lock:
tat = max(int(r.get(key)), t)
if tat - t <= period_in_seconds - separation:
new_tat = max(tat, t) + separation
r.set(key, new_tat)
return False
return True
except LockError:
return True
完整的代码在GitHub上。