Добавление работы с конфликтными частями и исправление вайтлиста
This commit is contained in:
@@ -47,8 +47,8 @@ def setup_middlewares(
|
||||
bot: Bot,
|
||||
admin_ids: list[int] = settings.ADMIN_ID+settings.OWNER_ID,
|
||||
channel_ids: list[int | str] | None = None,
|
||||
enable_spam_check: bool = False,
|
||||
enable_subscription_check: bool = False,
|
||||
enable_spam_check: bool = settings.enable_spam_check,
|
||||
enable_subscription_check: bool = settings.enable_subscription_check,
|
||||
) -> dict:
|
||||
"""
|
||||
Регистрирует все middleware в диспетчере.
|
||||
@@ -138,5 +138,3 @@ def setup_middlewares(
|
||||
)
|
||||
|
||||
return instances
|
||||
|
||||
|
||||
|
||||
@@ -1,13 +1,5 @@
|
||||
"""
|
||||
Middleware для проверки сообщений на запрещённые слова (банворды).
|
||||
|
||||
✅ ИСПРАВЛЕНО:
|
||||
- Полная нормализация текста с использованием UNICODE_MAP
|
||||
- Удаление повторов символов (леееейн → лейн)
|
||||
- Игнорирование разделителей (л.е.й.н → лейн)
|
||||
- Поддержка всех типов проверок (SUBSTRING, LEMMA, PART, CONFLICT)
|
||||
- Белый список и режимы тишины/конфликта
|
||||
- Нет уведомлений в режиме тишины
|
||||
"""
|
||||
|
||||
from typing import Callable, Dict, Any, Awaitable, Optional
|
||||
@@ -15,7 +7,7 @@ import re
|
||||
import unicodedata
|
||||
|
||||
from aiogram import BaseMiddleware
|
||||
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
|
||||
from aiogram.types import Message
|
||||
from aiogram.exceptions import TelegramBadRequest
|
||||
|
||||
from configs import settings, UNICODE_MAP, LATIN_TO_CYRILLIC, CYRILLIC_NORMALIZE
|
||||
@@ -23,97 +15,86 @@ from database import get_manager, BanWordType
|
||||
from bot.special import extract_words, get_lemma
|
||||
from middleware.loggers import logger
|
||||
|
||||
__all__ = ("BanWordsMiddleware",)
|
||||
|
||||
__all__ = ("BanWordsMiddleware",)
|
||||
URL_PATTERN = re.compile(
|
||||
r'(https?://\S+|www\.\S+)',
|
||||
re.IGNORECASE
|
||||
)
|
||||
|
||||
class TextNormalizer:
|
||||
"""
|
||||
Класс для многоступенчатой нормализации текста.
|
||||
Приводит различные юникод-символы к базовым буквам,
|
||||
удаляет повторы, убирает разделители.
|
||||
"""
|
||||
|
||||
# Объединяем все словари замен в один
|
||||
FULL_MAP = {}
|
||||
FULL_MAP: Dict[str, str] = {}
|
||||
FULL_MAP.update(LATIN_TO_CYRILLIC)
|
||||
FULL_MAP.update(CYRILLIC_NORMALIZE)
|
||||
FULL_MAP.update(UNICODE_MAP)
|
||||
|
||||
# Символы-разделители, которые могут быть вставлены между буквами
|
||||
SEPARATORS = re.compile(r'[\s.\-_,;:|]+', re.UNICODE)
|
||||
|
||||
# Паттерн для поиска повторяющихся букв (3+ раза)
|
||||
REPEAT_PATTERN = re.compile(r'([а-яёa-z])\1{2,}', re.IGNORECASE)
|
||||
|
||||
@classmethod
|
||||
def normalize_characters(cls, text: str) -> str:
|
||||
"""
|
||||
Заменяет все символы из FULL_MAP на их базовые эквиваленты.
|
||||
Проходит по строке посимвольно для максимальной замены.
|
||||
"""
|
||||
result = []
|
||||
result: list[str] = []
|
||||
for ch in text:
|
||||
# Сначала пробуем заменить по карте
|
||||
if ch in cls.FULL_MAP:
|
||||
result.append(cls.FULL_MAP[ch])
|
||||
else:
|
||||
result.append(ch)
|
||||
# Приводим к нижнему регистру после замен (чтобы избежать потери регистра в карте)
|
||||
result.append(cls.FULL_MAP.get(ch, ch))
|
||||
return ''.join(result).lower()
|
||||
|
||||
@classmethod
|
||||
def remove_separators(cls, text: str) -> str:
|
||||
"""Удаляет разделители между буквами (пробелы, точки и т.д.)"""
|
||||
return cls.SEPARATORS.sub('', text)
|
||||
|
||||
@classmethod
|
||||
def collapse_repeats(cls, text: str, max_repeat: int = 2) -> str:
|
||||
def repl(m):
|
||||
ch = m.group(1)
|
||||
return ch # вместо ch * 2 — теперь схлопываем до одного символа
|
||||
|
||||
def collapse_repeats(cls, text: str) -> str:
|
||||
def repl(match: re.Match[str]) -> str:
|
||||
return match.group(1)
|
||||
return cls.REPEAT_PATTERN.sub(repl, text)
|
||||
|
||||
@classmethod
|
||||
def normalize_full(cls, text: str, remove_sep: bool = True, collapse: bool = True) -> str:
|
||||
"""
|
||||
Полная нормализация:
|
||||
1. Unicode нормализация (NFKC) для разложения составных символов
|
||||
2. Замена по карте
|
||||
3. Приведение к нижнему регистру
|
||||
4. Удаление разделителей (опционально)
|
||||
5. Схлопывание повторов (опционально)
|
||||
"""
|
||||
# NFKC разлагает символы типа "ё" в "е" + умляут, но нам лучше оставить как есть,
|
||||
# т.к. у нас есть прямые замены. Однако для совместимости применим.
|
||||
def normalize_full(
|
||||
cls,
|
||||
text: str,
|
||||
remove_sep: bool = True,
|
||||
collapse: bool = True
|
||||
) -> str:
|
||||
text = unicodedata.normalize('NFKC', text)
|
||||
# Замена символов
|
||||
text = cls.normalize_characters(text)
|
||||
# Удаление разделителей
|
||||
|
||||
if remove_sep:
|
||||
text = cls.remove_separators(text)
|
||||
# Схлопывание повторов
|
||||
|
||||
if collapse:
|
||||
text = cls.collapse_repeats(text)
|
||||
|
||||
return text
|
||||
|
||||
@classmethod
|
||||
def normalize_for_part(cls, text: str) -> str:
|
||||
def normalize_for_part_token(cls, text: str) -> str:
|
||||
"""
|
||||
Нормализация для типа PART:
|
||||
- Полная нормализация
|
||||
- Удаление всех не-буквенных символов (кроме пробелов)
|
||||
- Приведение к нижнему регистру
|
||||
Нормализация для PART:
|
||||
- NFKC
|
||||
- lower()
|
||||
- удаление zero-width
|
||||
- схлопывание повторов латиницы
|
||||
- БЕЗ LATIN_TO_CYRILLIC
|
||||
"""
|
||||
text = cls.normalize_full(text, remove_sep=False, collapse=True)
|
||||
# Оставляем только буквы и пробелы
|
||||
text = re.sub(r'[^а-яёa-z\s]', '', text, flags=re.IGNORECASE)
|
||||
text = re.sub(r'\s+', ' ', text).strip()
|
||||
return text.lower()
|
||||
text = unicodedata.normalize('NFKC', text)
|
||||
text = text.lower()
|
||||
|
||||
# удаляем zero-width
|
||||
text = re.sub(r'[\u200B-\u200D\uFEFF]', '', text)
|
||||
|
||||
# схлопываем повторы букв (3+ → 1)
|
||||
text = re.sub(r'([a-z])\1+', r'\1', text)
|
||||
|
||||
return text
|
||||
|
||||
|
||||
class BanWordsMiddleware(BaseMiddleware):
|
||||
def __init__(self):
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self.manager = get_manager()
|
||||
self.normalizer = TextNormalizer()
|
||||
@@ -124,219 +105,178 @@ class BanWordsMiddleware(BaseMiddleware):
|
||||
event: Message,
|
||||
data: Dict[str, Any]
|
||||
) -> Any:
|
||||
# Проверяем наличие текста или подписи
|
||||
|
||||
if not event.text and not event.caption:
|
||||
return await handler(event, data)
|
||||
|
||||
message_text = event.text or event.caption
|
||||
message_text: str = event.text or event.caption
|
||||
|
||||
# Игнорируем команды
|
||||
if message_text.startswith('/'):
|
||||
return await handler(event, data)
|
||||
|
||||
# Проверка на админа
|
||||
user_id = event.from_user.id
|
||||
is_super_admin = user_id in settings.OWNER_ID
|
||||
is_admin = is_super_admin or self.manager.is_admin_cached(user_id)
|
||||
user_id: int = event.from_user.id
|
||||
is_super_admin: bool = user_id in settings.OWNER_ID
|
||||
is_admin: bool = is_super_admin or self.manager.is_admin_cached(user_id)
|
||||
|
||||
if is_admin:
|
||||
return await handler(event, data)
|
||||
|
||||
# Проверяем сообщение на спам
|
||||
spam_result = await self._check_message(message_text)
|
||||
|
||||
if spam_result:
|
||||
await self._handle_spam(event, spam_result)
|
||||
return None # Сообщение удалено, дальше не обрабатываем
|
||||
await self._handle_spam(event)
|
||||
return None
|
||||
|
||||
return await handler(event, data)
|
||||
|
||||
async def _check_message(self, text: str) -> Optional[Dict[str, str]]:
|
||||
"""
|
||||
Многоступенчатая проверка текста.
|
||||
Возвращает словарь с причиной блокировки или None.
|
||||
"""
|
||||
# 1. Повторяющиеся символы (например, "леееейн") — блокируем сразу
|
||||
# repeat_result = self._check_repeated_chars(text)
|
||||
# if repeat_result:
|
||||
# return repeat_result
|
||||
@staticmethod
|
||||
def is_allowed_url(url: str, allowed: str) -> bool:
|
||||
url_lower = url.lower()
|
||||
allowed_lower = allowed.lower()
|
||||
if allowed_lower.endswith('/'):
|
||||
# исключение со слешем: только строгое начало с этим слешем
|
||||
return url_lower.startswith(allowed_lower)
|
||||
else:
|
||||
# исключение без слеша: разрешаем точное совпадение или начало с добавлением слеша
|
||||
return url_lower == allowed_lower or url_lower.startswith(allowed_lower + '/')
|
||||
|
||||
# 2. Получаем кэшированные списки
|
||||
substring_words = self.manager.get_banwords_cached(BanWordType.SUBSTRING)
|
||||
async def _check_message(self, text: str) -> Optional[Dict[str, str]]:
|
||||
whitelist = {
|
||||
w.lower().strip()
|
||||
for w in self.manager.get_whitelist_cached()
|
||||
}
|
||||
|
||||
# ================= URL CHECK =================
|
||||
urls = URL_PATTERN.findall(text)
|
||||
|
||||
for url in urls:
|
||||
url_lower = url.lower()
|
||||
|
||||
# если URL начинается с разрешённого исключения — пропускаем
|
||||
if any(self.is_allowed_url(url_lower, allowed) for allowed in whitelist):
|
||||
continue
|
||||
|
||||
# если нет разрешения — проверяем WORD-правила для URL
|
||||
for word in self.manager.get_banwords_cached(BanWordType.WORD):
|
||||
if word in url_lower:
|
||||
return {"word": word, "type": "word"}
|
||||
# =============================================
|
||||
|
||||
# 2. Убираем URL из текста для word/lemma проверки
|
||||
text_without_urls = URL_PATTERN.sub(' ', text)
|
||||
|
||||
word_words = self.manager.get_banwords_cached(BanWordType.WORD)
|
||||
lemma_words = self.manager.get_banwords_cached(BanWordType.LEMMA)
|
||||
part_words = self.manager.get_banwords_cached(BanWordType.PART)
|
||||
conflict_substring = self.manager.get_banwords_cached(BanWordType.CONFLICT_SUBSTRING)
|
||||
conflict_word = self.manager.get_banwords_cached(BanWordType.CONFLICT_WORD)
|
||||
conflict_lemma = self.manager.get_banwords_cached(BanWordType.CONFLICT_LEMMA)
|
||||
conflict_part = self.manager.get_banwords_cached(BanWordType.CONFLICT_PART)
|
||||
|
||||
# 3. Белый список
|
||||
if self.manager.is_whitelisted(text):
|
||||
logger.debug(f"⏭️ Пропуск по белому списку: {text[:30]}", log_type="BANWORDS")
|
||||
return None
|
||||
|
||||
# 4. Режим тишины
|
||||
if await self.manager.is_silence_active():
|
||||
return {"word": "[режим тишины]", "type": "silence"}
|
||||
|
||||
# 5. Режим конфликта (более мягкие правила)
|
||||
if await self.manager.is_conflict_active():
|
||||
# Проверка conflict_substring (с нормализацией)
|
||||
normalized_text = self.normalizer.normalize_full(text, remove_sep=True, collapse=True)
|
||||
for word in conflict_substring:
|
||||
norm_word = self.normalizer.normalize_full(word, remove_sep=True, collapse=True)
|
||||
if norm_word in normalized_text:
|
||||
return {"word": word, "type": "conflict_substring"}
|
||||
normalized_text = self.normalizer.normalize_full(text)
|
||||
|
||||
for word in conflict_word:
|
||||
if self.normalizer.normalize_full(word) in normalized_text:
|
||||
return {"word": word, "type": "conflict_word"}
|
||||
|
||||
# conflict_lemma
|
||||
for word_text in extract_words(text):
|
||||
lemma = get_lemma(word_text)
|
||||
if lemma in conflict_lemma:
|
||||
return {"word": lemma, "type": "conflict_lemma"}
|
||||
if get_lemma(word_text) in conflict_lemma:
|
||||
return {"word": word_text, "type": "conflict_lemma"}
|
||||
|
||||
# Если в конфликтном режиме ничего не найдено — пропускаем
|
||||
return None
|
||||
|
||||
# 6. Обычный режим: проверка substring (с удалением разделителей и схлопыванием повторов)
|
||||
normalized_text = self.normalizer.normalize_full(text, remove_sep=True, collapse=True)
|
||||
for word in substring_words:
|
||||
norm_word = self.normalizer.normalize_full(word, remove_sep=True, collapse=True)
|
||||
if norm_word in normalized_text:
|
||||
logger.info(f"✅ SUBSTRING: '{word}'", log_type="BANWORDS")
|
||||
return {"word": word, "type": "substring"}
|
||||
# WORD — строгое совпадение как отдельное слово
|
||||
for word in word_words:
|
||||
pattern = r'(?<!\w){}(?!\w)'.format(re.escape(word))
|
||||
|
||||
# 7. Проверка part (строгая нормализация, только буквы и пробелы)
|
||||
part_normalized = self.normalizer.normalize_for_part(text)
|
||||
for part in part_words:
|
||||
norm_part = self.normalizer.normalize_for_part(part)
|
||||
if norm_part in part_normalized:
|
||||
logger.info(f"✅ PART: '{part}'", log_type="BANWORDS")
|
||||
return {"word": part, "type": "part"}
|
||||
for match in re.finditer(pattern, text_without_urls, re.IGNORECASE):
|
||||
matched = match.group(0).lower()
|
||||
|
||||
# 8. Проверка lemma
|
||||
for word_text in extract_words(text):
|
||||
# Для леммы тоже применяем нормализацию (удаляем разделители, схлопываем повторы)
|
||||
normalized_word = self.normalizer.normalize_full(word_text, remove_sep=True, collapse=True)
|
||||
# если совпавшее слово в whitelist — игнорируем
|
||||
if matched in whitelist:
|
||||
continue
|
||||
|
||||
# если это начало URL — пропускаем
|
||||
if text[match.end():match.end() + 3] == '://':
|
||||
continue
|
||||
|
||||
return {"word": word, "type": "word"}
|
||||
|
||||
# PART
|
||||
usernames = re.findall(r'@[\w_]+', text_without_urls)
|
||||
latin_tokens = re.findall(r'\b[a-zA-Z0-9_]*[a-zA-Z]+[a-zA-Z0-9_]*\b', text_without_urls)
|
||||
|
||||
tokens_to_check = usernames + latin_tokens
|
||||
|
||||
# PART
|
||||
for token in tokens_to_check:
|
||||
token_lower = token.lower()
|
||||
|
||||
# если именно этот токен разрешён
|
||||
normalized_for_whitelist = token_lower.lstrip('@')
|
||||
|
||||
if (
|
||||
token_lower in whitelist or
|
||||
normalized_for_whitelist in whitelist or
|
||||
f"@{normalized_for_whitelist}" in whitelist
|
||||
):
|
||||
continue
|
||||
|
||||
normalized_token = self.normalizer.normalize_for_part_token(token)
|
||||
|
||||
for part in part_words:
|
||||
norm_part = self.normalizer.normalize_for_part_token(part)
|
||||
|
||||
if norm_part in normalized_token:
|
||||
return {"word": part, "type": "part"}
|
||||
|
||||
# CONFLICT PART
|
||||
for token in tokens_to_check:
|
||||
token_lower = token.lower()
|
||||
|
||||
normalized_for_whitelist = token_lower.lstrip('@')
|
||||
|
||||
if (
|
||||
token_lower in whitelist or
|
||||
normalized_for_whitelist in whitelist or
|
||||
f"@{normalized_for_whitelist}" in whitelist
|
||||
):
|
||||
continue
|
||||
|
||||
normalized_token = self.normalizer.normalize_for_part_token(token)
|
||||
|
||||
for part in conflict_part:
|
||||
norm_part = self.normalizer.normalize_for_part_token(part)
|
||||
|
||||
if norm_part in normalized_token:
|
||||
return {"word": part, "type": "conflict_part"}
|
||||
|
||||
# LEMMA
|
||||
for word_text in extract_words(text_without_urls):
|
||||
word_lower = word_text.lower()
|
||||
|
||||
# если слово разрешено — пропускаем
|
||||
if word_lower in whitelist:
|
||||
continue
|
||||
|
||||
normalized_word = self.normalizer.normalize_full(word_text)
|
||||
lemma = get_lemma(normalized_word)
|
||||
|
||||
if lemma in lemma_words:
|
||||
logger.info(f"✅ LEMMA: '{lemma}' из '{word_text}'", log_type="BANWORDS")
|
||||
return {"word": lemma, "type": "lemma"}
|
||||
|
||||
return None
|
||||
|
||||
def _check_repeated_chars(self, text: str) -> Optional[Dict[str, str]]:
|
||||
"""
|
||||
Проверяет на наличие 3+ повторяющихся букв подряд.
|
||||
Использует сырой текст без нормализации (чтобы поймать "леееейн").
|
||||
"""
|
||||
# Ищем повторения букв (только кириллица/латиница)
|
||||
pattern = re.compile(r'([а-яёa-zA-Z])\1{2,}', re.IGNORECASE)
|
||||
matches = pattern.finditer(text)
|
||||
for match in matches:
|
||||
char = match.group(1)
|
||||
count = len(match.group(0))
|
||||
if count >= 3:
|
||||
logger.info(f"🔥 ПОВТОРЫ: '{match.group(0)}' ({count}x)", log_type="BANWORDS")
|
||||
return {"word": f"'{match.group(0)}' ({count}x)", "type": "repeated_chars"}
|
||||
return None
|
||||
@staticmethod
|
||||
async def _handle_spam(
|
||||
message: Message,
|
||||
) -> None:
|
||||
|
||||
async def _handle_spam(self, message: Message, spam_result: Dict[str, str]) -> None:
|
||||
"""Обрабатывает спам-сообщение: удаляет, логирует, уведомляет (кроме silence)"""
|
||||
user = message.from_user
|
||||
matched_word = spam_result["word"]
|
||||
match_type = spam_result["type"]
|
||||
message_text = message.text or message.caption or "[нет текста]"
|
||||
|
||||
# В режиме тишины удаляем молча
|
||||
if match_type == "silence":
|
||||
try:
|
||||
await message.delete()
|
||||
logger.info(f"🔇 SILENCE: @{user.username or user.id} удалено молча", log_type="BANWORDS")
|
||||
except TelegramBadRequest as e:
|
||||
logger.error(f"❌ Не удалено (silence): {e}", log_type="BANWORDS")
|
||||
return
|
||||
|
||||
# Удаляем сообщение
|
||||
try:
|
||||
await message.delete()
|
||||
logger.info(f"🚫 @{user.username or user.id}: '{matched_word}' ({match_type})", log_type="BANWORDS")
|
||||
except TelegramBadRequest as e:
|
||||
logger.error(f"❌ Не удалено: {e}", log_type="BANWORDS")
|
||||
logger.info(f"Удалено сообщение: {message.text}")
|
||||
except TelegramBadRequest:
|
||||
return
|
||||
|
||||
# Логируем в БД
|
||||
await self.manager.log_spam(
|
||||
user_id=user.id,
|
||||
username=user.username or f"id{user.id}",
|
||||
chat_id=message.chat.id,
|
||||
message_text=message_text,
|
||||
matched_word=matched_word,
|
||||
match_type=match_type
|
||||
)
|
||||
|
||||
# Уведомляем админов
|
||||
await self._notify_admins(message, matched_word, match_type, message_text)
|
||||
|
||||
async def _notify_admins(
|
||||
self,
|
||||
message: Message,
|
||||
matched_word: str,
|
||||
match_type: str,
|
||||
message_text: str
|
||||
) -> None:
|
||||
"""Отправляет уведомление об удалении в админ-чат (берёт ID из БД)"""
|
||||
user = message.from_user
|
||||
username = f"@{user.username}" if user.username else f"ID: {user.id}"
|
||||
spam_count = await self.manager.get_user_spam_count(user.id)
|
||||
chat_title = message.chat.title or "Без названия"
|
||||
source_thread_id = message.message_thread_id
|
||||
|
||||
notification_text = (
|
||||
f"🚫 <b>Удалено сообщение</b>\n\n"
|
||||
f"👤 <b>Пользователь:</b> {username}\n"
|
||||
f"🆔 <b>ID:</b> <code>{user.id}</code>\n"
|
||||
f"📊 <b>Нарушений:</b> {spam_count}\n\n"
|
||||
f"💬 <b>Чат:</b> {self._escape_html(chat_title)}\n"
|
||||
f"🆔 <b>Chat ID:</b> <code>{message.chat.id}</code>\n"
|
||||
f"{'📌 <b>Topic ID:</b> <code>' + str(source_thread_id) + '</code>\n' if source_thread_id else ''}"
|
||||
f"🔗 <b>Message ID:</b> <code>{message.message_id}</code>\n\n"
|
||||
f"🔍 <b>Триггер:</b> <code>{self._escape_html(matched_word)}</code>\n"
|
||||
f"📝 <b>Тип:</b> {self._get_type_emoji(match_type)} {self._escape_html(match_type)}\n\n"
|
||||
f"💬 <b>Текст:</b>\n<code>{self._escape_html(message_text[:500])}</code>"
|
||||
)
|
||||
|
||||
keyboard = InlineKeyboardMarkup(inline_keyboard=[
|
||||
[
|
||||
InlineKeyboardButton(text="🔨 Забанить", callback_data=f"spam_ban:{user.id}:{message.chat.id}"),
|
||||
InlineKeyboardButton(text="✅ Закрыть", callback_data="spam_close")
|
||||
],
|
||||
[InlineKeyboardButton(text="📊 Статистика", callback_data=f"spam_stats:{user.id}")]
|
||||
])
|
||||
|
||||
try:
|
||||
# ✅ Получаем настройки из БД (динамические, установленные через /settings)
|
||||
admin_chat_id = await self.manager.get_bot_setting("admin_chat_id")
|
||||
admin_thread_id = await self.manager.get_bot_setting("admin_thread_id")
|
||||
|
||||
if admin_chat_id:
|
||||
await message.bot.send_message(
|
||||
chat_id=int(admin_chat_id),
|
||||
text=notification_text,
|
||||
reply_markup=keyboard,
|
||||
parse_mode="HTML",
|
||||
message_thread_id=int(admin_thread_id) if admin_thread_id else None
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"❌ Уведомление админам: {e}", log_type="BANWORDS")
|
||||
|
||||
@staticmethod
|
||||
def _get_type_emoji(match_type: str) -> str:
|
||||
return {
|
||||
"substring": "🔤",
|
||||
"lemma": "📖",
|
||||
"part": "🧩",
|
||||
"silence": "🔇",
|
||||
"conflict_substring": "⚔️",
|
||||
"conflict_lemma": "⚔️",
|
||||
"repeated_chars": "🔁"
|
||||
}.get(match_type, "❓")
|
||||
|
||||
@staticmethod
|
||||
def _escape_html(text: str) -> str:
|
||||
return str(text).replace("&", "&").replace("<", "<").replace(">", ">")
|
||||
|
||||
@@ -105,7 +105,7 @@ class UserSpamStats:
|
||||
self.total_blocks += 1
|
||||
self.reputation = max(0.5, self.reputation - 0.3)
|
||||
|
||||
def detect_spam_patterns(self, time_window: float = 10.0) -> Dict[str, Any]:
|
||||
def detect_spam_patterns(self, time_window: float = 2.0) -> Dict[str, Any]:
|
||||
"""
|
||||
Умная детекция спама на основе паттернов.
|
||||
УЛУЧШЕНО: учитывает скорость отправки сообщений.
|
||||
@@ -120,7 +120,7 @@ class UserSpamStats:
|
||||
current_time = time()
|
||||
|
||||
# 1. КРИТИЧНО: Экстремально быстрая отправка (флуд-бот)
|
||||
very_recent = [ctx for ctx in recent_contexts if (current_time - ctx.timestamp) < 2.0]
|
||||
very_recent = [ctx for ctx in recent_contexts if (current_time - ctx.timestamp) < time_window]
|
||||
if len(very_recent) >= 5:
|
||||
return {
|
||||
'is_spam': True,
|
||||
@@ -133,7 +133,7 @@ class UserSpamStats:
|
||||
|
||||
# 2. КРИТИЧНО: 8+ сообщений за 5 секунд => агрессивный флуд
|
||||
recent_5s = [ctx for ctx in recent_contexts if (current_time - ctx.timestamp) < 5.0]
|
||||
if len(recent_5s) >= 8:
|
||||
if len(recent_5s) >= 15:
|
||||
return {
|
||||
'is_spam': True,
|
||||
'reason': 'aggressive_flood',
|
||||
@@ -145,7 +145,7 @@ class UserSpamStats:
|
||||
|
||||
# 3. Медиа-флуд
|
||||
media_contexts = [ctx for ctx in recent_contexts if ctx.media_type]
|
||||
if len(media_contexts) >= 7:
|
||||
if len(media_contexts) >= 15:
|
||||
media_recent = [ctx for ctx in media_contexts if (current_time - ctx.timestamp) < 5.0]
|
||||
if len(media_recent) >= 6:
|
||||
return {
|
||||
@@ -303,7 +303,8 @@ class AntiSpamMiddleware(BaseMiddleware):
|
||||
self.enable_reputation = enable_reputation
|
||||
self.log_all = log_all
|
||||
|
||||
def _extract_context(self, event: TelegramObject) -> MessageContext:
|
||||
@staticmethod
|
||||
def _extract_context(event: TelegramObject) -> MessageContext:
|
||||
"""Извлекает контекст из события"""
|
||||
context = MessageContext()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user