当前位置:首页 > 技术分析 > 正文内容

Python异步装饰器实战:手把手教你实现高性能请求限流器

ruisui8822小时前技术分析1

在现代Web应用和API服务中,请求限流是保护系统稳定性和防止资源滥用的重要机制。随着Python异步编程的普及,如何优雅地实现异步环境下的请求限流成为开发者关注的焦点。异步装饰器提供了一种简洁而强大的解决方案,能够在不阻塞其他协程的情况下实现精确的流量控制。

基础概念

异步装饰器是Python异步编程中的重要工具,它能够为异步函数添加额外的功能而不改变原函数的核心逻辑。与传统的同步装饰器不同,异步装饰器需要处理协程对象,并正确管理异步上下文。在请求限流场景中,异步装饰器可以在函数执行前检查当前请求频率,决定是否允许执行或需要等待。

请求限流器的核心思想是控制单位时间内允许执行的操作数量。常见的限流算法包括令牌桶算法、漏桶算法和滑动窗口算法。在异步环境中,我们需要考虑并发安全性和协程调度的特殊性,确保限流器能够准确控制请求频率而不会因为协程切换导致计数错误。

核心实现机制

1、基础异步限流装饰器

下面展示一个基于时间窗口的简单异步限流装饰器实现。该装饰器使用滑动窗口算法,记录每个时间窗口内的请求次数,当超过限制时会让协程等待直到可以执行。装饰器内部使用asyncio.Lock确保并发安全,防止多个协程同时修改计数器造成数据竞争。

import asyncio
import time
from functools import wraps
from collections import defaultdict, deque


class AsyncRateLimiter:
    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = defaultdict(deque)
        self.lock = asyncio.Lock()

    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with self.lock:
                current_time = time.time()
                func_key = f"{func.__module__}.{func.__name__}"

                while (self.calls[func_key] and
                       current_time - self.calls[func_key][0] > self.time_window):
                    self.calls[func_key].popleft()

                if len(self.calls[func_key]) >= self.max_calls:
                    sleep_time = (self.calls[func_key][0] + self.time_window - current_time)
                    if sleep_time > 0:
                        await asyncio.sleep(sleep_time)
                        return await wrapper(*args, **kwargs)

                self.calls[func_key].append(current_time)

            return await func(*args, **kwargs)

        return wrapper


# Usage example
rate_limiter = AsyncRateLimiter(max_calls=5, time_window=60.0)


@rate_limiter
async def api_request(url: str):
    print(f"Making request to {url} at {time.strftime('%H:%M:%S')}")
    await asyncio.sleep(0.1)
    return f"Response from {url}"

2、令牌桶算法实现

令牌桶算法是一种更加灵活的限流策略,它允许短时间内的突发请求,同时保证长期的平均速率不超过限制。下面的实现创建了一个异步令牌桶限流器,定期向桶中添加令牌,每次请求消耗一个令牌。当桶中没有令牌时,请求会等待直到有新的令牌生成。

import asyncio
import time
from functools import wraps


class AsyncTokenBucket:
    def __init__(self, capacity: int, refill_rate: float):
        self.capacity = capacity
        self.tokens = capacity
        self.refill_rate = refill_rate
        self.last_refill = time.time()
        self.lock = asyncio.Lock()

    async def acquire(self, tokens: int = 1) -> bool:
        async with self.lock:
            current_time = time.time()
            time_passed = current_time - self.last_refill

            new_tokens = time_passed * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + new_tokens)
            self.last_refill = current_time

            if self.tokens >= tokens:
                self.tokens -= tokens
                return True

            wait_time = (tokens - self.tokens) / self.refill_rate
            await asyncio.sleep(wait_time)
            return await self.acquire(tokens)

    def __call__(self, tokens: int = 1):
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                await self.acquire(tokens)
                return await func(*args, **kwargs)

            return wrapper

        return decorator


token_bucket = AsyncTokenBucket(capacity=10, refill_rate=2.0)


@token_bucket(tokens=1)
async def heavy_computation():
    print(f"执行重计算任务 at {time.strftime('%H:%M:%S')}")
    await asyncio.sleep(0.5)
    return "计算完成"

高级功能扩展

1、支持用户级别限流

在实际应用中,我们经常需要为不同用户设置独立的限流策略。下面的实现支持基于用户ID的分组限流,每个用户都有独立的令牌桶,可以设置不同的限流参数。装饰器会自动从函数参数或关键字参数中提取用户标识,为每个用户维护独立的限流状态。

import asyncio
import time
import inspect
from functools import wraps
from collections import defaultdict, deque


