"""
Middleware для проверки сообщений на запрещённые слова (банворды).
...
✅ ИСПРАВЛЕНО:
- ❌ PatternError: bad character range 🀀-\\\\ (исправлено экранирование Unicode)
- ✅ НЕТ уведомлений в режиме тишины
"""
from typing import Callable, Dict, Any, Awaitable, Optional
import re
import unicodedata
from aiogram import BaseMiddleware
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.exceptions import TelegramBadRequest
from configs import settings
from database import get_manager, BanWordType
from bot.special import process_text, extract_words, get_lemma
from middleware.loggers import logger
__all__ = ("BanWordsMiddleware",)
class BanWordsMiddleware(BaseMiddleware):
def __init__(self):
super().__init__()
self.manager = get_manager()
async def __call__(
self,
handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]],
event: Message,
data: Dict[str, Any]
) -> Any:
if not event.text and not event.caption:
return await handler(event, data)
message_text = event.text or event.caption
if message_text.startswith('/'):
return await handler(event, data)
# Админ проверка
user_id = event.from_user.id
is_super_admin = user_id in settings.OWNER_ID
is_admin = is_super_admin or self.manager.is_admin_cached(user_id)
if is_admin:
return await handler(event, data)
spam_result = await self._check_message(message_text)
if spam_result:
await self._handle_spam(event, spam_result)
return None
return await handler(event, data)
@staticmethod
def _normalize_universal(text: str, mode: str = "strict") -> str:
"""✅ ИСПРАВЛЕНО: Универсальная нормализация для всех типов проверок"""
# БЕЗОПАСНАЯ нормализация - убираем все проблемные символы
text = unicodedata.normalize('NFKC', text)
if mode == "strict": # PART - сохраняем буквы, цифры, пробелы
# ✅ ИСПРАВЛЕНО: безопасный паттерн только для букв/цифр/пробелов
text = re.sub(r'[^a-zA-Zа-яА-ЯёЁ0-9\s]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
else: # SUBSTRING/LEMMA - только буквы и цифры
text = re.sub(r'[^a-zA-Zа-яА-ЯёЁ0-9]', '', text)
return BanWordsMiddleware._normalize_repeated_chars(text)
@staticmethod
def _normalize_repeated_chars(text: str) -> str:
"""Убирает повторения >2 (лееееин → лейн)"""
return re.sub(r'([а-яёa-z])\\1{2,}', r'\\1\\1', text, flags=re.IGNORECASE)
def _check_repeated_chars(self, text: str) -> Optional[Dict[str, str]]:
"""🔥 Блокирует 3+ повторяющиеся символы подряд"""
# ✅ ИСПРАВЛЕНО: безопасный паттерн только для букв
pattern = r'([а-яёa-zA-Z])\\1{2,}'
matches = re.finditer(pattern, text, flags=re.IGNORECASE)
for match in matches:
char = match.group(1)
count = len(match.group(0))
if count >= 3:
logger.info(f"🔥 ПОВТОРЫ: '{match.group(0)}' ({count}x)", log_type="BANWORDS")
return {"word": f"'{match.group(0)}' ({count}x)", "type": "repeated_chars"}
return None
async def _check_message(self, text: str) -> Optional[Dict[str, str]]:
text_lower = text.lower()
# 🔥 1. Повторяющиеся символы (лееееин)
repeat_result = self._check_repeated_chars(text_lower)
if repeat_result:
return repeat_result
# 2. ✅ БЕЗОПАСНАЯ нормализация
text_universal = self._normalize_universal(text_lower, "strict") # PART
text_loose = self._normalize_universal(text_lower) # SUBSTRING/LEMMA
text_processed = process_text(text_lower)
logger.debug(
f"🔍 | universal='{text_universal}' | loose='{text_loose}' | proc='{text_processed}'",
log_type="BANWORDS"
)
# 3. WHITELIST
if (self.manager.is_whitelisted(text_processed) or
self.manager.is_whitelisted(text_loose) or
self.manager.is_whitelisted(text_universal)):
return None
# 4. SILENCE MODE
if await self.manager.is_silence_active():
return {"word": "[режим тишины]", "type": "silence"}
# 5. CONFLICT MODE
if await self.manager.is_conflict_active():
for word in self.manager.get_banwords_cached(BanWordType.CONFLICT_SUBSTRING):
word_norm = self._normalize_universal(word.lower(), "loose")
if word_norm in text_loose:
return {"word": word, "type": "conflict_substring"}
conflict_lemma = self.manager.get_banwords_cached(BanWordType.CONFLICT_LEMMA)
for word_text in extract_words(text_processed):
lemma = get_lemma(self._normalize_repeated_chars(word_text))
if lemma in conflict_lemma:
return {"word": lemma, "type": "conflict_lemma"}
# 6. SUBSTRING
for word in self.manager.get_banwords_cached(BanWordType.SUBSTRING):
word_norm = self._normalize_universal(word.lower())
if word_norm in text_loose:
logger.info(f"✅ SUBSTRING: '{word}' → '{text_loose}'", log_type="BANWORDS")
return {"word": word, "type": "substring"}
# 7. PART (строгая нормализация)
for part in self.manager.get_banwords_cached(BanWordType.PART):
part_norm = self._normalize_universal(part.lower(), "strict")
if part_norm in text_universal:
logger.info(f"✅ PART: '{part}' → '{text_universal}'", log_type="BANWORDS")
return {"word": part, "type": "part"}
# 8. LEMMA
for word_text in extract_words(text_processed):
lemma = get_lemma(self._normalize_repeated_chars(word_text))
if lemma in self.manager.get_banwords_cached(BanWordType.LEMMA):
logger.info(f"✅ LEMMA: '{lemma}' из '{word_text}'", log_type="BANWORDS")
return {"word": lemma, "type": "lemma"}
return None
async def _handle_spam(self, message: Message, spam_result: Dict[str, str]) -> None:
user = message.from_user
matched_word = spam_result["word"]
match_type = spam_result["type"]
message_text = message.text or message.caption or "[нет текста]"
# ✅ ПРОВЕРКА: НЕ отправляем уведомления в режиме тишины
if match_type == "silence":
# Удаляем сообщение молча
try:
await message.delete()
logger.info(f"🔇 SILENCE: @{user.username or user.id} удалено молча",
log_type="BANWORDS")
except TelegramBadRequest as e:
logger.error(f"❌ Не удалено (silence): {e}", log_type="BANWORDS")
return
# Удаляем
try:
await message.delete()
logger.info(f"🚫 @{user.username or user.id}: '{matched_word}' ({match_type})",
log_type="BANWORDS")
except TelegramBadRequest as e:
logger.error(f"❌ Не удалено: {e}", log_type="BANWORDS")
return
# Логируем в БД (только НЕ silence)
await self.manager.log_spam(
user_id=user.id,
username=user.username or f"id{user.id}",
chat_id=message.chat.id,
message_text=message_text,
matched_word=matched_word,
match_type=match_type
)
# Уведомляем админов (только НЕ silence)
await self._notify_admins(message, matched_word, match_type, message_text)
# Остальные методы без изменений...
async def _notify_admins(
self,
message: Message,
matched_word: str,
match_type: str,
message_text: str
) -> None:
user = message.from_user
username = f"@{user.username}" if user.username else f"ID: {user.id}"
spam_count = await self.manager.get_user_spam_count(user.id)
chat_title = message.chat.title or "Без названия"
source_thread_id = message.message_thread_id
notification_text = (
f"🚫 Удалено сообщение\\n\\n"
f"👤 Пользователь: {username}\\n"
f"🆔 ID: {user.id}\\n"
f"📊 Нарушений: {spam_count}\\n\\n"
f"💬 Чат: {self._escape_html(chat_title)}\\n"
f"🆔 Chat ID: {message.chat.id}\\n"
f"{'📌 Topic ID: {source_thread_id}\\n' if source_thread_id else ''}"
f"🔗 Message ID: {message.message_id}\\n\\n"
f"🔍 Триггер: {self._escape_html(matched_word)}\\n"
f"📝 Тип: {self._get_type_emoji(match_type)} {self._escape_html(match_type)}\\n\\n"
f"💬 Текст:\\n{self._escape_html(message_text[:500])}"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="🔨 Забанить", callback_data=f"spam_ban:{user.id}:{message.chat.id}"),
InlineKeyboardButton(text="✅ Закрыть", callback_data="spam_close")
],
[InlineKeyboardButton(text="📊 Статистика", callback_data=f"spam_stats:{user.id}")]
])
try:
admin_chat_id = getattr(settings, "ADMIN_CHAT_ID", None)
admin_thread_id = getattr(settings, "ADMIN_THREAD_ID", None) or None
await message.bot.send_message(
chat_id=admin_chat_id,
text=notification_text,
reply_markup=keyboard,
parse_mode="HTML",
message_thread_id=admin_thread_id
)
except Exception as e:
logger.error(f"❌ Уведомление админам: {e}", log_type="BANWORDS")
@staticmethod
def _get_type_emoji(match_type: str) -> str:
return {
"substring": "🔤",
"lemma": "📖",
"part": "🧩",
"silence": "🔇",
"conflict_substring": "⚔️",
"conflict_lemma": "⚔️",
"repeated_chars": "🔁"
}.get(match_type, "❓")
@staticmethod
def _escape_html(text: str) -> str:
return str(text).replace("&", "&").replace("<", "<").replace(">", ">")