贝利信息

asyncio.Semaphore 如何与限流装饰器结合使用

日期:2026-01-23 00:00 / 作者:舞姬之光
限流装饰器不能直接套 asyncio.Semaphore,因为其 acquire() 是协程需 await,而同步装饰器无法等待;正确做法是用异步装饰器封装 async with semaphore: 逻辑,确保复用同一信号量实例并自动释放。

限流装饰器为什么不能直接套 asyncio.Semaphore

因为 asyncio.Semaphoreacquire() 是协程函数,必须用 await 调用;而普通装饰器在定义时是同步执行的,无法 await 一个协程对象。直接写 @semaphore.acquire() 会报 RuntimeWarning: coroutine 'Semaphore.acquire' was never awaited,甚至导致死锁。

正确做法:用异步装饰器 + async with 包裹

核心是把信号量控制逻辑封装进一个真正的异步装饰器里,并确保每次调用都走 async with semaphore: 流程。示例如下:

import asyncio
from functools import wraps

def rate_limit(limit: int): semaphore = asyncio.Semaphore(limit) def decorator(func): @wraps(func) async def wrapper(*args, *kwargs): async with semaphore: return await func(args, **kwargs) return wrapper return decorator

@rate_limit(3) async def fetch_data(url: str): print(f"GET {url}") await asyncio.sleep(1) # 模拟请求 return f"done: {url}"

这个模式的关键点:

常见踩坑

场景与修复

实际用的时候容易掉进这几个坑:

进阶:按用户/路径维度做差异化限流

如果需要对不同 API 路径、不同用户 ID 或不同目标域名分别限流,就不能只用一个全局 semaphore。推荐用字典缓存 + 键隔离:

from collections import defaultdict
import asyncio

_semaphores = defaultdict(lambda: asyncio.Semaphore(3))

def per_domain_rate_limit(domain: str): semaphore = _semaphores[domain] def decorator(func): @wraps(func) async def wrapper(*args, *kwargs): async with semaphore: return await func(args, **kwargs) return wrapper return decorator

@per_domain_rate_limit("httpbin.org") async def fetch_httpbin(): ...

注意:_semaphores 字典本身不需要加锁 —— asyncio.Semaphore 是线程/协程安全的,但字典读写在高并发下可能有竞态,生产环境建议用 asyncio.Lock 包一层或改用 weakref.WeakValueDictionary 防内存泄漏。

真正难的不是写对语法,而是想清楚「谁和谁共用一个信号量」—— 同一资源池里的所有协程,必须共享同一个 semaphore 实例,且生命周期要覆盖整个应用运行期。