From 7964e7810817425839b6f7ae30f9ec5d1b65b4b2 Mon Sep 17 00:00:00 2001 From: Verum Date: Mon, 23 Feb 2026 14:35:01 +0700 Subject: [PATCH] =?UTF-8?q?=D0=92=D1=81=D0=B5=20=D0=B4=D0=B5=D0=BA=D0=BE?= =?UTF-8?q?=D1=80=D0=B0=D1=82=D0=BE=D1=80=D1=8B=20=D1=84=D1=83=D0=BD=D0=BA?= =?UTF-8?q?=D1=86=D0=B8=D0=B9=20=D0=B1=D0=BE=D1=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot/utils/decorators.py | 812 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 812 insertions(+) create mode 100644 bot/utils/decorators.py diff --git a/bot/utils/decorators.py b/bot/utils/decorators.py new file mode 100644 index 0000000..21b424f --- /dev/null +++ b/bot/utils/decorators.py @@ -0,0 +1,812 @@ +""" +Декораторы для обработчиков бота +""" +import asyncio +from typing import Callable, Optional, Union +from functools import wraps +from datetime import datetime +from collections import defaultdict + +from aiogram.types import Message, CallbackQuery +from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError +from aiogram.enums import ChatType, ChatMemberStatus + +from middleware.loggers import logger +from .format_time import format_duration + +__all__ = ( + 'admin_only', + 'owner_only', + 'private_only', + 'group_only', + 'rate_limit', + 'cooldown', + 'log_action', + 'catch_errors', + 'typing_action', + 'delete_command', + 'answer_on_error', + 'permission_required', + 'throttle', + 'admin_action' +) + + +# ================= ХРАНИЛИЩА ДЛЯ RATE LIMIT ================= + +class RateLimitStorage: + """Хранилище для rate limiting""" + + def __init__(self): + # {user_id: {action: datetime}} + self._storage: dict[int, dict[str, datetime]] = defaultdict(dict) + # {user_id: {action: count}} + self._counters: dict[int, dict[str, int]] = defaultdict(lambda: defaultdict(int)) + + def check( + self, + user_id: int, + action: str, + limit: int, + period: int + ) -> tuple[bool, Optional[int]]: + """ + Проверяет лимит. + + Returns: + tuple[bool, Optional[int]]: (можно ли выполнить, секунд до сброса) + """ + now = datetime.now() + + if action not in self._storage[user_id]: + # Первое использование + self._storage[user_id][action] = now + self._counters[user_id][action] = 1 + return True, None + + last_use = self._storage[user_id][action] + time_passed = (now - last_use).total_seconds() + + # Если прошел период - сбрасываем + if time_passed >= period: + self._storage[user_id][action] = now + self._counters[user_id][action] = 1 + return True, None + + # Проверяем счетчик + count = self._counters[user_id][action] + + if count >= limit: + # Превышен лимит + retry_after = int(period - time_passed) + return False, retry_after + + # Увеличиваем счетчик + self._counters[user_id][action] += 1 + return True, None + + def reset(self, user_id: int, action: Optional[str] = None): + """Сбрасывает лимит для пользователя""" + if action: + if user_id in self._storage: + self._storage[user_id].pop(action, None) + self._counters[user_id].pop(action, None) + else: + self._storage.pop(user_id, None) + self._counters.pop(user_id, None) + + def cleanup(self, max_age: int = 3600): + """Очищает старые записи""" + now = datetime.now() + expired_users = [] + + for user_id, actions in self._storage.items(): + expired_actions = [ + action for action, dt in actions.items() + if (now - dt).total_seconds() > max_age + ] + + for action in expired_actions: + actions.pop(action, None) + self._counters[user_id].pop(action, None) + + if not actions: + expired_users.append(user_id) + + for user_id in expired_users: + self._storage.pop(user_id, None) + self._counters.pop(user_id, None) + + +# Глобальное хранилище +_rate_limit_storage = RateLimitStorage() +_cooldown_storage = RateLimitStorage() + + +# ================= ПРОВЕРКА ПРАВ ================= + +async def _check_admin_rights( + message: Message, + user_id: Optional[int] = None +) -> bool: + """ + Проверяет, является ли пользователь администратором. + + Args: + message: Объект сообщения + user_id: ID пользователя (если None, проверяется отправитель) + + Returns: + bool: True если администратор + """ + # В личных сообщениях все пользователи "администраторы" + if message.chat.type == ChatType.PRIVATE: + return True + + check_user_id = user_id or message.from_user.id + + try: + member = await message.bot.get_chat_member( + chat_id=message.chat.id, + user_id=check_user_id + ) + + return member.status in { + ChatMemberStatus.CREATOR, + ChatMemberStatus.ADMINISTRATOR + } + + except (TelegramBadRequest, TelegramForbiddenError): + return False + + +async def _check_owner_rights(message: Message) -> bool: + """Проверяет, является ли пользователь владельцем чата""" + if message.chat.type == ChatType.PRIVATE: + return True + + try: + member = await message.bot.get_chat_member( + chat_id=message.chat.id, + user_id=message.from_user.id + ) + + return member.status == ChatMemberStatus.CREATOR + + except (TelegramBadRequest, TelegramForbiddenError): + return False + + +async def _check_bot_admin_rights(message: Message) -> bool: + """Проверяет, является ли бот администратором""" + if message.chat.type == ChatType.PRIVATE: + return True + + try: + bot_member = await message.bot.get_chat_member( + chat_id=message.chat.id, + user_id=message.bot.id + ) + + return bot_member.status in { + ChatMemberStatus.ADMINISTRATOR + } + + except (TelegramBadRequest, TelegramForbiddenError): + return False + + +# ================= ДЕКОРАТОРЫ ДЛЯ ПРАВ ================= + +def admin_only( + reply_text: str = "❌ Эта команда доступна только администраторам", + check_bot: bool = False +): + """ + Декоратор: выполнение только для администраторов. + + Args: + reply_text: Текст ответа если не админ + check_bot: Также проверять права бота + + Example: + ```python + @router.message(Command("ban")) + @admin_only() + async def ban_handler(message: Message): + await message.answer("Бан пользователя...") + ``` + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs): + # Получаем message + message = update if isinstance(update, Message) else update.message + + if not message: + return None + + # Проверяем права пользователя + if not await _check_admin_rights(message): + if isinstance(update, CallbackQuery): + await update.answer(reply_text, show_alert=True) + else: + await message.answer(reply_text) + + logger.warning( + f"Попытка использования admin команды от @{message.from_user.id}", + log_type='SECURITY' + ) + return None + + # Проверяем права бота если нужно + if check_bot and not await _check_bot_admin_rights(message): + error_text = "❌ Бот не является администратором чата" + + if isinstance(update, CallbackQuery): + await update.answer(error_text, show_alert=True) + else: + await message.answer(error_text) + return None + + return await func(update, *args, **kwargs) + + return wrapper + + return decorator + + +def owner_only(reply_text: str = "❌ Эта команда доступна только владельцу чата"): + """ + Декоратор: выполнение только для владельца чата. + + Args: + reply_text: Текст ответа если не владелец + + Example: + ```python + @router.message(Command("destroy")) + @owner_only() + async def destroy_handler(message: Message): + await message.answer("Удаление чата...") + ``` + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs): + message = update if isinstance(update, Message) else update.message + + if not message: + return None + + if not await _check_owner_rights(message): + if isinstance(update, CallbackQuery): + await update.answer(reply_text, show_alert=True) + else: + await message.answer(reply_text) + + logger.warning( + f"Попытка использования owner команды от @{message.from_user.id}", + log_type='SECURITY' + ) + return None + + return await func(update, *args, **kwargs) + + return wrapper + + return decorator + + +def permission_required(*permissions: str): + """ + Декоратор: проверка конкретных прав администратора. + + Args: + permissions: Список прав (can_delete_messages, can_restrict_members, и т.д.) + + Example: + ```python + @router.message(Command("pin")) + @permission_required("can_pin_messages") + async def pin_handler(message: Message): + await message.reply_to_message.pin() + ``` + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs): + message = update if isinstance(update, Message) else update.message + + if not message: + return None + + # В личных сообщениях пропускаем проверку + if message.chat.type == ChatType.PRIVATE: + return await func(update, *args, **kwargs) + + try: + member = await message.bot.get_chat_member( + chat_id=message.chat.id, + user_id=message.from_user.id + ) + + # Владелец имеет все права + if member.status == ChatMemberStatus.CREATOR: + return await func(update, *args, **kwargs) + + # Проверяем права + if member.status == ChatMemberStatus.ADMINISTRATOR: + missing_permissions = [] + + for perm in permissions: + if not getattr(member, perm, False): + missing_permissions.append(perm) + + if missing_permissions: + error_text = ( + f"❌ Недостаточно прав\n" + f"Требуются: {', '.join(missing_permissions)}" + ) + + if isinstance(update, CallbackQuery): + await update.answer(error_text, show_alert=True) + else: + await message.answer(error_text) + return None + + return await func(update, *args, **kwargs) + + # Не администратор + error_text = "❌ Эта команда доступна только администраторам" + + if isinstance(update, CallbackQuery): + await update.answer(error_text, show_alert=True) + else: + await message.answer(error_text) + + except (TelegramBadRequest, TelegramForbiddenError): + pass + + return wrapper + + return decorator + + +# ================= ДЕКОРАТОРЫ ДЛЯ ТИПОВ ЧАТОВ ================= + +def private_only(reply_text: str = "❌ Эта команда работает только в личных сообщениях"): + """ + Декоратор: выполнение только в личных сообщениях. + + Example: + ```python + @router.message(Command("start")) + @private_only() + async def start_handler(message: Message): + await message.answer("Приветствие...") + ``` + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs): + message = update if isinstance(update, Message) else update.message + + if not message: + return None + + if message.chat.type != ChatType.PRIVATE: + if isinstance(update, CallbackQuery): + await update.answer(reply_text, show_alert=True) + else: + await message.answer(reply_text) + return None + + return await func(update, *args, **kwargs) + + return wrapper + + return decorator + + +def group_only(reply_text: str = "❌ Эта команда работает только в группах"): + """ + Декоратор: выполнение только в группах. + + Example: + ```python + @router.message(Command("ban")) + @group_only() + async def ban_handler(message: Message): + await message.answer("Бан пользователя...") + ``` + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs): + message = update if isinstance(update, Message) else update.message + + if not message: + return None + + if message.chat.type not in {ChatType.GROUP, ChatType.SUPERGROUP}: + if isinstance(update, CallbackQuery): + await update.answer(reply_text, show_alert=True) + else: + await message.answer(reply_text) + return None + + return await func(update, *args, **kwargs) + + return wrapper + + return decorator + + +# ================= RATE LIMITING ================= + +def rate_limit(limit: int = 1, period: int = 60, action: Optional[str] = None): + """ + Декоратор: ограничение частоты вызовов. + + Args: + limit: Количество вызовов + period: Период в секундах + action: Название действия (по умолчанию имя функции) + + Example: + ```python + @router.message(Command("search")) + @rate_limit(limit=3, period=60) # 3 раза в минуту + async def search_handler(message: Message): + await message.answer("Поиск...") + ``` + """ + + def decorator(func: Callable) -> Callable: + action_name = action or func.__name__ + + @wraps(func) + async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs): + message = update if isinstance(update, Message) else update.message + + if not message: + return None + + user_id = message.from_user.id + + # Проверяем лимит + allowed, retry_after = _rate_limit_storage.check( + user_id, action_name, limit, period + ) + + if not allowed: + retry_time = format_duration(retry_after) + error_text = f"⏳ Слишком часто! Повторите через {retry_time}" + + if isinstance(update, CallbackQuery): + await update.answer(error_text, show_alert=True) + else: + await message.answer(error_text) + + logger.debug( + f"Rate limit для пользователя {user_id}: {action_name}", + log_type='RATE_LIMIT' + ) + return None + + return await func(update, *args, **kwargs) + + return wrapper + + return decorator + + +def cooldown(seconds: int, action: Optional[str] = None): + """ + Декоратор: кулдаун между вызовами (1 раз в N секунд). + + Args: + seconds: Кулдаун в секундах + action: Название действия + + Example: + ```python + @router.message(Command("daily")) + @cooldown(seconds=86400) # Раз в день + async def daily_handler(message: Message): + await message.answer("Ежедневная награда!") + ``` + """ + return rate_limit(limit=1, period=seconds, action=action) + + +def throttle(rate: float = 1.0): + """ + Декоратор: throttling (antiflood). + + Args: + rate: Минимальный интервал в секундах между вызовами + + Example: + ```python + @router.message() + @throttle(rate=0.5) # Не чаще 2 раз в секунду + async def echo_handler(message: Message): + await message.answer(message.text) + ``` + """ + return cooldown(seconds=int(rate), action='throttle') + + +# ================= ЛОГИРОВАНИЕ ================= + +def log_action( + action_name: Optional[str] = None, + log_args: bool = False +): + """ + Декоратор: логирование действий. + + Args: + action_name: Название действия (по умолчанию имя функции) + log_args: Логировать аргументы + + Example: + ```python + @router.message(Command("ban")) + @log_action("BAN_USER", log_args=True) + async def ban_handler(message: Message): + await message.answer("Бан...") + ``` + """ + + def decorator(func: Callable) -> Callable: + name = action_name or func.__name__ + + @wraps(func) + async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs): + message = update if isinstance(update, Message) else update.message + + if not message: + return await func(update, *args, **kwargs) + + user_id = message.from_user.id + username = message.from_user.username or f"id{user_id}" + + # Логируем начало + log_msg = f"Действие '{name}' от @{username}" + + if log_args and message.text: + log_msg += f" | Аргументы: {message.text}" + + logger.info(log_msg, log_type='ACTION') + + try: + result = await func(update, *args, **kwargs) + logger.info(f"Действие '{name}' выполнено успешно", log_type='ACTION') + return result + + except Exception as e: + logger.error(f"Ошибка в действии '{name}': {e}", log_type='ACTION') + raise + + return wrapper + + return decorator + + +# ================= ОБРАБОТКА ОШИБОК ================= + +def catch_errors( + notify_user: bool = True, + error_message: str = "❌ Произошла ошибка при выполнении команды" +): + """ + Декоратор: перехват ошибок. + + Args: + notify_user: Уведомлять пользователя об ошибке + error_message: Текст уведомления + + Example: + ```python + @router.message(Command("risky")) + @catch_errors(notify_user=True) + async def risky_handler(message: Message): + # Код который может вызвать ошибку + ... + ``` + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs): + try: + return await func(update, *args, **kwargs) + + except Exception as e: + logger.error( + f"Ошибка в {func.__name__}: {e}", + log_type='ERROR' + ) + + if notify_user: + message = update if isinstance(update, Message) else update.message + + if message: + try: + if isinstance(update, CallbackQuery): + await update.answer(error_message, show_alert=True) + else: + await message.answer(error_message) + except: + pass + + return wrapper + + return decorator + + +def answer_on_error(error_message: str = "❌ Ошибка"): + """ + Декоратор: ответ пользователю при ошибке. + + Alias для catch_errors с уведомлением. + """ + return catch_errors(notify_user=True, error_message=error_message) + + +# ================= ДЕЙСТВИЯ ================= + +def typing_action(): + """ + Декоратор: показывает "печатает..." во время выполнения. + + Example: + ```python + @router.message(Command("search")) + @typing_action() + async def search_handler(message: Message): + # Долгий поиск... + await asyncio.sleep(3) + await message.answer("Результаты поиска") + ``` + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs): + message = update if isinstance(update, Message) else update.message + + if not message: + return await func(update, *args, **kwargs) + + # Отправляем действие "печатает" + async def send_typing(): + try: + while True: + await message.bot.send_chat_action( + chat_id=message.chat.id, + action="typing" + ) + await asyncio.sleep(4) # Обновляем каждые 4 секунды + except asyncio.CancelledError: + pass + + # Создаем задачу + typing_task = asyncio.create_task(send_typing()) + + try: + result = await func(update, *args, **kwargs) + return result + finally: + typing_task.cancel() + try: + await typing_task + except asyncio.CancelledError: + pass + + return wrapper + + return decorator + + +def delete_command(delay: Optional[int] = None): + """ + Декоратор: удаляет команду после выполнения. + + Args: + delay: Задержка перед удалением (секунды) + + Example: + ```python + @router.message(Command("clean")) + @delete_command(delay=0) + async def clean_handler(message: Message): + await message.answer("Очистка...") + ``` + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def wrapper(message: Message, *args, **kwargs): + if not isinstance(message, Message): + return await func(message, *args, **kwargs) + + # Выполняем функцию + result = await func(message, *args, **kwargs) + + # Удаляем команду + try: + if delay: + await asyncio.sleep(delay) + + await message.delete() + except (TelegramBadRequest, TelegramForbiddenError): + pass + + return result + + return wrapper + + return decorator + + +# ================= КОМБИНИРОВАННЫЕ ДЕКОРАТОРЫ ================= + +def admin_action( + log: bool = True, + check_bot: bool = True, + delete_cmd: bool = False +): + """ + Комбинированный декоратор для admin команд. + + Args: + log: Логировать действие + check_bot: Проверять права бота + delete_cmd: Удалять команду + + Example: + ```python + @router.message(Command("ban")) + @admin_action(log=True, check_bot=True) + async def ban_handler(message: Message): + await message.answer("Бан...") + ``` + """ + + def decorator(func: Callable) -> Callable: + # Применяем декораторы + decorated = func + + if log: + decorated = log_action(f"ADMIN_{func.__name__.upper()}")(decorated) + + decorated = admin_only(check_bot=check_bot)(decorated) + + if delete_cmd: + decorated = delete_command()(decorated) + + return decorated + + return decorator + + +# ================= ОЧИСТКА ХРАНИЛИЩ ================= + +def cleanup_storage(max_age: int = 3600): + """ + Очищает хранилища rate limit от старых записей. + + Args: + max_age: Максимальный возраст записи в секундах + """ + _rate_limit_storage.cleanup(max_age) + _cooldown_storage.cleanup(max_age)