class UserBasedRateLimiter:
    def __init__(self, max_calls: int, time_window: float, user_key: str = 'user_id'):
        self.max_calls = max_calls
        self.time_window = time_window
        self.user_key = user_key
        self.user_calls = defaultdict(deque)
        self.locks = defaultdict(asyncio.Lock)

    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            user_id = kwargs.get(self.user_key)
            if not user_id and args:
                sig = inspect.signature(func)
                params = list(sig.parameters.keys())
                if self.user_key in params:
                    param_index = params.index(self.user_key)
                    if param_index < len(args):
                        user_id = args[param_index]

            if not user_id:
                raise ValueError(f"无法找到用户标识 {self.user_key}")

            async with self.locks[user_id]:
                current_time = time.time()
                user_calls = self.user_calls[user_id]

                while user_calls and current_time - user_calls[0] > self.time_window:
                    user_calls.popleft()

                if len(user_calls) >= self.max_calls:
                    sleep_time = user_calls[0] + self.time_window - current_time
                    if sleep_time > 0:
                        await asyncio.sleep(sleep_time)
                        return await wrapper(*args, **kwargs)

                user_calls.append(current_time)

            return await func(*args, **kwargs)

        return wrapper


user_limiter = UserBasedRateLimiter(max_calls=3, time_window=10.0)


@user_limiter
async def send_email(user_id: str, message: str):
    print(f"为用户 {user_id} 发送邮件: {message}")
    await asyncio.sleep(0.2)
    return f"邮件已发送给 {user_id}"

2、分级限流与异常处理

在复杂的业务场景中,我们需要实现更加精细的限流控制,包括不同优先级的请求处理和完善的异常处理机制。下面的实现提供了多级限流功能,支持普通用户和VIP用户的差异化限流策略,同时包含详细的异常处理和监控功能。

from enum import Enum
import logging
import asyncio
import time
from functools import wraps
from collections import defaultdict, deque


class UserTier(Enum):
    NORMAL = "normal"
    VIP = "vip"
    PREMIUM = "premium"


class RateLimitExceeded(Exception):
    def __init__(self, retry_after: float):
        self.retry_after = retry_after
        super().__init__(f"请求频率超限,请在 {retry_after:.2f} 秒后重试")


class TieredRateLimiter:
    def __init__(self):
        self.tier_configs = {
            UserTier.NORMAL: {'max_calls': 10, 'time_window': 60},
            UserTier.VIP: {'max_calls': 50, 'time_window': 60},
            UserTier.PREMIUM: {'max_calls': 200, 'time_window': 60}
        }
        self.user_calls = defaultdict(deque)
        self.locks = defaultdict(asyncio.Lock)
        self.logger = logging.getLogger(__name__)

    def get_user_tier(self, user_id: str) -> UserTier:
        if user_id.startswith('vip_'):
            return UserTier.VIP
        elif user_id.startswith('premium_'):
            return UserTier.PREMIUM
        return UserTier.NORMAL

    def __call__(self, raise_on_limit: bool = False):
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                user_id = kwargs.get('user_id')
                if not user_id:
                    raise ValueError("缺少用户标识参数")

                user_tier = self.get_user_tier(user_id)
                config = self.tier_configs[user_tier]

                async with self.locks[user_id]:
                    current_time = time.time()
                    user_calls = self.user_calls[user_id]

                    while user_calls and current_time - user_calls[0] > config['time_window']:
                        user_calls.popleft()

                    if len(user_calls) >= config['max_calls']:
                        retry_after = user_calls[0] + config['time_window'] - current_time
                        self.logger.warning(f"用户 {user_id}({user_tier.value}) 请求频率超限")

                        if raise_on_limit:
                            raise RateLimitExceeded(retry_after)

                        if retry_after > 0:
                            await asyncio.sleep(retry_after)
                            return await wrapper(*args, **kwargs)

                    user_calls.append(current_time)
                    self.logger.info(f"用户 {user_id}({user_tier.value}) 请求通过,当前窗口内请求数: {len(user_calls)}")

                return await func(*args, **kwargs)

            return wrapper

        return decorator


tiered_limiter = TieredRateLimiter()


@tiered_limiter(raise_on_limit=True)
async def query_database(user_id: str, query: str):
    print(f"执行数据库查询 - 用户: {user_id}, 查询: {query}")
    await asyncio.sleep(0.1)
    return f"查询结果: {query}"

3、基于权重的动态限流

为了更好地适应不同类型请求的资源消耗差异,我们可以实现基于权重的动态限流机制。该实现允许为不同的操作分配不同的权重值,重要或资源密集型操作消耗更多的限流配额。同时支持动态调整限流参数,可以根据系统负载情况实时调整限流策略。

import asyncio
import time
from functools import wraps
from collections import defaultdict, deque


