Files
PrimoGuardBot-/bot/utils/decorators.py
2026-02-17 11:24:55 +07:00

813 lines
25 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Декораторы для обработчиков бота
"""
import asyncio
from typing import Callable, Optional, Union
from functools import wraps
from datetime import datetime
from collections import defaultdict
from aiogram.types import Message, CallbackQuery
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
from aiogram.enums import ChatType, ChatMemberStatus
from middleware.loggers import logger
from .format_time import format_duration
__all__ = (
'admin_only',
'owner_only',
'private_only',
'group_only',
'rate_limit',
'cooldown',
'log_action',
'catch_errors',
'typing_action',
'delete_command',
'answer_on_error',
'permission_required',
'throttle',
'admin_action'
)
# ================= ХРАНИЛИЩА ДЛЯ RATE LIMIT =================
class RateLimitStorage:
"""Хранилище для rate limiting"""
def __init__(self):
# {user_id: {action: datetime}}
self._storage: dict[int, dict[str, datetime]] = defaultdict(dict)
# {user_id: {action: count}}
self._counters: dict[int, dict[str, int]] = defaultdict(lambda: defaultdict(int))
def check(
self,
user_id: int,
action: str,
limit: int,
period: int
) -> tuple[bool, Optional[int]]:
"""
Проверяет лимит.
Returns:
tuple[bool, Optional[int]]: (можно ли выполнить, секунд до сброса)
"""
now = datetime.now()
if action not in self._storage[user_id]:
# Первое использование
self._storage[user_id][action] = now
self._counters[user_id][action] = 1
return True, None
last_use = self._storage[user_id][action]
time_passed = (now - last_use).total_seconds()
# Если прошел период - сбрасываем
if time_passed >= period:
self._storage[user_id][action] = now
self._counters[user_id][action] = 1
return True, None
# Проверяем счетчик
count = self._counters[user_id][action]
if count >= limit:
# Превышен лимит
retry_after = int(period - time_passed)
return False, retry_after
# Увеличиваем счетчик
self._counters[user_id][action] += 1
return True, None
def reset(self, user_id: int, action: Optional[str] = None):
"""Сбрасывает лимит для пользователя"""
if action:
if user_id in self._storage:
self._storage[user_id].pop(action, None)
self._counters[user_id].pop(action, None)
else:
self._storage.pop(user_id, None)
self._counters.pop(user_id, None)
def cleanup(self, max_age: int = 3600):
"""Очищает старые записи"""
now = datetime.now()
expired_users = []
for user_id, actions in self._storage.items():
expired_actions = [
action for action, dt in actions.items()
if (now - dt).total_seconds() > max_age
]
for action in expired_actions:
actions.pop(action, None)
self._counters[user_id].pop(action, None)
if not actions:
expired_users.append(user_id)
for user_id in expired_users:
self._storage.pop(user_id, None)
self._counters.pop(user_id, None)
# Глобальное хранилище
_rate_limit_storage = RateLimitStorage()
_cooldown_storage = RateLimitStorage()
# ================= ПРОВЕРКА ПРАВ =================
async def _check_admin_rights(
message: Message,
user_id: Optional[int] = None
) -> bool:
"""
Проверяет, является ли пользователь администратором.
Args:
message: Объект сообщения
user_id: ID пользователя (если None, проверяется отправитель)
Returns:
bool: True если администратор
"""
# В личных сообщениях все пользователи "администраторы"
if message.chat.type == ChatType.PRIVATE:
return True
check_user_id = user_id or message.from_user.id
try:
member = await message.bot.get_chat_member(
chat_id=message.chat.id,
user_id=check_user_id
)
return member.status in {
ChatMemberStatus.CREATOR,
ChatMemberStatus.ADMINISTRATOR
}
except (TelegramBadRequest, TelegramForbiddenError):
return False
async def _check_owner_rights(message: Message) -> bool:
"""Проверяет, является ли пользователь владельцем чата"""
if message.chat.type == ChatType.PRIVATE:
return True
try:
member = await message.bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
return member.status == ChatMemberStatus.CREATOR
except (TelegramBadRequest, TelegramForbiddenError):
return False
async def _check_bot_admin_rights(message: Message) -> bool:
"""Проверяет, является ли бот администратором"""
if message.chat.type == ChatType.PRIVATE:
return True
try:
bot_member = await message.bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.bot.id
)
return bot_member.status in {
ChatMemberStatus.ADMINISTRATOR
}
except (TelegramBadRequest, TelegramForbiddenError):
return False
# ================= ДЕКОРАТОРЫ ДЛЯ ПРАВ =================
def admin_only(
reply_text: str = "❌ Эта команда доступна только администраторам",
check_bot: bool = False
):
"""
Декоратор: выполнение только для администраторов.
Args:
reply_text: Текст ответа если не админ
check_bot: Также проверять права бота
Example:
```python
@router.message(Command("ban"))
@admin_only()
async def ban_handler(message: Message):
await message.answer("Бан пользователя...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
# Получаем message
message = update if isinstance(update, Message) else update.message
if not message:
return None
# Проверяем права пользователя
if not await _check_admin_rights(message):
if isinstance(update, CallbackQuery):
await update.answer(reply_text, show_alert=True)
else:
await message.answer(reply_text)
logger.warning(
f"Попытка использования admin команды от @{message.from_user.id}",
log_type='SECURITY'
)
return None
# Проверяем права бота если нужно
if check_bot and not await _check_bot_admin_rights(message):
error_text = "❌ Бот не является администратором чата"
if isinstance(update, CallbackQuery):
await update.answer(error_text, show_alert=True)
else:
await message.answer(error_text)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
def owner_only(reply_text: str = "❌ Эта команда доступна только владельцу чата"):
"""
Декоратор: выполнение только для владельца чата.
Args:
reply_text: Текст ответа если не владелец
Example:
```python
@router.message(Command("destroy"))
@owner_only()
async def destroy_handler(message: Message):
await message.answer("Удаление чата...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
if not await _check_owner_rights(message):
if isinstance(update, CallbackQuery):
await update.answer(reply_text, show_alert=True)
else:
await message.answer(reply_text)
logger.warning(
f"Попытка использования owner команды от @{message.from_user.id}",
log_type='SECURITY'
)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
def permission_required(*permissions: str):
"""
Декоратор: проверка конкретных прав администратора.
Args:
permissions: Список прав (can_delete_messages, can_restrict_members, и т.д.)
Example:
```python
@router.message(Command("pin"))
@permission_required("can_pin_messages")
async def pin_handler(message: Message):
await message.reply_to_message.pin()
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
# В личных сообщениях пропускаем проверку
if message.chat.type == ChatType.PRIVATE:
return await func(update, *args, **kwargs)
try:
member = await message.bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
# Владелец имеет все права
if member.status == ChatMemberStatus.CREATOR:
return await func(update, *args, **kwargs)
# Проверяем права
if member.status == ChatMemberStatus.ADMINISTRATOR:
missing_permissions = []
for perm in permissions:
if not getattr(member, perm, False):
missing_permissions.append(perm)
if missing_permissions:
error_text = (
f"❌ Недостаточно прав\n"
f"Требуются: {', '.join(missing_permissions)}"
)
if isinstance(update, CallbackQuery):
await update.answer(error_text, show_alert=True)
else:
await message.answer(error_text)
return None
return await func(update, *args, **kwargs)
# Не администратор
error_text = "❌ Эта команда доступна только администраторам"
if isinstance(update, CallbackQuery):
await update.answer(error_text, show_alert=True)
else:
await message.answer(error_text)
except (TelegramBadRequest, TelegramForbiddenError):
pass
return wrapper
return decorator
# ================= ДЕКОРАТОРЫ ДЛЯ ТИПОВ ЧАТОВ =================
def private_only(reply_text: str = "❌ Эта команда работает только в личных сообщениях"):
"""
Декоратор: выполнение только в личных сообщениях.
Example:
```python
@router.message(Command("start"))
@private_only()
async def start_handler(message: Message):
await message.answer("Приветствие...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
if message.chat.type != ChatType.PRIVATE:
if isinstance(update, CallbackQuery):
await update.answer(reply_text, show_alert=True)
else:
await message.answer(reply_text)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
def group_only(reply_text: str = "❌ Эта команда работает только в группах"):
"""
Декоратор: выполнение только в группах.
Example:
```python
@router.message(Command("ban"))
@group_only()
async def ban_handler(message: Message):
await message.answer("Бан пользователя...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
if message.chat.type not in {ChatType.GROUP, ChatType.SUPERGROUP}:
if isinstance(update, CallbackQuery):
await update.answer(reply_text, show_alert=True)
else:
await message.answer(reply_text)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
# ================= RATE LIMITING =================
def rate_limit(limit: int = 1, period: int = 60, action: Optional[str] = None):
"""
Декоратор: ограничение частоты вызовов.
Args:
limit: Количество вызовов
period: Период в секундах
action: Название действия (по умолчанию имя функции)
Example:
```python
@router.message(Command("search"))
@rate_limit(limit=3, period=60) # 3 раза в минуту
async def search_handler(message: Message):
await message.answer("Поиск...")
```
"""
def decorator(func: Callable) -> Callable:
action_name = action or func.__name__
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
user_id = message.from_user.id
# Проверяем лимит
allowed, retry_after = _rate_limit_storage.check(
user_id, action_name, limit, period
)
if not allowed:
retry_time = format_duration(retry_after)
error_text = f"⏳ Слишком часто! Повторите через {retry_time}"
if isinstance(update, CallbackQuery):
await update.answer(error_text, show_alert=True)
else:
await message.answer(error_text)
logger.debug(
f"Rate limit для пользователя {user_id}: {action_name}",
log_type='RATE_LIMIT'
)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
def cooldown(seconds: int, action: Optional[str] = None):
"""
Декоратор: кулдаун между вызовами (1 раз в N секунд).
Args:
seconds: Кулдаун в секундах
action: Название действия
Example:
```python
@router.message(Command("daily"))
@cooldown(seconds=86400) # Раз в день
async def daily_handler(message: Message):
await message.answer("Ежедневная награда!")
```
"""
return rate_limit(limit=1, period=seconds, action=action)
def throttle(rate: float = 1.0):
"""
Декоратор: throttling (antiflood).
Args:
rate: Минимальный интервал в секундах между вызовами
Example:
```python
@router.message()
@throttle(rate=0.5) # Не чаще 2 раз в секунду
async def echo_handler(message: Message):
await message.answer(message.text)
```
"""
return cooldown(seconds=int(rate), action='throttle')
# ================= ЛОГИРОВАНИЕ =================
def log_action(
action_name: Optional[str] = None,
log_args: bool = False
):
"""
Декоратор: логирование действий.
Args:
action_name: Название действия (по умолчанию имя функции)
log_args: Логировать аргументы
Example:
```python
@router.message(Command("ban"))
@log_action("BAN_USER", log_args=True)
async def ban_handler(message: Message):
await message.answer("Бан...")
```
"""
def decorator(func: Callable) -> Callable:
name = action_name or func.__name__
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return await func(update, *args, **kwargs)
user_id = message.from_user.id
username = message.from_user.username or f"id{user_id}"
# Логируем начало
log_msg = f"Действие '{name}' от @{username}"
if log_args and message.text:
log_msg += f" | Аргументы: {message.text}"
logger.info(log_msg, log_type='ACTION')
try:
result = await func(update, *args, **kwargs)
logger.info(f"Действие '{name}' выполнено успешно", log_type='ACTION')
return result
except Exception as e:
logger.error(f"Ошибка в действии '{name}': {e}", log_type='ACTION')
raise
return wrapper
return decorator
# ================= ОБРАБОТКА ОШИБОК =================
def catch_errors(
notify_user: bool = True,
error_message: str = "❌ Произошла ошибка при выполнении команды"
):
"""
Декоратор: перехват ошибок.
Args:
notify_user: Уведомлять пользователя об ошибке
error_message: Текст уведомления
Example:
```python
@router.message(Command("risky"))
@catch_errors(notify_user=True)
async def risky_handler(message: Message):
# Код который может вызвать ошибку
...
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
try:
return await func(update, *args, **kwargs)
except Exception as e:
logger.error(
f"Ошибка в {func.__name__}: {e}",
log_type='ERROR'
)
if notify_user:
message = update if isinstance(update, Message) else update.message
if message:
try:
if isinstance(update, CallbackQuery):
await update.answer(error_message, show_alert=True)
else:
await message.answer(error_message)
except:
pass
return wrapper
return decorator
def answer_on_error(error_message: str = "❌ Ошибка"):
"""
Декоратор: ответ пользователю при ошибке.
Alias для catch_errors с уведомлением.
"""
return catch_errors(notify_user=True, error_message=error_message)
# ================= ДЕЙСТВИЯ =================
def typing_action():
"""
Декоратор: показывает "печатает..." во время выполнения.
Example:
```python
@router.message(Command("search"))
@typing_action()
async def search_handler(message: Message):
# Долгий поиск...
await asyncio.sleep(3)
await message.answer("Результаты поиска")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return await func(update, *args, **kwargs)
# Отправляем действие "печатает"
async def send_typing():
try:
while True:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action="typing"
)
await asyncio.sleep(4) # Обновляем каждые 4 секунды
except asyncio.CancelledError:
pass
# Создаем задачу
typing_task = asyncio.create_task(send_typing())
try:
result = await func(update, *args, **kwargs)
return result
finally:
typing_task.cancel()
try:
await typing_task
except asyncio.CancelledError:
pass
return wrapper
return decorator
def delete_command(delay: Optional[int] = None):
"""
Декоратор: удаляет команду после выполнения.
Args:
delay: Задержка перед удалением (секунды)
Example:
```python
@router.message(Command("clean"))
@delete_command(delay=0)
async def clean_handler(message: Message):
await message.answer("Очистка...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(message: Message, *args, **kwargs):
if not isinstance(message, Message):
return await func(message, *args, **kwargs)
# Выполняем функцию
result = await func(message, *args, **kwargs)
# Удаляем команду
try:
if delay:
await asyncio.sleep(delay)
await message.delete()
except (TelegramBadRequest, TelegramForbiddenError):
pass
return result
return wrapper
return decorator
# ================= КОМБИНИРОВАННЫЕ ДЕКОРАТОРЫ =================
def admin_action(
log: bool = True,
check_bot: bool = True,
delete_cmd: bool = False
):
"""
Комбинированный декоратор для admin команд.
Args:
log: Логировать действие
check_bot: Проверять права бота
delete_cmd: Удалять команду
Example:
```python
@router.message(Command("ban"))
@admin_action(log=True, check_bot=True)
async def ban_handler(message: Message):
await message.answer("Бан...")
```
"""
def decorator(func: Callable) -> Callable:
# Применяем декораторы
decorated = func
if log:
decorated = log_action(f"ADMIN_{func.__name__.upper()}")(decorated)
decorated = admin_only(check_bot=check_bot)(decorated)
if delete_cmd:
decorated = delete_command()(decorated)
return decorated
return decorator
# ================= ОЧИСТКА ХРАНИЛИЩ =================
def cleanup_storage(max_age: int = 3600):
"""
Очищает хранилища rate limit от старых записей.
Args:
max_age: Максимальный возраст записи в секундах
"""
_rate_limit_storage.cleanup(max_age)
_cooldown_storage.cleanup(max_age)