Основной фильтр на запрещенные слова

This commit is contained in:
2026-02-23 14:34:06 +07:00
parent 69e7342195
commit 9d5c380a6c

View File

@@ -0,0 +1,342 @@
"""
Middleware для проверки сообщений на запрещённые слова (банворды).
✅ ИСПРАВЛЕНО:
- Полная нормализация текста с использованием UNICODE_MAP
- Удаление повторов символов (леееейн → лейн)
- Игнорирование разделителей (л.е.й.н → лейн)
- Поддержка всех типов проверок (SUBSTRING, LEMMA, PART, CONFLICT)
- Белый список и режимы тишины/конфликта
- Нет уведомлений в режиме тишины
"""
from typing import Callable, Dict, Any, Awaitable, Optional
import re
import unicodedata
from aiogram import BaseMiddleware
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.exceptions import TelegramBadRequest
from configs import settings, UNICODE_MAP, LATIN_TO_CYRILLIC, CYRILLIC_NORMALIZE
from database import get_manager, BanWordType
from bot.special import extract_words, get_lemma
from middleware.loggers import logger
__all__ = ("BanWordsMiddleware",)
class TextNormalizer:
"""
Класс для многоступенчатой нормализации текста.
Приводит различные юникод-символы к базовым буквам,
удаляет повторы, убирает разделители.
"""
# Объединяем все словари замен в один
FULL_MAP = {}
FULL_MAP.update(LATIN_TO_CYRILLIC)
FULL_MAP.update(CYRILLIC_NORMALIZE)
FULL_MAP.update(UNICODE_MAP)
# Символы-разделители, которые могут быть вставлены между буквами
SEPARATORS = re.compile(r'[\s.\-_,;:|]+', re.UNICODE)
# Паттерн для поиска повторяющихся букв (3+ раза)
REPEAT_PATTERN = re.compile(r'([а-яёa-z])\1{2,}', re.IGNORECASE)
@classmethod
def normalize_characters(cls, text: str) -> str:
"""
Заменяет все символы из FULL_MAP на их базовые эквиваленты.
Проходит по строке посимвольно для максимальной замены.
"""
result = []
for ch in text:
# Сначала пробуем заменить по карте
if ch in cls.FULL_MAP:
result.append(cls.FULL_MAP[ch])
else:
result.append(ch)
# Приводим к нижнему регистру после замен (чтобы избежать потери регистра в карте)
return ''.join(result).lower()
@classmethod
def remove_separators(cls, text: str) -> str:
"""Удаляет разделители между буквами (пробелы, точки и т.д.)"""
return cls.SEPARATORS.sub('', text)
@classmethod
def collapse_repeats(cls, text: str, max_repeat: int = 2) -> str:
def repl(m):
ch = m.group(1)
return ch # вместо ch * 2 — теперь схлопываем до одного символа
return cls.REPEAT_PATTERN.sub(repl, text)
@classmethod
def normalize_full(cls, text: str, remove_sep: bool = True, collapse: bool = True) -> str:
"""
Полная нормализация:
1. Unicode нормализация (NFKC) для разложения составных символов
2. Замена по карте
3. Приведение к нижнему регистру
4. Удаление разделителей (опционально)
5. Схлопывание повторов (опционально)
"""
# NFKC разлагает символы типа "ё" в "е" + умляут, но нам лучше оставить как есть,
# т.к. у нас есть прямые замены. Однако для совместимости применим.
text = unicodedata.normalize('NFKC', text)
# Замена символов
text = cls.normalize_characters(text)
# Удаление разделителей
if remove_sep:
text = cls.remove_separators(text)
# Схлопывание повторов
if collapse:
text = cls.collapse_repeats(text)
return text
@classmethod
def normalize_for_part(cls, text: str) -> str:
"""
Нормализация для типа PART:
- Полная нормализация
- Удаление всех не-буквенных символов (кроме пробелов)
- Приведение к нижнему регистру
"""
text = cls.normalize_full(text, remove_sep=False, collapse=True)
# Оставляем только буквы и пробелы
text = re.sub(r'[^а-яёa-z\s]', '', text, flags=re.IGNORECASE)
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
class BanWordsMiddleware(BaseMiddleware):
def __init__(self):
super().__init__()
self.manager = get_manager()
self.normalizer = TextNormalizer()
async def __call__(
self,
handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]],
event: Message,
data: Dict[str, Any]
) -> Any:
# Проверяем наличие текста или подписи
if not event.text and not event.caption:
return await handler(event, data)
message_text = event.text or event.caption
# Игнорируем команды
if message_text.startswith('/'):
return await handler(event, data)
# Проверка на админа
user_id = event.from_user.id
is_super_admin = user_id in settings.OWNER_ID
is_admin = is_super_admin or self.manager.is_admin_cached(user_id)
if is_admin:
return await handler(event, data)
# Проверяем сообщение на спам
spam_result = await self._check_message(message_text)
if spam_result:
await self._handle_spam(event, spam_result)
return None # Сообщение удалено, дальше не обрабатываем
return await handler(event, data)
async def _check_message(self, text: str) -> Optional[Dict[str, str]]:
"""
Многоступенчатая проверка текста.
Возвращает словарь с причиной блокировки или None.
"""
# 1. Повторяющиеся символы (например, "леееейн") — блокируем сразу
# repeat_result = self._check_repeated_chars(text)
# if repeat_result:
# return repeat_result
# 2. Получаем кэшированные списки
substring_words = self.manager.get_banwords_cached(BanWordType.SUBSTRING)
lemma_words = self.manager.get_banwords_cached(BanWordType.LEMMA)
part_words = self.manager.get_banwords_cached(BanWordType.PART)
conflict_substring = self.manager.get_banwords_cached(BanWordType.CONFLICT_SUBSTRING)
conflict_lemma = self.manager.get_banwords_cached(BanWordType.CONFLICT_LEMMA)
# 3. Белый список
if self.manager.is_whitelisted(text):
logger.debug(f"⏭️ Пропуск по белому списку: {text[:30]}", log_type="BANWORDS")
return None
# 4. Режим тишины
if await self.manager.is_silence_active():
return {"word": "[режим тишины]", "type": "silence"}
# 5. Режим конфликта (более мягкие правила)
if await self.manager.is_conflict_active():
# Проверка conflict_substring (с нормализацией)
normalized_text = self.normalizer.normalize_full(text, remove_sep=True, collapse=True)
for word in conflict_substring:
norm_word = self.normalizer.normalize_full(word, remove_sep=True, collapse=True)
if norm_word in normalized_text:
return {"word": word, "type": "conflict_substring"}
# conflict_lemma
for word_text in extract_words(text):
lemma = get_lemma(word_text)
if lemma in conflict_lemma:
return {"word": lemma, "type": "conflict_lemma"}
# Если в конфликтном режиме ничего не найдено — пропускаем
return None
# 6. Обычный режим: проверка substring (с удалением разделителей и схлопыванием повторов)
normalized_text = self.normalizer.normalize_full(text, remove_sep=True, collapse=True)
for word in substring_words:
norm_word = self.normalizer.normalize_full(word, remove_sep=True, collapse=True)
if norm_word in normalized_text:
logger.info(f"✅ SUBSTRING: '{word}'", log_type="BANWORDS")
return {"word": word, "type": "substring"}
# 7. Проверка part (строгая нормализация, только буквы и пробелы)
part_normalized = self.normalizer.normalize_for_part(text)
for part in part_words:
norm_part = self.normalizer.normalize_for_part(part)
if norm_part in part_normalized:
logger.info(f"✅ PART: '{part}'", log_type="BANWORDS")
return {"word": part, "type": "part"}
# 8. Проверка lemma
for word_text in extract_words(text):
# Для леммы тоже применяем нормализацию (удаляем разделители, схлопываем повторы)
normalized_word = self.normalizer.normalize_full(word_text, remove_sep=True, collapse=True)
lemma = get_lemma(normalized_word)
if lemma in lemma_words:
logger.info(f"✅ LEMMA: '{lemma}' из '{word_text}'", log_type="BANWORDS")
return {"word": lemma, "type": "lemma"}
return None
def _check_repeated_chars(self, text: str) -> Optional[Dict[str, str]]:
"""
Проверяет на наличие 3+ повторяющихся букв подряд.
Использует сырой текст без нормализации (чтобы поймать "леееейн").
"""
# Ищем повторения букв (только кириллица/латиница)
pattern = re.compile(r'([а-яёa-zA-Z])\1{2,}', re.IGNORECASE)
matches = pattern.finditer(text)
for match in matches:
char = match.group(1)
count = len(match.group(0))
if count >= 3:
logger.info(f"🔥 ПОВТОРЫ: '{match.group(0)}' ({count}x)", log_type="BANWORDS")
return {"word": f"'{match.group(0)}' ({count}x)", "type": "repeated_chars"}
return None
async def _handle_spam(self, message: Message, spam_result: Dict[str, str]) -> None:
"""Обрабатывает спам-сообщение: удаляет, логирует, уведомляет (кроме silence)"""
user = message.from_user
matched_word = spam_result["word"]
match_type = spam_result["type"]
message_text = message.text or message.caption or "[нет текста]"
# В режиме тишины удаляем молча
if match_type == "silence":
try:
await message.delete()
logger.info(f"🔇 SILENCE: @{user.username or user.id} удалено молча", log_type="BANWORDS")
except TelegramBadRequest as e:
logger.error(f"Не удалено (silence): {e}", log_type="BANWORDS")
return
# Удаляем сообщение
try:
await message.delete()
logger.info(f"🚫 @{user.username or user.id}: '{matched_word}' ({match_type})", log_type="BANWORDS")
except TelegramBadRequest as e:
logger.error(f"Не удалено: {e}", log_type="BANWORDS")
return
# Логируем в БД
await self.manager.log_spam(
user_id=user.id,
username=user.username or f"id{user.id}",
chat_id=message.chat.id,
message_text=message_text,
matched_word=matched_word,
match_type=match_type
)
# Уведомляем админов
await self._notify_admins(message, matched_word, match_type, message_text)
async def _notify_admins(
self,
message: Message,
matched_word: str,
match_type: str,
message_text: str
) -> None:
"""Отправляет уведомление об удалении в админ-чат (берёт ID из БД)"""
user = message.from_user
username = f"@{user.username}" if user.username else f"ID: {user.id}"
spam_count = await self.manager.get_user_spam_count(user.id)
chat_title = message.chat.title or "Без названия"
source_thread_id = message.message_thread_id
notification_text = (
f"🚫 <b>Удалено сообщение</b>\n\n"
f"👤 <b>Пользователь:</b> {username}\n"
f"🆔 <b>ID:</b> <code>{user.id}</code>\n"
f"📊 <b>Нарушений:</b> {spam_count}\n\n"
f"💬 <b>Чат:</b> {self._escape_html(chat_title)}\n"
f"🆔 <b>Chat ID:</b> <code>{message.chat.id}</code>\n"
f"{'📌 <b>Topic ID:</b> <code>' + str(source_thread_id) + '</code>\n' if source_thread_id else ''}"
f"🔗 <b>Message ID:</b> <code>{message.message_id}</code>\n\n"
f"🔍 <b>Триггер:</b> <code>{self._escape_html(matched_word)}</code>\n"
f"📝 <b>Тип:</b> {self._get_type_emoji(match_type)} {self._escape_html(match_type)}\n\n"
f"💬 <b>Текст:</b>\n<code>{self._escape_html(message_text[:500])}</code>"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="🔨 Забанить", callback_data=f"spam_ban:{user.id}:{message.chat.id}"),
InlineKeyboardButton(text="✅ Закрыть", callback_data="spam_close")
],
[InlineKeyboardButton(text="📊 Статистика", callback_data=f"spam_stats:{user.id}")]
])
try:
# ✅ Получаем настройки из БД (динамические, установленные через /settings)
admin_chat_id = await self.manager.get_bot_setting("admin_chat_id")
admin_thread_id = await self.manager.get_bot_setting("admin_thread_id")
if admin_chat_id:
await message.bot.send_message(
chat_id=int(admin_chat_id),
text=notification_text,
reply_markup=keyboard,
parse_mode="HTML",
message_thread_id=int(admin_thread_id) if admin_thread_id else None
)
except Exception as e:
logger.error(f"❌ Уведомление админам: {e}", log_type="BANWORDS")
@staticmethod
def _get_type_emoji(match_type: str) -> str:
return {
"substring": "🔤",
"lemma": "📖",
"part": "🧩",
"silence": "🔇",
"conflict_substring": "⚔️",
"conflict_lemma": "⚔️",
"repeated_chars": "🔁"
}.get(match_type, "")
@staticmethod
def _escape_html(text: str) -> str:
return str(text).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")