class WeightedRateLimiter:
    def __init__(self, max_weight: int = 100, time_window: float = 60.0):
        self.max_weight = max_weight
        self.time_window = time_window
        self.weight_records = defaultdict(deque)
        self.locks = defaultdict(asyncio.Lock)
        self.dynamic_multiplier = 1.0
        self.last_adjustment = time.time()

    async def adjust_limits(self, load_factor: float):
        """根据系统负载动态调整限流参数"""
        if load_factor > 0.8:
            self.dynamic_multiplier = 0.5  # 高负载时减少限流配额
        elif load_factor < 0.3:
            self.dynamic_multiplier = 1.5  # 低负载时增加限流配额
        else:
            self.dynamic_multiplier = 1.0

        self.last_adjustment = time.time()

    def __call__(self, weight: int = 1):
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                func_key = f"{func.__module__}.{func.__name__}"
                effective_max = int(self.max_weight * self.dynamic_multiplier)

                async with self.locks[func_key]:
                    current_time = time.time()
                    records = self.weight_records[func_key]

                    current_weight = 0
                    while records and current_time - records[0][0] > self.time_window:
                        records.popleft()

                    for record_time, record_weight in records:
                        current_weight += record_weight

                    if current_weight + weight > effective_max:
                        oldest_time = records[0][0] if records else current_time
                        wait_time = oldest_time + self.time_window - current_time

                        if wait_time > 0:
                            await asyncio.sleep(wait_time)
                            return await wrapper(*args, **kwargs)

                    records.append((current_time, weight))

                return await func(*args, **kwargs)

            return wrapper

        return decorator


weighted_limiter = WeightedRateLimiter(max_weight=100, time_window=30.0)


@weighted_limiter(weight=10)
async def heavy_operation():
    print("执行重型操作")
    await asyncio.sleep(1.0)
    return "重型操作完成"


@weighted_limiter(weight=1)
async def light_operation():
    print("执行轻量操作")
    await asyncio.sleep(0.1)
    return "轻量操作完成"

实际应用场景

异步请求限流器在Web API开发中具有广泛的应用价值。在FastAPI或其他异步Web框架中,我们可以将限流装饰器应用于路由处理函数,有效防止API滥用。例如,对于用户注册接口,我们可以限制每个IP地址每小时只能注册3次,防止恶意批量注册。对于数据查询接口,可以限制每个用户每分钟最多查询10次,保护数据库资源。

在微服务架构中,异步限流器还可以用于控制服务间的调用频率。当一个服务需要调用下游服务时,可以使用限流装饰器确保不会因为请求过于频繁而压垮下游服务。结合熔断器模式,可以构建更加健壮的分布式系统。

import asyncio
import time
from functools import wraps
from collections import defaultdict, deque


class AsyncRateLimiter:
    def __init__(self, max_calls: int, time_window: float):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = defaultdict(deque)
        self.lock = asyncio.Lock()

    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with self.lock:
                current_time = time.time()
                func_key = f"{func.__module__}.{func.__name__}"

                # Clean expired records
                while (self.calls[func_key] and
                       current_time - self.calls[func_key][0] > self.time_window):
                    self.calls[func_key].popleft()

                # Check rate limit
                if len(self.calls[func_key]) >= self.max_calls:
                    sleep_time = (self.calls[func_key][0] + self.time_window - current_time)
                    if sleep_time > 0:
                        await asyncio.sleep(sleep_time)
                        return await wrapper(*args, **kwargs)

                # Record this call
                self.calls[func_key].append(current_time)

            return await func(*args, **kwargs)

        return wrapper


class UserBasedRateLimiter:
    def __init__(self, max_calls: int, time_window: float, user_key: str = 'user_id'):
        self.max_calls = max_calls
        self.time_window = time_window
        self.user_key = user_key
        self.user_calls = defaultdict(deque)
        self.locks = defaultdict(asyncio.Lock)

    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Get user ID
            user_id = kwargs.get(self.user_key)
            if not user_id and args:
                import inspect
                sig = inspect.signature(func)
                params = list(sig.parameters.keys())
                if self.user_key in params:
                    param_index = params.index(self.user_key)
                    if param_index < len(args):
                        user_id = args[param_index]

            if not user_id:
                raise ValueError(f"Missing user identifier: {self.user_key}")

            async with self.locks[user_id]:
                current_time = time.time()
                user_calls = self.user_calls[user_id]

                # Clean expired records
                while user_calls and current_time - user_calls[0] > self.time_window:
                    user_calls.popleft()

                # Check rate limit
                if len(user_calls) >= self.max_calls:
                    sleep_time = user_calls[0] + self.time_window - current_time
                    if sleep_time > 0:
                        await asyncio.sleep(sleep_time)
                        return await wrapper(*args, **kwargs)

                user_calls.append(current_time)

            return await func(*args, **kwargs)

        return wrapper


# Initialize rate limiters
rate_limiter = AsyncRateLimiter(max_calls=5, time_window=1.0)
user_limiter = UserBasedRateLimiter(max_calls=3, time_window=2.0)


