@@ -1,11 +1,16 @@
"""
Middleware для проверки сообщений на запрещённые слова (банворды).
...
✅ ИСПРАВЛЕНО:
- ❌ PatternError: bad character range 🀀- \\ \\ (исправлено экранир ование Unicode)
- ✅ Н Е Т уведомлений в режиме тишины
- Полная нормализация текста с использ ованием UNICODE_MAP
- Удаление повторов символов (леееейн → лейн)
- Игнорирование разделителей (л.е .й.н → лейн)
- Поддержка всех типов проверок (SUBSTRING, LEMMA, PART, CONFLICT)
- Белый список и режимы тишины/конфликта
- Нет уведомлений в режиме тишины
"""
from typing import Callable , Dict , Any , Awaitable , Optional
from typing import Callable , Dict , Any , Awaitable , Optional , Set
import re
import unicodedata
@@ -13,7 +18,7 @@ from aiogram import BaseMiddleware
from aiogram . types import Message , InlineKeyboardMarkup , InlineKeyboardButton
from aiogram . exceptions import TelegramBadRequest
from configs import settings
from configs import settings , UNICODE_MAP , LATIN_TO_CYRILLIC , CYRILLIC_NORMALIZE
from database import get_manager , BanWordType
from bot . special import process_text , extract_words , get_lemma
from middleware . loggers import logger
@@ -21,10 +26,102 @@ from middleware.loggers import logger
__all__ = ( " BanWordsMiddleware " , )
class TextNormalizer :
"""
Класс для многоступенчатой нормализации текста.
Приводит различные юникод-символы к базовым буквам,
удаляет повторы, убирает разделители.
"""
# Объединяем все словари замен в один
FULL_MAP = { }
FULL_MAP . update ( LATIN_TO_CYRILLIC )
FULL_MAP . update ( CYRILLIC_NORMALIZE )
FULL_MAP . update ( UNICODE_MAP )
# Символы-разделители, которые могут быть вставлены между буквами
SEPARATORS = re . compile ( r ' [ \ s \ . \ -_,;:|]+ ' , re . UNICODE )
# Паттерн для поиска повторяющихся букв (3+ раза)
REPEAT_PATTERN = re . compile ( r ' ([а -яёa-z]) \ 1 { 2,} ' , re . IGNORECASE )
@classmethod
def normalize_characters ( cls , text : str ) - > str :
"""
Заменяет все символы из FULL_MAP на их базовые эквиваленты.
Проходит по строке посимвольно для максимальной замены.
"""
result = [ ]
for ch in text :
# Сначала пробуем заменить по карте
if ch in cls . FULL_MAP :
result . append ( cls . FULL_MAP [ ch ] )
else :
result . append ( ch )
# Приводим к нижнему регистру после замен (чтобы избежать потери регистра в карте)
return ' ' . join ( result ) . lower ( )
@classmethod
def remove_separators ( cls , text : str ) - > str :
""" Удаляет разделители между буквами (пробелы, точки и т.д.) """
return cls . SEPARATORS . sub ( ' ' , text )
@classmethod
def collapse_repeats ( cls , text : str , max_repeat : int = 2 ) - > str :
"""
Заменяет повторения символов более max_repeat подряд на один/два символа.
По умолчанию оставляет максимум 2 (леееейн → леейн? но обычно хватит 2).
Можно настроить: для банворда " лейн " превратит " леееейн " в " леейн " , что всё равно содержит " лейн " .
"""
def repl ( m ) :
ch = m . group ( 1 )
# Оставляем два символа, чтобы не терять удвоенные буквы (например, "дд" в слове "поддон")
return ch * 2
return cls . REPEAT_PATTERN . sub ( repl , text )
@classmethod
def normalize_full ( cls , text : str , remove_sep : bool = True , collapse : bool = True ) - > str :
"""
Полная нормализация:
1. Unicode нормализация (NFKC) для разложения составных символов
2. Замена по карте
3. Приведение к нижнему регистру
4. Удаление разделителей (опционально)
5. Схлопывание повторов (опционально)
"""
# NFKC разлагает символы типа "ё" в "е " + умляут, но нам лучше оставить как есть,
# т.к. у нас есть прямые замены. Однако для совместимости применим.
text = unicodedata . normalize ( ' NFKC ' , text )
# Замена символов
text = cls . normalize_characters ( text )
# Удаление разделителей
if remove_sep :
text = cls . remove_separators ( text )
# Схлопывание повторов
if collapse :
text = cls . collapse_repeats ( text )
return text
@classmethod
def normalize_for_part ( cls , text : str ) - > str :
"""
Нормализация для типа PART:
- Полная нормализация
- Удаление всех не-буквенных символов (кроме пробелов)
- Приведение к нижнему регистру
"""
text = cls . normalize_full ( text , remove_sep = False , collapse = True )
# Оставляем только буквы и пробелы
text = re . sub ( r ' [^а -яёa-z \ s] ' , ' ' , text , flags = re . IGNORECASE )
text = re . sub ( r ' \ s+ ' , ' ' , text ) . strip ( )
return text . lower ( )
class BanWordsMiddleware ( BaseMiddleware ) :
def __init__ ( self ) :
super ( ) . __init__ ( )
self . manager = get_manager ( )
self . normalizer = TextNormalizer ( )
async def __call__ (
self ,
@@ -32,53 +129,110 @@ class BanWordsMiddleware(BaseMiddleware):
event : Message ,
data : Dict [ str , Any ]
) - > Any :
# Проверяем наличие текста или подписи
if not event . text and not event . caption :
return await handler ( event , data )
message_text = event . text or event . caption
# Игнорируем команды
if message_text . startswith ( ' / ' ) :
return await handler ( event , data )
# Админ проверк а
# Проверка на админ а
user_id = event . from_user . id
is_super_admin = user_id in settings . OWNER_ID
is_admin = is_super_admin or self . manager . is_admin_cached ( user_id )
if is_admin :
return await handler ( event , data )
# Проверяем сообщение на спам
spam_result = await self . _check_message ( message_text )
if spam_result :
await self . _handle_spam ( event , spam_result )
return None
return None # Сообщение удалено, дальше не обрабатываем
return await handler ( event , data )
@staticmethod
def _normalize_universal ( text : str , mode : str = " strict " ) - > str :
""" ✅ ИСПРАВЛЕНО: Универсальная нормализация для всех типов проверок """
# БЕЗОПАСНАЯ нормализация - убираем все проблемные символы
text = unicodedata . normalize ( ' NFKC ' , text )
async def _check_message ( self , text : str ) - > Optional [ Dict [ str , str ] ] :
"""
Многоступенчатая проверка текста.
Возвращает словарь с причиной блокировки или None.
"""
# 1. Повторяющиеся символы (например, "леееейн") — блокируем сразу
repeat_result = self . _check_repeated_chars ( text )
if repeat_result :
return repeat_result
if mode == " strict " : # PART - сохраняем буквы, цифры, пробелы
# ✅ ИСПРАВЛЕНО: безопасный паттерн только для букв/цифр/пробелов
text = re . sub ( r ' [^a-zA-Zа -яА-ЯёЁ0-9 \ s] ' , ' ' , text )
text = re . sub ( r ' \ s+ ' , ' ' , text ) . strip ( )
else : # SUBSTRING/LEMMA - только буквы и цифры
text = re . sub ( r ' [^a-zA-Zа -яА-ЯёЁ0-9] ' , ' ' , text )
# 2. Получаем кэшированные списки
substring_words = self . manager . get_banwords_cached ( BanWordType . SUBSTRING )
lemma_words = self . manager . get_banwords_cached ( BanWordType . LEMMA )
part_words = self . manager . get_banwords_cached ( BanWordType . PART )
conflict_substring = self . manager . get_banwords_cached ( BanWordType . CONFLICT_SUBSTRING )
conflict_lemma = self . manager . get_banwords_cached ( BanWordType . CONFLICT_LEMMA )
return BanWordsMiddleware . _normalize_repeated_chars ( text )
# 3. Белый список
if self . manager . is_whitelisted ( text ) :
logger . debug ( f " ⏭️ Пропуск по белому списку: { text [ : 30 ] } " , log_type = " BANWORDS " )
return None
@staticmethod
de f _normalize_repeated_chars ( text : str ) - > str :
""" Убирает повторения >2 (лееееин → лейн) """
return re . sub ( r ' ([а -яёa-z]) \\ 1 { 2,} ' , r ' \\ 1 \\ 1 ' , text , flags = re . IGNORECASE )
# 4. Режим тишины
i f await self . manager . is_silence_active ( ) :
return { " word " : " [режим тишины] " , " type " : " silence " }
# 5. Режим конфликта (более мягкие правила)
if await self . manager . is_conflict_active ( ) :
# Проверка conflict_substring (с нормализацией)
normalized_text = self . normalizer . normalize_full ( text , remove_sep = True , collapse = True )
for word in conflict_substring :
norm_word = self . normalizer . normalize_full ( word , remove_sep = True , collapse = True )
if norm_word in normalized_text :
return { " word " : word , " type " : " conflict_substring " }
# conflict_lemma
for word_text in extract_words ( text ) :
lemma = get_lemma ( word_text )
if lemma in conflict_lemma :
return { " word " : lemma , " type " : " conflict_lemma " }
# Если в конфликтном режиме ничего не найдено — пропускаем
return None
# 6. Обычный режим: проверка substring (с удалением разделителей и схлопыванием повторов)
normalized_text = self . normalizer . normalize_full ( text , remove_sep = True , collapse = True )
for word in substring_words :
norm_word = self . normalizer . normalize_full ( word , remove_sep = True , collapse = True )
if norm_word in normalized_text :
logger . info ( f " ✅ SUBSTRING: ' { word } ' " , log_type = " BANWORDS " )
return { " word " : word , " type " : " substring " }
# 7. Проверка part (строгая нормализация, только буквы и пробелы)
part_normalized = self . normalizer . normalize_for_part ( text )
for part in part_words :
norm_part = self . normalizer . normalize_for_part ( part )
if norm_part in part_normalized :
logger . info ( f " ✅ PART: ' { part } ' " , log_type = " BANWORDS " )
return { " word " : part , " type " : " part " }
# 8. Проверка lemma
for word_text in extract_words ( text ) :
# Для леммы тоже применяем нормализацию (удаляем разделители, схлопываем повторы)
normalized_word = self . normalizer . normalize_full ( word_text , remove_sep = True , collapse = True )
lemma = get_lemma ( normalized_word )
if lemma in lemma_words :
logger . info ( f " ✅ LEMMA: ' { lemma } ' из ' { word_text } ' " , log_type = " BANWORDS " )
return { " word " : lemma , " type " : " lemma " }
return None
def _check_repeated_chars ( self , text : str ) - > Optional [ Dict [ str , str ] ] :
""" 🔥 Блокирует 3+ повторяющиеся символы подряд """
# ✅ ИСПРАВЛЕНО: безопасный паттерн только для букв
pattern = r ' ([а -яёa-zA-Z]) \\ 1 { 2,} '
matches = re . finditer ( pattern , text , flags = re . IGNORECASE )
"""
Проверяет на наличие 3+ повторяющихся букв подряд.
Использует сырой текст без нормализации (чтобы поймать " леееейн " ).
"""
# Ищем повторения букв (только кириллица/латиница)
pattern = re . compile ( r ' ([а -яёa-zA-Z]) \ 1 { 2,} ' , re . IGNORECASE )
matches = pattern . finditer ( text )
for match in matches :
char = match . group ( 1 )
count = len ( match . group ( 0 ) )
@@ -87,97 +241,31 @@ class BanWordsMiddleware(BaseMiddleware):
return { " word " : f " ' { match . group ( 0 ) } ' ( { count } x) " , " type " : " repeated_chars " }
return None
async def _check_message ( self , text : str ) - > Optional [ Dict [ str , str ] ] :
text_lower = text . lower ( )
# 🔥 1. Повторяющиеся символы (лееееин)
repeat_result = self . _check_repeated_chars ( text_lower )
if repeat_result :
return repeat_result
# 2. ✅ БЕЗОПАСНАЯ нормализация
text_universal = self . _normalize_universal ( text_lower , " strict " ) # PART
text_loose = self . _normalize_universal ( text_lower ) # SUBSTRING/LEMMA
text_processed = process_text ( text_lower )
logger . debug (
f " 🔍 | universal= ' { text_universal } ' | loose= ' { text_loose } ' | proc= ' { text_processed } ' " ,
log_type = " BANWORDS "
)
# 3. WHITELIST
if ( self . manager . is_whitelisted ( text_processed ) or
self . manager . is_whitelisted ( text_loose ) or
self . manager . is_whitelisted ( text_universal ) ) :
return None
# 4. SILENCE MODE
if await self . manager . is_silence_active ( ) :
return { " word " : " [режим тишины] " , " type " : " silence " }
# 5. CONFLICT MODE
if await self . manager . is_conflict_active ( ) :
for word in self . manager . get_banwords_cached ( BanWordType . CONFLICT_SUBSTRING ) :
word_norm = self . _normalize_universal ( word . lower ( ) , " loose " )
if word_norm in text_loose :
return { " word " : word , " type " : " conflict_substring " }
conflict_lemma = self . manager . get_banwords_cached ( BanWordType . CONFLICT_LEMMA )
for word_text in extract_words ( text_processed ) :
lemma = get_lemma ( self . _normalize_repeated_chars ( word_text ) )
if lemma in conflict_lemma :
return { " word " : lemma , " type " : " conflict_lemma " }
# 6. SUBSTRING
for word in self . manager . get_banwords_cached ( BanWordType . SUBSTRING ) :
word_norm = self . _normalize_universal ( word . lower ( ) )
if word_norm in text_loose :
logger . info ( f " ✅ SUBSTRING: ' { word } ' → ' { text_loose } ' " , log_type = " BANWORDS " )
return { " word " : word , " type " : " substring " }
# 7. PART (строгая нормализация)
for part in self . manager . get_banwords_cached ( BanWordType . PART ) :
part_norm = self . _normalize_universal ( part . lower ( ) , " strict " )
if part_norm in text_universal :
logger . info ( f " ✅ PART: ' { part } ' → ' { text_universal } ' " , log_type = " BANWORDS " )
return { " word " : part , " type " : " part " }
# 8. LEMMA
for word_text in extract_words ( text_processed ) :
lemma = get_lemma ( self . _normalize_repeated_chars ( word_text ) )
if lemma in self . manager . get_banwords_cached ( BanWordType . LEMMA ) :
logger . info ( f " ✅ LEMMA: ' { lemma } ' из ' { word_text } ' " , log_type = " BANWORDS " )
return { " word " : lemma , " type " : " lemma " }
return None
async def _handle_spam ( self , message : Message , spam_result : Dict [ str , str ] ) - > None :
""" Обрабатывает спам-сообщение: удаляет, логирует, уведомляет (кроме silence) """
user = message . from_user
matched_word = spam_result [ " word " ]
match_type = spam_result [ " type " ]
message_text = message . text or message . caption or " [нет текста] "
# ✅ ПРОВЕРКА: Н Е отправляем уведомления в режиме тишины
# В режиме тишины удаляем молча
if match_type == " silence " :
# Удаляем сообщение молча
try :
await message . delete ( )
logger . info ( f " 🔇 SILENCE: @ { user . username or user . id } удалено молча " ,
log_type = " BANWORDS " )
logger . info ( f " 🔇 SILENCE: @ { user . username or user . id } удалено молча " , log_type = " BANWORDS " )
except TelegramBadRequest as e :
logger . error ( f " ❌ Н е удалено (silence): { e } " , log_type = " BANWORDS " )
return
# Удаляем
# Удаляем сообщение
try :
await message . delete ( )
logger . info ( f " 🚫 @ { user . username or user . id } : ' { matched_word } ' ( { match_type } ) " ,
log_type = " BANWORDS " )
logger . info ( f " 🚫 @ { user . username or user . id } : ' { matched_word } ' ( { match_type } ) " , log_type = " BANWORDS " )
except TelegramBadRequest as e :
logger . error ( f " ❌ Н е удалено: { e } " , log_type = " BANWORDS " )
return
# Логируем в БД (только Н Е silence)
# Логируем в БД
await self . manager . log_spam (
user_id = user . id ,
username = user . username or f " id { user . id } " ,
@@ -187,17 +275,17 @@ class BanWordsMiddleware(BaseMiddleware):
match_type = match_type
)
# Уведомляем админов (только Н Е silence)
# Уведомляем админов
await self . _notify_admins ( message , matched_word , match_type , message_text )
# Остальные методы без изменений...
async def _notify_admins (
self ,
message : Message ,
matched_word : str ,
match_type : str ,
message_text : str
) - > None :
) - > None :
""" Отправляет уведомление о б удалении в админ-чат (берёт ID из БД) """
user = message . from_user
username = f " @ { user . username } " if user . username else f " ID: { user . id } "
spam_count = await self . manager . get_user_spam_count ( user . id )
@@ -205,17 +293,17 @@ class BanWordsMiddleware(BaseMiddleware):
source_thread_id = message . message_thread_id
notification_text = (
f " 🚫 <b>Удалено сообщение</b> \\ n \\ n "
f " 👤 <b>Пользователь:</b> { username } \\ n "
f " 🆔 <b>ID:</b> <code> { user . id } </code> \\ n "
f " 📊 <b>Нарушений:</b> { spam_count } \\ n \\ n "
f " 💬 <b>Чат:</b> { self . _escape_html ( chat_title ) } \\ n "
f " 🆔 <b>Chat ID:</b> <code> { message . chat . id } </code> \\ n "
f " { ' 📌 <b>Topic ID:</b> <code> { source_thread_id} </code> \\ n ' if source_thread_id else ' ' } "
f " 🔗 <b>Message ID:</b> <code> { message . message_id } </code> \\ n \\ n "
f " 🔍 <b>Триггер:</b> <code> { self . _escape_html ( matched_word ) } </code> \\ n "
f " 📝 <b>Тип:</b> { self . _get_type_emoji ( match_type ) } { self . _escape_html ( match_type ) } \\ n \\ n "
f " 💬 <b>Текст:</b> \\ n <code>{ self . _escape_html ( message_text [ : 500 ] ) } </code> "
f " 🚫 <b>Удалено сообщение</b> \n \n "
f " 👤 <b>Пользователь:</b> { username } \n "
f " 🆔 <b>ID:</b> <code> { user . id } </code> \n "
f " 📊 <b>Нарушений:</b> { spam_count } \n \n "
f " 💬 <b>Чат:</b> { self . _escape_html ( chat_title ) } \n "
f " 🆔 <b>Chat ID:</b> <code> { message . chat . id } </code> \n "
f " { ' 📌 <b>Topic ID:</b> <code> ' + str ( source_thread_id) + ' </code> \n ' if source_thread_id else ' ' } "
f " 🔗 <b>Message ID:</b> <code> { message . message_id } </code> \n \n "
f " 🔍 <b>Триггер:</b> <code> { self . _escape_html ( matched_word ) } </code> \n "
f " 📝 <b>Тип:</b> { self . _get_type_emoji ( match_type ) } { self . _escape_html ( match_type ) } \n \n "
f " 💬 <b>Текст:</b> \n <code> { self . _escape_html ( message_text [ : 500 ] ) } </code> "
)
keyboard = InlineKeyboardMarkup ( inline_keyboard = [
@@ -227,15 +315,17 @@ class BanWordsMiddleware(BaseMiddleware):
] )
try :
admin_chat_id = getattr ( settings , " ADMIN_CHAT_ID " , None )
admin_thread _id = getattr ( settings , " ADMIN_THREAD_ID " , None ) or None
# ✅ Получаем настройки из БД (динамические, установленные через /settings )
admin_chat _id = await self . manager . get_bot_ setting( " admin_chat_id " )
admin_thread_id = await self . manager . get_bot_setting ( " admin_thread_id " )
if admin_chat_id :
await message . bot . send_message (
chat_id = admin_chat_id ,
chat_id = int ( admin_chat_id ) ,
text = notification_text ,
reply_markup = keyboard ,
parse_mode = " HTML " ,
message_thread_id = admin_thread_id
message_thread_id = int ( admin_thread_id ) if admin_thread_id else None
)
except Exception as e :
logger . error ( f " ❌ Уведомление админам: { e } " , log_type = " BANWORDS " )