一、简介

   该限流器基于令牌桶算法实现,特点如下:
   1.接入方便,无业务侵入:接入只需要添加一行代码r.pass()。
   2.线程安全,CPU友好。
   3.轻量,核心代码200行。
   github地址:https://github.com/YukangLiu/RateLimiter

二、令牌桶算法简述

   有一个装着令牌(token)的桶,它按照qps的速率补充令牌,满了则溢出。当一个请求到来时,它需要先从桶中取出一个令牌,才能继续进行后续操作。所以简单来说,令牌桶算法是基于令牌的补给速率限制qps。

三、使用

   先来看看如何使用,其使用非常简单,先创建一个限流器对象,然后在需要限流的地方调用pass()方法即可,示例如下:

int main()
{
    RateLimiter r(100);//100qps限流器

    for(int i = 0; i < 500; ++i)
    {
        r.pass();//通过的速率为100/s
    }

    return 0;
}

   这个for循环执行完需要5s。

四、实现

//限流器
class RateLimiter 
{
public:
    //qps限制最大为十亿
    RateLimiter(int64_t qps);
    
    DISALLOW_COPY_MOVE_AND_ASSIGN(RateLimiter);

    //对外接口,能返回说明流量在限定值内
    void pass();

private:
    //获得当前时间,单位ns
    int64_t now();

    //更新令牌桶中的令牌
    void supplyTokens();

    //尝试获得令牌
    bool tryGetToken();

    //必定成功获得令牌
    void mustGetToken();

    //令牌桶大小
    const int64_t bucketSize_;

    //剩下的token数
    AtomicSequence tokenLeft_;

    //补充令牌的单位时间
    const int64_t supplyUnitTime_;

    //上次补充令牌的时间,单位纳秒
    int64_t lastAddTokenTime_;

    //自旋锁
    Spinlock lock_;
};

   该类主要维护的是令牌桶的容量、当前桶中还存在的令牌数、上次补充令牌的时间这三个变量。其中AtomicSequence就是对原子变量的一个封装,主要为了防止false sharing,其具体设计可以看我另一篇博客Disruptor原理概述与轻量级C++实现
   具体实现如下:

int64_t RateLimiter::now()
{
    struct timeval tv;
    ::gettimeofday(&tv, 0);
    int64_t seconds = tv.tv_sec;
    return seconds * NS_PER_SECOND + tv.tv_usec * NS_PER_USECOND;
}

   上述方法用于获取当前时间的纳秒数。

void RateLimiter::supplyTokens()
{
    auto cur = now();
    if (cur - lastAddTokenTime_ < supplyUnitTime_)
    {
        return;
    }

    {
        SpinlockGuard lock(lock_);
        //等待自旋锁期间可能已经补充过令牌了
        int64_t newTokens = (cur - lastAddTokenTime_) / supplyUnitTime_;
        if (newTokens <= 0)
        {
            return;
        }
      
        //更新补充时间,不能直接=cur,否则会导致时间丢失
        lastAddTokenTime_ += (newTokens * supplyUnitTime_);
      
        auto freeRoom = bucketSize_ - tokenLeft_.load();
        if(newTokens > freeRoom || newTokens > bucketSize_)
        {
            newTokens = freeRoom > bucketSize_ ? bucketSize_ : freeRoom;
        }
      
        tokenLeft_.fetch_add(newTokens);
    }
    
}

   上述方法用于补充令牌:计算上次补充到当前过了多久,然后根据qps补充相应数量的令牌,最后更新补充令牌的时间。这里有一个细节就是更新的时间并不是当前的时间,而是补充相应令牌数对应需要经过的时间。比如1s要补充1个令牌,本次补充距上次过了1.8s,此时只补充1个令牌,补充令牌的时间只比上次增加1s而不是增加1.8s。
   另外,这里用到了自旋锁,该自旋锁是用原子变量模拟信号量来实现的,不会让进程睡眠,这里不多介绍,有兴趣可以到github查看源码。

bool RateLimiter::tryGetToken()
{
    supplyTokens();

    //获得一个令牌
    auto token = tokenLeft_.fetch_add(-1);
    if(token <= 0)
    {//已经没有令牌了,归还透支的令牌
        tokenLeft_.fetch_add(1);
        return false;
    }
    
    return true;
}

   上述方法用于尝试获取令牌,首先回去补充令牌桶,然后取一个令牌,如果桶中已没有令牌则返回失败。

void RateLimiter::mustGetToken()
{
    bool isGetToken = false;
    for(int i = 0; i < RETRY_IMMEDIATELY_TIMES; ++i)
    {
        isGetToken =  tryGetToken();
        if(isGetToken)
        {
            return;
        }
    }

    while(1)
    {
        isGetToken =  tryGetToken();
        if(isGetToken)
        {
            return;
        }
        else
        {
            //让出CPU
            sleep(0);
        }
    }
}

   上述方法用于必须获取令牌,不获得就不返回。它在tryGetToken的基础上增加了retry机制:首先自旋retry一定次数,还是没拿到令牌说明要么此时并发量很大,要么qps较小,需要较长时间才能补充令牌,所以这之后每次retry失败都会让出cpu,让cpu先去处理其它任务而不是占用CPU资源。所以这种retry机制是对CPU友好的,但是同时也不会导致并发量下降。

void RateLimiter::pass()
{
    return mustGetToken();
}
       该方法则是对外接口,目前是直接调用mustGetToken方法。

五、后续

   这个工具接下来可以延申的地方为增加超时时长,超过时间则返回,不再等待。即添加拒绝策略。
   最后,有问题都可以私信我,毕竟人无完人,本文有地方描述得不太准确希望能够告知。