@rate_limiter
async def api_request(url: str):
    print(f"Making request to {url} at {time.strftime('%H:%M:%S')}")
    await asyncio.sleep(0.1)
    return f"Response from {url}"


@user_limiter
async def send_email(user_id: str, message: str):
    print(f"Sending email to {user_id}: {message} at {time.strftime('%H:%M:%S')}")
    await asyncio.sleep(0.2)
    return f"Email sent to {user_id}"


# Test function
async def test_rate_limiter():
    # Test basic rate limiting
    print("Testing basic rate limiter (5 requests/second)")
    tasks = []
    for i in range(10):
        task = api_request(f"https://api.example.com/data/{i}")
        tasks.append(task)

    start_time = time.time()
    results = await asyncio.gather(*tasks)
    end_time = time.time()

    print(f"Completed 10 requests in {end_time - start_time:.2f} seconds")
    print("-" * 50)

    # Test user-based rate limiting
    print("Testing user-based rate limiter (3 requests/2 seconds per user)")
    user_tasks = []
    for user in ['user1', 'user2']:
        for i in range(5):
            task = send_email(user_id=user, message=f"Message {i}")
            user_tasks.append(task)

    start_time = time.time()
    await asyncio.gather(*user_tasks)
    end_time = time.time()

    print(f"Completed all user emails in {end_time - start_time:.2f} seconds")


if __name__ == "__main__":
    asyncio.run(test_rate_limiter())

总结

本文分享了Python异步装饰器在实现请求限流器方面的核心技术与实践应用。阐述了异步装饰器的基础概念及其在流量控制中的重要作用,介绍了基于时间窗口的基础限流实现和令牌桶算法的异步版本。在高级功能扩展部分,提供了用户级别限流、分级限流与异常处理、基于权重的动态限流等三种实用方案,每种方案都包含完整的代码实现和应用示例。还分析了异步限流器在Web API开发和微服务架构中的具体应用场景,并从性能优化角度提出了内存管理、数据结构选择等最佳实践建议。

扫描二维码推送至手机访问。

版权声明:本文由ruisui88发布,如需转载请注明出处。

本文链接:http://www.ruisui88.com/post/4690.html

标签: kwargs.get
分享给朋友:

“Python异步装饰器实战:手把手教你实现高性能请求限流器” 的相关文章

Vue组件通信之props深入详解!

props 是 Vue 组件中一个很重要的概念。它是用来从父组件向子组件传递数据的。为什么需要props?这是因为在Vue中,组件是相互隔离的。每个组件都有自己的作用域,子组件无法直接访问父组件的状态或值。通过props,父组件可以将数据传递给子组件。使用props的步骤:1. 在子组件中定义pro...

10分钟搞定gitlab-ci自动化部署

gitlab-ci 是持续集成工具/自动化部署工具,类似 jenkins。持续集成 是将代码集成到共享存储库并尽可能早地自动构建/测试每个更改的实践 - 通常一天几次。概述在编码完成时都会进行打包发布过程,如果每次都手动操作这一步骤就会浪费时间,效率低下。所以就有了持续集成。准备事项请提前安装以下软...

K8s里我的容器到底用了多少内存?

作者:frostchen导语 Linux下开发者习惯在物理机或者虚拟机环境下使用top和free等命令查看机器和进程的内存使用量,近年来越来越多的应用服务完成了微服务容器化改造,过去查看、监控和定位内存使用量的方法似乎时常不太奏效。如果你的应用程序刚刚迁移到K8s中,经常被诸如以下问题所困扰:容器的...

Solid State Logic 发布低保真数字失真插件 Digicrush

Solid State Logic 宣布推出低保真数字失真插件 Digicrush ,他们最新的创意工具具有经典数字失真的粗糙、低保真特性,完美模拟早期数字音频的衰减和伪影。Digicrush 充满怀旧气息,深受经典数字采样器和效果器的影响,具有内置抖动、可调比特深度和采样率降低功能,是为音轨添加复...

你感动了吗?佳能超规格镜头 RF 24-105mm F2.8深度测评

如果要你选一支用作多题材创作的挂机镜头,那我相信很多人会选择24-105mm这个焦段的镜头。作为一支可以实现从广角到长焦的变焦镜头,24-105mm有着丰富的焦段选择。只是基于镜头体积以及光学结构上的限制,此前的24-105mm镜头只能恒定在F4的光圈。而佳能打破了这一限制,将实用焦段和恒定光圈完美...

JS数组过滤元素的方法

引言JavaScript 作为前端开发的核心技术之一,在现代 Web 开发中扮演着举足轻重的角色。随着 Web 应用越来越复杂,高效处理数据集合的需求日益凸显。本文旨在介绍 JavaScript 中数组过滤的基础知识及其在实际项目中的应用技巧。技术概述定义数组过滤是 JavaScript 提供的一种...