相关原理 为什么要限流?
在开发高并发系统时,有三把利器用来保护系统:缓存、降级和限流。 其中,限流在很多场景中用来限制并发和请求量,比如说秒杀抢购,保护自身系统和下游系统不被巨型流量冲垮等。
常见的限流方法包括计数器、滑动窗口、漏桶和令牌桶算法。
计数器算法: 在一段时间间隔内(时间窗/时间区间,处理请求的最大数量固定,超过部分不做处理。 简单粗暴,比如指定线程池大小,指定数据库连接池大小、nginx连接数等,这都属于计数器算法。
计数器算法是限流算法里最简单也是最容易实现的一种算法。 举个例子,比如我们规定对于接口A,我们1分钟的访问次数不能超过100个。
计数器限流的做法是: 我们可以初始化一个计数器counter
,每当收到请求时counter
加1。若counter
值超过100且当前请求与首个请求的时间间隔小于1分钟,则判定为请求过多并拒绝访问;若时间间隔超过1分钟且counter
仍在限流范围内,则重置counter
。
但是这个方法存在一个显著的问题,攻击者可以在0:59的时候一次性发100个请求;到1:00就会将计数器清零,然后在1:01的时候再发100个请求;对于0:00-1:00和1:00-2:00这两个一分钟的请求都是100个请求,看起来是满足系统的要求的;但是在0:59-1:01这不足1分钟的时间段内,却发起了200个请求,可能会引起系统奔溃。
滑动窗口算法:
滑动窗口(rolling window)
是一种时间分段技术。在计数器算法中,如果限制1分钟内的访问次数,这个1分钟就是一个固定时间窗口。而滑动窗口则是将这个固定窗口进一步细分成多个更小的时间单元。
例如:将1分钟的固定窗口划分为6个10秒的小窗口。整个红色矩形框代表一个大时间窗口,窗口会持续滑动,每10秒向右移动一格。假设在第一分钟的第59秒收到100个请求(落在灰色格子),第二分钟的1:00又收到100个请求(落在橘黄色格子)。此时滑动窗口检测到完整1分钟(红色框)内的总请求量达到200次,超过100次的限流阈值,就能及时触发限流机制。 滑动窗口划分得越精细,限流统计的准确性就越高,但过于精细会增加系统负担。
漏桶算法:
水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会超过桶可接纳的容量时直接溢出。 漏桶算法可以粗略的认为就是注水漏水过程,往桶中以任意速率流入水,以一定速率流出水,当水超过桶容量(capacity)则丢弃,因为桶容量是不变的,保证了整体的速率。 总结:漏桶算法通过一个固定容量和固定漏水速率的水桶模型,强制将任意输入流量整形为恒定速率输出,并在流量超过容量时丢弃请求,以此实现速率限制、流量平滑和系统保护。
令牌桶算法:
令牌桶是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌,填满了就丢弃令牌,请求是否被处理要看桶中令牌是否足够,当令牌数减为零时则拒绝新的请求。在流量低峰的时候,令牌桶会出现堆积,因此当出现瞬时高峰的时候,有足够多的令牌可以获取,令牌桶允许一定程度突发流量,只要有令牌就可以处理,支持一次拿多个令牌。令牌桶中装的是令牌。
特点 :与漏桶算法相比,令牌桶算法允许短时间内的请求量激增(获得令牌后即可访问接口,可能出现瞬间消耗所有累积令牌的情况),但不会像计数算法那样产生过高峰值(因为令牌是匀速生成的)。因此,令牌桶算法在处理突发流量时表现更优。 部分内容和图片来源:常见限流算法:计数器、滑动窗口、漏桶、令牌桶 限流:计数器、漏桶、令牌桶 三大算法的原理与实战(史上最全)
限流实现 本项目我是基于令牌桶实现的访问限流,请看以下代码LimitMiddleware.h
头文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class LimitMiddleware : public Middleware { public : LimitMiddleware (int rate, int capacity); void before (HttpRequest& request) override ; void after (HttpResponse& response) override {}; double gettokens () const {return tokens_;} private : void refillTokens () ; private : int rate_; int capacity_; double tokens_; std::chrono::steady_clock::time_point lastRefillTime_; std::mutex mutex_; };
LimitMiddleware.cc
文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 using namespace std::chrono;LimitMiddleware::LimitMiddleware (int rate, int capacity) : rate_ (rate), capacity_ (capacity), tokens_ (capacity), lastRefillTime_ (steady_clock::now ()) { } void LimitMiddleware::refillTokens () { auto now = steady_clock::now (); auto elapsedMs = duration_cast <milliseconds>(now - lastRefillTime_).count (); if (elapsedMs > 0 ) { double newTokens = (elapsedMs / 1000.0 ) * rate_; tokens_ = std::min ((double )capacity_, tokens_ + newTokens); lastRefillTime_ = now; } } void LimitMiddleware::before (HttpRequest& request) { std::lock_guard<std::mutex> lock (mutex_) ; refillTokens (); if (tokens_ >= 1.0 ) { tokens_ -= 1.0 ; } else { HttpResponse resp; resp.setStatusLine (request.getVersion (), http::HttpResponse::k429TooManyRequests, "Too Many Requests" ); resp.setCloseConnection (true ); resp.setContentType ("application/json" ); resp.setContentLength (0 ); resp.setBody ("Rate limit exceeded. Please try again later." ); throw resp; } }
核心逻辑是:
按固定速率补充令牌(refillTokens
)
请求到来时消费令牌(before
)
没有令牌可用时返回(限流了,返回状态码429 Too Many Requests
)
限流测试 在WebApps/LiteHubServer/src/LiteHubServer.cpp
中的initializeMiddleware
函数中,定义了限流中间件
1 2 limitMiddleware_ = std::make_shared <http::middleware::LimitMiddleware>(1 ,100 ); httpServer_.addMiddleware (limitMiddleware_);
这里定义的是一秒不超过100个请求,如果通过手动点击,这个1秒内怎么也到不了100次请求;所以我通过python脚本代码模拟一次大量的访问,python代码如下,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 import requestsfrom concurrent.futures import ThreadPoolExecutor, as_completedimport timefrom collections import CounterTOTAL_REQUESTS = 150 MAX_WORKERS = 3 REQUEST_INTERVAL = 0.02 TARGET_URL = "http://47.122.77.97/" def send_request (index ): try : start_time = time.time() r = requests.get(TARGET_URL, timeout=3 ) elapsed = time.time() - start_time print (f"[{time.strftime('%H:%M:%S' )} ] 请求 {index + 1 :02d} --> 状态码: {r.status_code} (耗时: {elapsed:.2 f} s)" ) return r.status_code except Exception as e: print (f"[{time.strftime('%H:%M:%S' )} ] 请求 {index + 1 :02d} --> 失败: {str (e)} " ) return str (e) def main (): print (f"开始压测,总请求数:{TOTAL_REQUESTS} ,最大并发数:{MAX_WORKERS} " ) results = [] total_start_time = time.time() with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: futures = [] for i in range (TOTAL_REQUESTS): futures.append(executor.submit(send_request, i)) time.sleep(REQUEST_INTERVAL) for future in as_completed(futures): results.append(future.result()) total_elapsed = time.time() - total_start_time print ("\n-----------------------------" ) print ("请求状态统计结果:" ) counts = Counter(results) for key, count in counts.items(): print (f"{key} : {count} 次" ) print (f"\n压测总耗时: {total_elapsed:.2 f} 秒" ) print ("-----------------------------" ) if __name__ == "__main__" : main()
执行python代码的结果如下:
可用看到总共有104次请求成功;46次拒绝访问。证明了设计的令牌桶限流策略有效。 我们将REQUEST_INTERVAL 调整的大一些,避免瞬间的大量请求
再执行一次: 拒绝访问的次数也少了一些,从(46—>>>34,即减少了12次)。
下图为wireshark抓包到的429状态码响应: