diff --git a/bot/filters/spam.py b/bot/filters/spam.py new file mode 100644 index 0000000..9722ab8 --- /dev/null +++ b/bot/filters/spam.py @@ -0,0 +1,111 @@ +""" +Фильтры для проверки сообщений на спам и банворды +""" +from typing import Optional, Callable + +from aiogram.filters import BaseFilter +from aiogram.types import Message + +from middleware.loggers import logger + +__all__ = ('HasSpam', 'IsWhitelisted') + + +class HasSpam(BaseFilter): + """ + Проверяет, содержит ли сообщение запрещенные слова (спам). + + Attributes: + check_spam_func: Функция проверки спама (передается при инициализации) + + Example: + ```python + from utils.spam_checker import check_spam + + @router.message(HasSpam(check_spam)) + async def spam_detected(message: Message): + await message.delete() + await message.answer("⚠️ Сообщение содержит запрещенные слова") + ``` + """ + + def __init__(self, check_spam_func: Callable[[str], bool]): + """ + Args: + check_spam_func: Функция для проверки спама + """ + self.check_spam = check_spam_func + + async def __call__(self, message: Message) -> Optional[dict]: + """ + Проверка сообщения на спам. + + Returns: + dict или None: Информация о найденном спаме или None + """ + if not message.text: + return None + + text_lower = message.text.lower() + has_spam = self.check_spam(text_lower) + + if has_spam: + logger.warning( + f"Обнаружен спам в сообщении", + log_type='SPAM', + message=message + ) + return {'has_spam': True, 'text': text_lower} + + return None + + +class IsWhitelisted(BaseFilter): + """ + Проверяет, содержит ли сообщение слова из белого списка (исключения). + + Используется для защиты от ложных срабатываний спам-фильтра. + + Attributes: + check_whitelist_func: Функция проверки белого списка + + Example: + ```python + from utils.spam_checker import check_whitelist + + @router.message(IsWhitelisted(check_whitelist)) + async def whitelisted_message(message: Message): + # Сообщение содержит исключение, пропускаем проверку спама + pass + ``` + """ + + def __init__(self, check_whitelist_func: Callable[[str], bool]): + """ + Args: + check_whitelist_func: Функция для проверки белого списка + """ + self.check_whitelist = check_whitelist_func + + async def __call__(self, message: Message) -> Optional[bool]: + """ + Проверка на наличие в белом списке. + + Returns: + bool или None: True если в белом списке, None если нет + """ + if not message.text: + return None + + text_lower = message.text.lower() + is_whitelisted = self.check_whitelist(text_lower) + + if is_whitelisted: + logger.debug( + f"Сообщение содержит исключение из белого списка", + log_type='WHITELIST', + message=message + ) + return True + + return None