""" Фильтры для проверки прав администратора """ from typing import Union from aiogram.filters import BaseFilter from aiogram.types import Message, CallbackQuery from configs import settings from database import get_manager from middleware.loggers import logger __all__ = ('IsSuperAdmin', 'IsAdmin', 'IsOwner') class IsSuperAdmin(BaseFilter): """ Проверяет, является ли пользователь суперадминистратором (из .env). Суперадмины имеют полный доступ ко всем командам бота. Example: ```python @router.message(Command("addadmin"), IsSuperAdmin()) async def add_admin_command(message: Message): await message.answer("Добавление админа...") ``` """ async def __call__(self, event: Union[Message, CallbackQuery]) -> bool: user_id = event.from_user.id is_super_admin = user_id in settings.OWNER_ID if not is_super_admin: logger.warning( f"Попытка доступа к команде суперадмина от user_id={user_id}", log_type='SECURITY', message=event if isinstance(event, Message) else None ) return is_super_admin class IsAdmin(BaseFilter): """ Проверяет, является ли пользователь администратором (суперадмин или доп. админ). Администраторы могут управлять банвордами, но не могут добавлять других админов. Список дополнительных админов загружается из БД через BanWordsManager. Example: ```python @router.message(Command("addword"), IsAdmin()) async def add_word_command(message: Message): await message.answer("Добавление банворда...") ``` """ async def __call__(self, event: Union[Message, CallbackQuery]) -> bool: user_id = event.from_user.id # Проверка суперадмина if user_id in settings.OWNER_ID: return True # Проверка доп. админа из БД (через кэш) manager = get_manager() is_db_admin = manager.is_admin_cached(user_id) if not is_db_admin: logger.warning( f"Попытка доступа к админ-команде от user_id={user_id}", log_type='SECURITY', message=event if isinstance(event, Message) else None ) return is_db_admin class IsOwner(BaseFilter): """ Проверяет, является ли пользователь первым владельцем бота (OWNER_ID[0]). Используется для критических операций (например, полная очистка данных). Example: ```python @router.message(Command("reset_all"), IsOwner()) async def reset_command(message: Message): await message.answer("⚠️ Сброс всех данных...") ``` """ async def __call__(self, event: Union[Message, CallbackQuery]) -> bool: user_id = event.from_user.id # Берём первого суперадмина как владельца owner_id = settings.OWNER_ID[0] if settings.OWNER_ID else None is_owner = user_id == owner_id if not is_owner: logger.warning( f"Попытка доступа к команде владельца от user_id={user_id}", log_type='SECURITY', message=event if isinstance(event, Message) else None ) return is_owner