diff --git a/bot/filters/chat_rights.py b/bot/filters/chat_rights.py new file mode 100644 index 0000000..f98b03a --- /dev/null +++ b/bot/filters/chat_rights.py @@ -0,0 +1,324 @@ +""" +Фильтры для проверки прав пользователей в чатах +""" +from typing import Any, Union + +from aiogram import Bot +from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError +from aiogram.filters import BaseFilter +from aiogram.types import Message, CallbackQuery +from aiogram.enums import ChatMemberStatus + +from configs import settings +from middleware.loggers import logger + +__all__ = ( + 'IsBotOwner', + 'IsChatCreator', + 'IsChatAdmin', + 'IsModerator', + 'CanDeleteMessages', + 'CanRestrictMembers', + 'CanPinMessages' +) + + +class IsBotOwner(BaseFilter): + """ + Проверяет, является ли пользователь владельцем бота (из .env). + + Attributes: + send_error_message: Отправлять ли сообщение об ошибке доступа + + Example: + ```python + # Без сообщения об ошибке + @router.message(Command("reset"), IsOwner()) + async def reset_command(message: Message): + await message.answer("🔄 Сброс данных...") + + # С сообщением об ошибке + @router.message(Command("secret"), IsOwner(send_error_message=True)) + async def secret_command(message: Message): + await message.answer("🔐 Секретная команда выполнена") + ``` + """ + + def __init__(self, send_error_message: bool = False) -> None: + """ + Args: + send_error_message: Если True, отправляет сообщение при отказе в доступе + """ + self.send_error_message = send_error_message + + async def __call__( + self, + event: Union[Message, CallbackQuery], + bot: Bot + ) -> Union[bool, dict[str, Any]]: + """ + Проверка владельца бота. + + Returns: + bool или dict: True/dict если владелец, False иначе + """ + if not event.from_user: + return False + + user_id = event.from_user.id + is_owner = user_id in settings.OWNER_ID + + if not is_owner: + logger.warning( + f"Попытка доступа к команде владельца от user_id={user_id}", + log_type='SECURITY', + message=event if isinstance(event, Message) else None + ) + + if self.send_error_message: + error_text = "⛔ Эта команда доступна только владельцу бота!" + + if isinstance(event, Message): + await event.answer(error_text) + elif isinstance(event, CallbackQuery): + await event.answer(error_text, show_alert=True) + + return False + + # Возвращаем информацию для handler + return { + 'is_owner': True, + 'user_id': user_id, + 'owner_ids': settings.OWNER_ID + } + + +class IsChatCreator(BaseFilter): + """ + Проверяет, является ли пользователь создателем чата. + + Example: + ```python + @router.message(Command("transfer"), IsChatCreator()) + async def transfer_ownership(message: Message): + await message.answer("👑 Передача владения чатом...") + ``` + """ + + async def __call__(self, message: Message, bot: Bot) -> Union[bool, dict]: + try: + member = await bot.get_chat_member( + chat_id=message.chat.id, + user_id=message.from_user.id + ) + + is_creator = member.status == ChatMemberStatus.CREATOR + + if is_creator: + return { + 'is_creator': True, + 'user_id': message.from_user.id, + 'chat_id': message.chat.id + } + + return False + + except (TelegramBadRequest, TelegramForbiddenError) as e: + logger.error( + f"Ошибка проверки создателя чата: {e}", + log_type='CHAT_RIGHTS', + message=message + ) + return False + + +class IsChatAdmin(BaseFilter): + """ + Проверяет, является ли пользователь администратором чата (или создателем). + + Example: + ```python + @router.message(Command("ban"), IsChatAdmin()) + async def ban_user(message: Message): + await message.answer("🔨 Бан пользователя...") + ``` + """ + + async def __call__(self, message: Message, bot: Bot) -> Union[bool, dict]: + try: + member = await bot.get_chat_member( + chat_id=message.chat.id, + user_id=message.from_user.id + ) + + is_admin = member.status in ( + ChatMemberStatus.ADMINISTRATOR, + ChatMemberStatus.CREATOR + ) + + if is_admin: + return { + 'is_admin': True, + 'status': member.status.value, + 'user_id': message.from_user.id, + 'chat_id': message.chat.id + } + + return False + + except (TelegramBadRequest, TelegramForbiddenError) as e: + logger.error( + f"Ошибка проверки администратора чата: {e}", + log_type='CHAT_RIGHTS', + message=message + ) + return False + + +class IsModerator(BaseFilter): + """ + Проверяет, имеет ли администратор модераторские права: + - Удаление сообщений + - Ограничение пользователей + - Закрепление сообщений + + Example: + ```python + @router.message(Command("warn"), IsModerator()) + async def warn_user(message: Message): + await message.answer("⚠️ Предупреждение пользователю...") + ``` + """ + + async def __call__(self, message: Message, bot: Bot) -> Union[bool, dict]: + try: + member = await bot.get_chat_member( + chat_id=message.chat.id, + user_id=message.from_user.id + ) + + # Создатель всегда модератор + if member.status == ChatMemberStatus.CREATOR: + return { + 'is_moderator': True, + 'status': 'creator', + 'user_id': message.from_user.id + } + + # Проверка прав администратора + if member.status != ChatMemberStatus.ADMINISTRATOR: + return False + + # Проверка модераторских прав + required_rights = [ + getattr(member, 'can_delete_messages', False), + getattr(member, 'can_restrict_members', False), + getattr(member, 'can_pin_messages', False), + ] + + has_all_rights = all(required_rights) + + if has_all_rights: + return { + 'is_moderator': True, + 'status': 'administrator', + 'can_delete': required_rights[0], + 'can_restrict': required_rights[1], + 'can_pin': required_rights[2], + 'user_id': message.from_user.id + } + + return False + + except (TelegramBadRequest, TelegramForbiddenError) as e: + logger.error( + f"Ошибка проверки модератора: {e}", + log_type='CHAT_RIGHTS', + message=message + ) + return False + + +class CanDeleteMessages(BaseFilter): + """ + Проверяет право на удаление сообщений. + + Example: + ```python + @router.message(Command("clear"), CanDeleteMessages()) + async def clear_messages(message: Message): + await message.answer("🗑️ Очистка сообщений...") + ``` + """ + + async def __call__(self, message: Message, bot: Bot) -> bool: + try: + member = await bot.get_chat_member( + chat_id=message.chat.id, + user_id=message.from_user.id + ) + + if member.status == ChatMemberStatus.CREATOR: + return True + + return getattr(member, 'can_delete_messages', False) + + except (TelegramBadRequest, TelegramForbiddenError): + return False + + +class CanRestrictMembers(BaseFilter): + """ + Проверяет право на ограничение пользователей (бан, мут). + + Example: + ```python + @router.message(Command("mute"), CanRestrictMembers()) + async def mute_user(message: Message): + await message.answer("🔇 Мут пользователя...") + ``` + """ + + async def __call__(self, message: Message, bot: Bot) -> bool: + try: + member = await bot.get_chat_member( + chat_id=message.chat.id, + user_id=message.from_user.id + ) + + if member.status == ChatMemberStatus.CREATOR: + return True + + return getattr(member, 'can_restrict_members', False) + + except (TelegramBadRequest, TelegramForbiddenError): + return False + + +class CanPinMessages(BaseFilter): + """ + Проверяет право на закрепление сообщений. + + Example: + ```python + @router.message(Command("pin"), CanPinMessages()) + async def pin_message(message: Message): + if message.reply_to_message: + await message.reply_to_message.pin() + ``` + """ + + async def __call__(self, message: Message, bot: Bot) -> bool: + try: + member = await bot.get_chat_member( + chat_id=message.chat.id, + user_id=message.from_user.id + ) + + if member.status == ChatMemberStatus.CREATOR: + return True + + return getattr(member, 'can_pin_messages', False) + + except (TelegramBadRequest, TelegramForbiddenError): + return False