Files
PrimoExampleBot/bot/middlewares/spam_mdw.py
2025-08-30 07:39:44 +07:00

97 lines
3.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from typing import Callable, Awaitable, Any, Dict
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject, Message, CallbackQuery
import time
from collections import defaultdict
from middleware.loggers import loggers # ваш логгер
class RateLimitMiddleware(BaseMiddleware):
"""
Middleware для ограничения частоты запросов от пользователей (анти-спам).
Зачем нужен:
- Защита от DDoS и флуда
- Предотвращение злоупотребления ботом
- Контроль нагрузки на сервер
"""
def __init__(self, rate_limit: int = 10, time_period: float = 2.0):
"""
Инициализация rate limit middleware.
Args:
rate_limit: Максимальное количество запросов за период
time_period: Период времени в секундах
"""
self.rate_limit = rate_limit
self.time_period = time_period
self.user_calls: Dict[int, list[float]] = defaultdict(list)
super().__init__()
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
log: bool = False,
) -> Any:
"""
Проверяет rate limit перед обработкой запроса.
"""
# Пропускаем не-сообщения и не-колбэки
if not isinstance(event, (Message, CallbackQuery)):
return await handler(event, data)
user_id: int = event.from_user.id
user_str: str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}"
current_time: float = time.time()
# Очищаем старые запросы
self.user_calls[user_id] = [
call_time for call_time in self.user_calls[user_id]
if current_time - call_time < self.time_period
]
# Логируем текущее состояние rate limit
if log:
loggers.debug(
text=f"Rate limit: {len(self.user_calls[user_id])}/{self.rate_limit} за {self.time_period}сек",
log_type="RATE_LIMIT_STATUS",
user=user_str
)
# Проверяем текущий лимит
if len(self.user_calls[user_id]) >= self.rate_limit:
# Логируем попытку спама
if log:
loggers.warning(
text=f"Превышен rate limit ({self.rate_limit}/{self.time_period}сек)",
log_type="RATE_LIMIT_EXCEEDED",
user=user_str
)
# Отправляем сообщение о превышении лимита
if isinstance(event, Message):
await event.answer(
text="⏳ Слишком много запросов! Пожалуйста, подождите немного.",
)
elif isinstance(event, CallbackQuery):
await event.answer(
text="⏳ Подождите немного перед следующим действием.",
show_alert=True
)
return None
# Добавляем текущий запрос и продолжаем обработку
self.user_calls[user_id].append(current_time)
loggers.debug(
text=f"Запрос добавлен в rate limit",
log_type="RATE_LIMIT_ADDED",
user=user_str
)
return await handler(event, data)