diff --git a/bot/middlewares/banwords_mdw.py b/bot/middlewares/banwords_mdw.py new file mode 100644 index 0000000..7d4f459 --- /dev/null +++ b/bot/middlewares/banwords_mdw.py @@ -0,0 +1,342 @@ +""" +Middleware для проверки сообщений на запрещённые слова (банворды). + +✅ ИСПРАВЛЕНО: +- Полная нормализация текста с использованием UNICODE_MAP +- Удаление повторов символов (леееейн → лейн) +- Игнорирование разделителей (л.е.й.н → лейн) +- Поддержка всех типов проверок (SUBSTRING, LEMMA, PART, CONFLICT) +- Белый список и режимы тишины/конфликта +- Нет уведомлений в режиме тишины +""" + +from typing import Callable, Dict, Any, Awaitable, Optional +import re +import unicodedata + +from aiogram import BaseMiddleware +from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton +from aiogram.exceptions import TelegramBadRequest + +from configs import settings, UNICODE_MAP, LATIN_TO_CYRILLIC, CYRILLIC_NORMALIZE +from database import get_manager, BanWordType +from bot.special import extract_words, get_lemma +from middleware.loggers import logger + +__all__ = ("BanWordsMiddleware",) + + +class TextNormalizer: + """ + Класс для многоступенчатой нормализации текста. + Приводит различные юникод-символы к базовым буквам, + удаляет повторы, убирает разделители. + """ + + # Объединяем все словари замен в один + FULL_MAP = {} + FULL_MAP.update(LATIN_TO_CYRILLIC) + FULL_MAP.update(CYRILLIC_NORMALIZE) + FULL_MAP.update(UNICODE_MAP) + + # Символы-разделители, которые могут быть вставлены между буквами + SEPARATORS = re.compile(r'[\s.\-_,;:|]+', re.UNICODE) + + # Паттерн для поиска повторяющихся букв (3+ раза) + REPEAT_PATTERN = re.compile(r'([а-яёa-z])\1{2,}', re.IGNORECASE) + + @classmethod + def normalize_characters(cls, text: str) -> str: + """ + Заменяет все символы из FULL_MAP на их базовые эквиваленты. + Проходит по строке посимвольно для максимальной замены. + """ + result = [] + for ch in text: + # Сначала пробуем заменить по карте + if ch in cls.FULL_MAP: + result.append(cls.FULL_MAP[ch]) + else: + result.append(ch) + # Приводим к нижнему регистру после замен (чтобы избежать потери регистра в карте) + return ''.join(result).lower() + + @classmethod + def remove_separators(cls, text: str) -> str: + """Удаляет разделители между буквами (пробелы, точки и т.д.)""" + return cls.SEPARATORS.sub('', text) + + @classmethod + def collapse_repeats(cls, text: str, max_repeat: int = 2) -> str: + def repl(m): + ch = m.group(1) + return ch # вместо ch * 2 — теперь схлопываем до одного символа + + return cls.REPEAT_PATTERN.sub(repl, text) + + @classmethod + def normalize_full(cls, text: str, remove_sep: bool = True, collapse: bool = True) -> str: + """ + Полная нормализация: + 1. Unicode нормализация (NFKC) для разложения составных символов + 2. Замена по карте + 3. Приведение к нижнему регистру + 4. Удаление разделителей (опционально) + 5. Схлопывание повторов (опционально) + """ + # NFKC разлагает символы типа "ё" в "е" + умляут, но нам лучше оставить как есть, + # т.к. у нас есть прямые замены. Однако для совместимости применим. + text = unicodedata.normalize('NFKC', text) + # Замена символов + text = cls.normalize_characters(text) + # Удаление разделителей + if remove_sep: + text = cls.remove_separators(text) + # Схлопывание повторов + if collapse: + text = cls.collapse_repeats(text) + return text + + @classmethod + def normalize_for_part(cls, text: str) -> str: + """ + Нормализация для типа PART: + - Полная нормализация + - Удаление всех не-буквенных символов (кроме пробелов) + - Приведение к нижнему регистру + """ + text = cls.normalize_full(text, remove_sep=False, collapse=True) + # Оставляем только буквы и пробелы + text = re.sub(r'[^а-яёa-z\s]', '', text, flags=re.IGNORECASE) + text = re.sub(r'\s+', ' ', text).strip() + return text.lower() + + +class BanWordsMiddleware(BaseMiddleware): + def __init__(self): + super().__init__() + self.manager = get_manager() + self.normalizer = TextNormalizer() + + async def __call__( + self, + handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]], + event: Message, + data: Dict[str, Any] + ) -> Any: + # Проверяем наличие текста или подписи + if not event.text and not event.caption: + return await handler(event, data) + + message_text = event.text or event.caption + + # Игнорируем команды + if message_text.startswith('/'): + return await handler(event, data) + + # Проверка на админа + user_id = event.from_user.id + is_super_admin = user_id in settings.OWNER_ID + is_admin = is_super_admin or self.manager.is_admin_cached(user_id) + if is_admin: + return await handler(event, data) + + # Проверяем сообщение на спам + spam_result = await self._check_message(message_text) + if spam_result: + await self._handle_spam(event, spam_result) + return None # Сообщение удалено, дальше не обрабатываем + + return await handler(event, data) + + async def _check_message(self, text: str) -> Optional[Dict[str, str]]: + """ + Многоступенчатая проверка текста. + Возвращает словарь с причиной блокировки или None. + """ + # 1. Повторяющиеся символы (например, "леееейн") — блокируем сразу + # repeat_result = self._check_repeated_chars(text) + # if repeat_result: + # return repeat_result + + # 2. Получаем кэшированные списки + substring_words = self.manager.get_banwords_cached(BanWordType.SUBSTRING) + lemma_words = self.manager.get_banwords_cached(BanWordType.LEMMA) + part_words = self.manager.get_banwords_cached(BanWordType.PART) + conflict_substring = self.manager.get_banwords_cached(BanWordType.CONFLICT_SUBSTRING) + conflict_lemma = self.manager.get_banwords_cached(BanWordType.CONFLICT_LEMMA) + + # 3. Белый список + if self.manager.is_whitelisted(text): + logger.debug(f"⏭️ Пропуск по белому списку: {text[:30]}", log_type="BANWORDS") + return None + + # 4. Режим тишины + if await self.manager.is_silence_active(): + return {"word": "[режим тишины]", "type": "silence"} + + # 5. Режим конфликта (более мягкие правила) + if await self.manager.is_conflict_active(): + # Проверка conflict_substring (с нормализацией) + normalized_text = self.normalizer.normalize_full(text, remove_sep=True, collapse=True) + for word in conflict_substring: + norm_word = self.normalizer.normalize_full(word, remove_sep=True, collapse=True) + if norm_word in normalized_text: + return {"word": word, "type": "conflict_substring"} + + # conflict_lemma + for word_text in extract_words(text): + lemma = get_lemma(word_text) + if lemma in conflict_lemma: + return {"word": lemma, "type": "conflict_lemma"} + + # Если в конфликтном режиме ничего не найдено — пропускаем + return None + + # 6. Обычный режим: проверка substring (с удалением разделителей и схлопыванием повторов) + normalized_text = self.normalizer.normalize_full(text, remove_sep=True, collapse=True) + for word in substring_words: + norm_word = self.normalizer.normalize_full(word, remove_sep=True, collapse=True) + if norm_word in normalized_text: + logger.info(f"✅ SUBSTRING: '{word}'", log_type="BANWORDS") + return {"word": word, "type": "substring"} + + # 7. Проверка part (строгая нормализация, только буквы и пробелы) + part_normalized = self.normalizer.normalize_for_part(text) + for part in part_words: + norm_part = self.normalizer.normalize_for_part(part) + if norm_part in part_normalized: + logger.info(f"✅ PART: '{part}'", log_type="BANWORDS") + return {"word": part, "type": "part"} + + # 8. Проверка lemma + for word_text in extract_words(text): + # Для леммы тоже применяем нормализацию (удаляем разделители, схлопываем повторы) + normalized_word = self.normalizer.normalize_full(word_text, remove_sep=True, collapse=True) + lemma = get_lemma(normalized_word) + if lemma in lemma_words: + logger.info(f"✅ LEMMA: '{lemma}' из '{word_text}'", log_type="BANWORDS") + return {"word": lemma, "type": "lemma"} + + return None + + def _check_repeated_chars(self, text: str) -> Optional[Dict[str, str]]: + """ + Проверяет на наличие 3+ повторяющихся букв подряд. + Использует сырой текст без нормализации (чтобы поймать "леееейн"). + """ + # Ищем повторения букв (только кириллица/латиница) + pattern = re.compile(r'([а-яёa-zA-Z])\1{2,}', re.IGNORECASE) + matches = pattern.finditer(text) + for match in matches: + char = match.group(1) + count = len(match.group(0)) + if count >= 3: + logger.info(f"🔥 ПОВТОРЫ: '{match.group(0)}' ({count}x)", log_type="BANWORDS") + return {"word": f"'{match.group(0)}' ({count}x)", "type": "repeated_chars"} + return None + + async def _handle_spam(self, message: Message, spam_result: Dict[str, str]) -> None: + """Обрабатывает спам-сообщение: удаляет, логирует, уведомляет (кроме silence)""" + user = message.from_user + matched_word = spam_result["word"] + match_type = spam_result["type"] + message_text = message.text or message.caption or "[нет текста]" + + # В режиме тишины удаляем молча + if match_type == "silence": + try: + await message.delete() + logger.info(f"🔇 SILENCE: @{user.username or user.id} удалено молча", log_type="BANWORDS") + except TelegramBadRequest as e: + logger.error(f"❌ Не удалено (silence): {e}", log_type="BANWORDS") + return + + # Удаляем сообщение + try: + await message.delete() + logger.info(f"🚫 @{user.username or user.id}: '{matched_word}' ({match_type})", log_type="BANWORDS") + except TelegramBadRequest as e: + logger.error(f"❌ Не удалено: {e}", log_type="BANWORDS") + return + + # Логируем в БД + await self.manager.log_spam( + user_id=user.id, + username=user.username or f"id{user.id}", + chat_id=message.chat.id, + message_text=message_text, + matched_word=matched_word, + match_type=match_type + ) + + # Уведомляем админов + await self._notify_admins(message, matched_word, match_type, message_text) + + async def _notify_admins( + self, + message: Message, + matched_word: str, + match_type: str, + message_text: str +) -> None: + """Отправляет уведомление об удалении в админ-чат (берёт ID из БД)""" + user = message.from_user + username = f"@{user.username}" if user.username else f"ID: {user.id}" + spam_count = await self.manager.get_user_spam_count(user.id) + chat_title = message.chat.title or "Без названия" + source_thread_id = message.message_thread_id + + notification_text = ( + f"🚫 Удалено сообщение\n\n" + f"👤 Пользователь: {username}\n" + f"🆔 ID: {user.id}\n" + f"📊 Нарушений: {spam_count}\n\n" + f"💬 Чат: {self._escape_html(chat_title)}\n" + f"🆔 Chat ID: {message.chat.id}\n" + f"{'📌 Topic ID: ' + str(source_thread_id) + '\n' if source_thread_id else ''}" + f"🔗 Message ID: {message.message_id}\n\n" + f"🔍 Триггер: {self._escape_html(matched_word)}\n" + f"📝 Тип: {self._get_type_emoji(match_type)} {self._escape_html(match_type)}\n\n" + f"💬 Текст:\n{self._escape_html(message_text[:500])}" + ) + + keyboard = InlineKeyboardMarkup(inline_keyboard=[ + [ + InlineKeyboardButton(text="🔨 Забанить", callback_data=f"spam_ban:{user.id}:{message.chat.id}"), + InlineKeyboardButton(text="✅ Закрыть", callback_data="spam_close") + ], + [InlineKeyboardButton(text="📊 Статистика", callback_data=f"spam_stats:{user.id}")] + ]) + + try: + # ✅ Получаем настройки из БД (динамические, установленные через /settings) + admin_chat_id = await self.manager.get_bot_setting("admin_chat_id") + admin_thread_id = await self.manager.get_bot_setting("admin_thread_id") + + if admin_chat_id: + await message.bot.send_message( + chat_id=int(admin_chat_id), + text=notification_text, + reply_markup=keyboard, + parse_mode="HTML", + message_thread_id=int(admin_thread_id) if admin_thread_id else None + ) + except Exception as e: + logger.error(f"❌ Уведомление админам: {e}", log_type="BANWORDS") + + @staticmethod + def _get_type_emoji(match_type: str) -> str: + return { + "substring": "🔤", + "lemma": "📖", + "part": "🧩", + "silence": "🔇", + "conflict_substring": "⚔️", + "conflict_lemma": "⚔️", + "repeated_chars": "🔁" + }.get(match_type, "❓") + + @staticmethod + def _escape_html(text: str) -> str: + return str(text).replace("&", "&").replace("<", "<").replace(">", ">")