Python和Redis的速率限制



在本文中,我们将介绍一些基于Python和Redis的速率限制算法,从最简单的实现到高级通用细胞速率算法(GCRA)。我们将使用redis-py



与Redis(pip install redis进行交互我建议克隆我的存储库以尝试查询约束。



时限



限制每个周期的请求数的第一种方法是使用限时算法:对于每个限制键(rate-key,唯一的东西,例如昵称或IP地址),计数器(最初设置限制值)和有效期分别存储(期间),随着收到请求而减少。



使用Python和Redis,可以按以下方式实现此算法:



  1. 检查约束键的存在。
  2. 如果存在,则使用限制值(Redis SETNX)和到期日期(Redis EXPIRE对其进行初始化
  3. 我们为每个后续请求(Redis DECRBY减小此值
  4. 仅当值小于零时才停止查询。
  5. 在指定的时间段后,密钥将自动删除。


from datetime import timedelta
from redis import Redis

def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
    if r.setnx(key, limit):
        r.expire(key, int(period.total_seconds()))
    bucket_val = r.get(key)
    if bucket_val and int(bucket_val) > 0:
        r.decrby(key, 1)
        return False
    return True


您可以在30秒内模拟20个请求的限制时看到此代码的工作方式(为了更清楚地说明,我将该函数放在了模块中)。



import redis
from datetime import timedelta
from ratelimit import time_bucketed

r = redis.Redis(host='localhost', port=6379, db=0)
requests = 25

for i in range(requests):
    if time_bucketed.request_is_limited(r, 'admin', 20, timedelta(seconds=30)):
        print ('Request is limited')
    else:
        print ('Request is allowed')


仅前20个请求将不受限制,在它们之后,您必须等待30秒,以便可以再次发送新请求。



这种方法的缺点是它不限制频率,而是限制用户在一定时间内发出的请求数。如果整个限制都被立即用尽,您将不得不等待一段时间的结束。



漏桶算法



有一种方法可以非常仔细地限制频率:这是当前的Bucket算法其工作原理与真正的漏水桶相同:我们将水(要求)倒入桶中的最边缘,而一部分水则不断地从底部的孔中流出(通常使用后台过程实现)。如果倒入水桶的水量大于倒出的水量,则将水桶装满,当水桶装满水时,不再接受新的请求。



算法的工作原理



您可以跳过此方法,以获得更优雅的解决方案,该解决方案不需要单独的过程来模拟泄漏:通用信元速率算法。



广义信元速率控制算法



GCRA是在电信行业创建的,被称为异步传输模式(ATM)。 ATM网络管理员使用它来延迟或丢弃到达速率高于给定限制的信元(固定大小的小数据包)。



GCRA使用每个请求的理论到达时间(TAT)跟踪剩余限制:



tat = current_time + period


如果到达时间小于当前的TAT,则限制下一个请求。如果将频率分为1个请求/周期,则此方法效果很好。但实际上,频率通常被计算为极限/周期。例如,如果频率为10个请求/ 60秒,则用户可以在前6秒内发出10个请求。并且以1次请求/ 6秒的频率,他将不得不在两次请求之间等待6秒。



为了能够在短时间内发送一组请求并在限制> 1的期间内保持其数量限制,每个请求必须除以期间/限制比率,然后下一个理论到达时间(new_tat)的计算将有所不同。让我们将请求的到达时间表示为t



  • new_tat = tat + period / limit如果请求已分组(t <= tat
  • new_tat = t + period / limit如果请求未分组(t > tat


因此:



new_tat = max(tat, t) + period / limit


如果请求new_tat大于当前时间和期间的总和,则该请求将受到限制new_tat > t + periodnew_tat = tat + period / limit我们得到tat + period / limit > t + period因此,只需要在时限制查询tat - t > period - period / limit



      period — period / limit
      <----------------------->
--|----------------------------|--->
  t                           TAT


在这种情况下,我们不需要更新TAT,因为有限的请求没有理论上的到达时间。



现在,让我们构建代码的最终版本!



from datetime import timedelta
from redis import Redis

def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
    period_in_seconds = int(period.total_seconds())
    t = r.time()[0]
    separation = round(period_in_seconds / limit)
    r.setnx(key, 0)
    tat = max(int(r.get(key)), t)
    if tat - t <= period_in_seconds - separation:
        new_tat = max(tat, t) + separation
        r.set(key, new_tat)
        return False
    return True


我们使用Redis TIME是因为GCRA与时间有关,因此我们需要确保当前时间在多个部署中保持一致(不同机器之间的时钟差异可能导致误报)。



此代码演示了10个请求/ 60秒时的GCRA性能。



import redis
from datetime import timedelta
from ratelimit import gcra

r = redis.Redis(host='localhost', port=6379, db=0)
requests = 10

for i in range(requests):
    if gcra.request_is_limited(r, 'admin', 10, timedelta(minutes=1)):
        print ('Request is limited')
    else:
        print ('Request is allowed')


该算法不限制前10个请求,但您必须等待至少6秒钟才能发出下一个请求。尝试在一段时间后运行脚本并更改限制和期限的值(例如limit = 1period=timedelta(seconds=6))。



为了保持实现的清洁,我没有在接收和发送呼叫之间添加锁。但是对于具有相同密钥且同时具有多个请求的请求,它是必需的。使用Lua,您可以将锁定添加为上下文管理器。



def request_is_limited(r: Redis, key: str, limit: int, period: timedelta):
    period_in_seconds = int(period.total_seconds())
    t = r.time()[0]
    separation = round(period_in_seconds / limit)
    r.setnx(key, 0)
    try:
        with r.lock('lock:' + key, blocking_timeout=5) as lock:
            tat = max(int(r.get(key)), t)
            if tat - t <= period_in_seconds - separation:
                new_tat = max(tat, t) + separation
                r.set(key, new_tat)
                return False
            return True
    except LockError:
        return True


完整的代码在GitHub上



All Articles