diff --git a/bot/filters/admin.py b/bot/filters/admin.py new file mode 100644 index 0000000..d5cfe00 --- /dev/null +++ b/bot/filters/admin.py @@ -0,0 +1,109 @@ +""" +Фильтры для проверки прав администратора +""" +from typing import Union + +from aiogram.filters import BaseFilter +from aiogram.types import Message, CallbackQuery + +from configs import settings +from database import get_manager +from middleware.loggers import logger + +__all__ = ('IsSuperAdmin', 'IsAdmin', 'IsOwner') + + +class IsSuperAdmin(BaseFilter): + """ + Проверяет, является ли пользователь суперадминистратором (из .env). + + Суперадмины имеют полный доступ ко всем командам бота. + + Example: + ```python + @router.message(Command("addadmin"), IsSuperAdmin()) + async def add_admin_command(message: Message): + await message.answer("Добавление админа...") + ``` + """ + + async def __call__(self, event: Union[Message, CallbackQuery]) -> bool: + user_id = event.from_user.id + is_super_admin = user_id in settings.OWNER_ID + + if not is_super_admin: + logger.warning( + f"Попытка доступа к команде суперадмина от user_id={user_id}", + log_type='SECURITY', + message=event if isinstance(event, Message) else None + ) + + return is_super_admin + + +class IsAdmin(BaseFilter): + """ + Проверяет, является ли пользователь администратором (суперадмин или доп. админ). + + Администраторы могут управлять банвордами, но не могут добавлять других админов. + Список дополнительных админов загружается из БД через BanWordsManager. + + Example: + ```python + @router.message(Command("addword"), IsAdmin()) + async def add_word_command(message: Message): + await message.answer("Добавление банворда...") + ``` + """ + + async def __call__(self, event: Union[Message, CallbackQuery]) -> bool: + user_id = event.from_user.id + + # Проверка суперадмина + if user_id in settings.OWNER_ID: + return True + + # Проверка доп. админа из БД (через кэш) + manager = get_manager() + is_db_admin = manager.is_admin_cached(user_id) + + if not is_db_admin: + logger.warning( + f"Попытка доступа к админ-команде от user_id={user_id}", + log_type='SECURITY', + message=event if isinstance(event, Message) else None + ) + + return is_db_admin + + +class IsOwner(BaseFilter): + """ + Проверяет, является ли пользователь первым владельцем бота (OWNER_ID[0]). + + Используется для критических операций (например, полная очистка данных). + + Example: + ```python + @router.message(Command("reset_all"), IsOwner()) + async def reset_command(message: Message): + await message.answer("⚠️ Сброс всех данных...") + ``` + """ + + async def __call__(self, event: Union[Message, CallbackQuery]) -> bool: + user_id = event.from_user.id + + # Берём первого суперадмина как владельца + owner_id = settings.OWNER_ID[0] if settings.OWNER_ID else None + + is_owner = user_id == owner_id + + if not is_owner: + logger.warning( + f"Попытка доступа к команде владельца от user_id={user_id}", + log_type='SECURITY', + message=event if isinstance(event, Message) else None + ) + + return is_owner