From fe1bcb35097a55f7dce6d75f68592cbb8a8cee13 Mon Sep 17 00:00:00 2001 From: Verum Date: Mon, 23 Feb 2026 14:22:36 +0700 Subject: [PATCH] =?UTF-8?q?=D0=9C=D0=B8=D0=B4=D0=BB=D0=B2=D0=B5=D0=B5?= =?UTF-8?q?=D1=80=20=D0=BD=D0=B0=20=D1=81=D0=BF=D0=B0=D0=BC=20(=D0=9F?= =?UTF-8?q?=D0=9E=D0=A4=D0=98=D0=9A=D0=A1=D0=98=D0=A2=D0=AC=20=D0=9D=D0=90?= =?UTF-8?q?=20=D0=A1=D0=9F=D0=90=D0=9C=20=D0=9E=D0=A2=20=D0=91=D0=9E=D0=A2?= =?UTF-8?q?=D0=90=20=D0=A1=D0=A0=D0=9E=D0=A7=D0=9D=D0=9E!)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot/middlewares/spam_mdw.py | 595 ++++++++++++++++++++++++++++++++++++ 1 file changed, 595 insertions(+) create mode 100644 bot/middlewares/spam_mdw.py diff --git a/bot/middlewares/spam_mdw.py b/bot/middlewares/spam_mdw.py new file mode 100644 index 0000000..b7eb79c --- /dev/null +++ b/bot/middlewares/spam_mdw.py @@ -0,0 +1,595 @@ +""" +Умный middleware для защиты от спама с адаптивными лимитами +ВЕРСИЯ 2.0: мгновенная блокировка при явном флуде +""" +from time import time +from typing import Callable, Awaitable, Any, Dict, Optional +from dataclasses import dataclass, field +from datetime import datetime +from collections import Counter + +from aiogram import BaseMiddleware +from aiogram.types import TelegramObject, Message, CallbackQuery + +from middleware.loggers import logger +from configs import settings + +__all__ = ('AntiSpamMiddleware', 'spam_stats') + + +@dataclass +class MessageContext: + """Контекст сообщения для умной детекции""" + text: Optional[str] = None + is_forward: bool = False + is_reply: bool = False + is_command: bool = False + media_type: Optional[str] = None + callback_data: Optional[str] = None + timestamp: float = field(default_factory=time) + + +@dataclass +class UserSpamStats: + """ + Расширенная статистика спама для пользователя. + """ + user_id: int + request_times: list[float] = field(default_factory=list) + message_contexts: list[MessageContext] = field(default_factory=list) + warnings: int = 0 + blocked_until: Optional[float] = None + total_requests: int = 0 + total_blocks: int = 0 + first_seen: Optional[float] = None + last_seen: Optional[float] = None + reputation: float = 1.0 # Репутация пользователя (0.5 - 2.0) + + def is_blocked(self, current_time: float) -> bool: + """Проверяет, заблокирован ли пользователь""" + if self.blocked_until is None: + return False + + if current_time < self.blocked_until: + return True + + # Разблокировка + self.blocked_until = None + self.warnings = max(0, self.warnings - 1) # Снижаем предупреждения + return False + + def get_remaining_block_time(self, current_time: float) -> float: + """Возвращает оставшееся время блокировки""" + if self.blocked_until is None or current_time >= self.blocked_until: + return 0.0 + return self.blocked_until - current_time + + def clean_old_requests(self, current_time: float, time_window: float) -> None: + """Удаляет старые запросы за пределами временного окна""" + cutoff_time = current_time - time_window + + new_times = [] + new_contexts = [] + + for req_time, context in zip(self.request_times, self.message_contexts): + if req_time > cutoff_time: + new_times.append(req_time) + new_contexts.append(context) + + self.request_times = new_times + self.message_contexts = new_contexts + + def add_request(self, current_time: float, context: MessageContext) -> None: + """Добавляет новый запрос с контекстом""" + context.timestamp = current_time + self.request_times.append(current_time) + self.message_contexts.append(context) + self.total_requests += 1 + self.last_seen = current_time + + if self.first_seen is None: + self.first_seen = current_time + + def add_warning(self) -> None: + """Добавляет предупреждение и снижает репутацию""" + self.warnings += 1 + self.reputation = max(0.5, self.reputation - 0.1) + + def improve_reputation(self) -> None: + """Улучшает репутацию за хорошее поведение""" + self.reputation = min(2.0, self.reputation + 0.05) + + def block(self, current_time: float, duration: float) -> None: + """Блокирует пользователя""" + self.blocked_until = current_time + duration + self.total_blocks += 1 + self.reputation = max(0.5, self.reputation - 0.3) + + def detect_spam_patterns(self, time_window: float = 10.0) -> Dict[str, Any]: + """ + Умная детекция спама на основе паттернов. + УЛУЧШЕНО: учитывает скорость отправки сообщений. + + Returns: + Dict с результатами анализа + """ + if len(self.message_contexts) < 3: + return {'is_spam': False, 'reason': None, 'severity': 0.0} + + recent_contexts = self.message_contexts[-15:] # Последние 15 сообщений + current_time = time() + + # 1. КРИТИЧНО: Экстремально быстрая отправка (флуд-бот) + very_recent = [ctx for ctx in recent_contexts if (current_time - ctx.timestamp) < 2.0] + if len(very_recent) >= 5: + return { + 'is_spam': True, + 'reason': 'extreme_flood', + 'severity': 1.0, + 'details': f"⚡ Экстремальный флуд: {len(very_recent)} сообщений за 2 секунды", + 'instant_block': True, + 'block_duration': 600.0 + } + + # 2. КРИТИЧНО: 8+ сообщений за 5 секунд => агрессивный флуд + recent_5s = [ctx for ctx in recent_contexts if (current_time - ctx.timestamp) < 5.0] + if len(recent_5s) >= 8: + return { + 'is_spam': True, + 'reason': 'aggressive_flood', + 'severity': 0.95, + 'details': f"🔥 Агрессивный флуд: {len(recent_5s)} сообщений за 5 секунд", + 'instant_block': True, + 'block_duration': 300.0 + } + + # 3. Медиа-флуд + media_contexts = [ctx for ctx in recent_contexts if ctx.media_type] + if len(media_contexts) >= 7: + media_recent = [ctx for ctx in media_contexts if (current_time - ctx.timestamp) < 5.0] + if len(media_recent) >= 6: + return { + 'is_spam': True, + 'reason': 'media_flood_fast', + 'severity': 0.9, + 'details': f"📸 Медиа-флуд: {len(media_recent)} файлов за 5 секунд", + 'instant_block': True, + 'block_duration': 240.0 + } + + return { + 'is_spam': True, + 'reason': 'media_flood', + 'severity': 0.7, + 'details': f"📸 Медиа-флуд: {len(media_contexts)} файлов подряд" + } + + # 4. Проверка идентичных текстовых сообщений + texts = [ctx.text for ctx in recent_contexts if ctx.text and not ctx.is_command] + if texts: + text_counts = Counter(texts) + most_common_text, count = text_counts.most_common(1)[0] + + if count >= 5: + return { + 'is_spam': True, + 'reason': 'identical_messages', + 'severity': 0.85, + 'details': f"📋 Повтор: '{most_common_text[:40]}...' ({count}x)", + 'instant_block': True, + 'block_duration': 180.0 + } + + # 5. Проверка спама callback кнопок + callbacks = [ctx.callback_data for ctx in recent_contexts if ctx.callback_data] + if callbacks: + callback_counts = Counter(callbacks) + most_common_callback, count = callback_counts.most_common(1)[0] + + if count >= 10: + return { + 'is_spam': True, + 'reason': 'callback_spam', + 'severity': 0.8, + 'details': f"🔘 Спам кнопки: {count} нажатий", + 'instant_block': True, + 'block_duration': 120.0 + } + + return {'is_spam': False, 'reason': None, 'severity': 0.0} + + +class SpamStatistics: + """Глобальная статистика по спаму""" + + def __init__(self): + self.users: Dict[int, UserSpamStats] = {} + self.total_blocked_requests: int = 0 + self.total_warnings_issued: int = 0 + self.instant_blocks: int = 0 + + def get_user(self, user_id: int) -> UserSpamStats: + """Получает или создает статистику пользователя""" + if user_id not in self.users: + self.users[user_id] = UserSpamStats(user_id=user_id) + return self.users[user_id] + + def get_top_spammers(self, limit: int = 10) -> list[tuple[int, int]]: + """Возвращает топ спамеров""" + sorted_users = sorted( + self.users.items(), + key=lambda x: x[1].total_blocks, + reverse=True + ) + return [(uid, stats.total_blocks) for uid, stats in sorted_users[:limit]] + + def get_stats_summary(self) -> Dict[str, Any]: + """Возвращает общую статистику""" + return { + 'total_users': len(self.users), + 'total_blocked_requests': self.total_blocked_requests, + 'total_warnings': self.total_warnings_issued, + 'instant_blocks': self.instant_blocks, + 'active_blocks': sum( + 1 for stats in self.users.values() + if stats.blocked_until and stats.blocked_until > time() + ) + } + + def cleanup(self, max_age: float = 86400.0) -> int: + """Удаляет старую статистику (24 часа по умолчанию)""" + current_time = time() + cutoff_time = current_time - max_age + + users_to_delete = [ + uid for uid, stats in self.users.items() + if stats.last_seen and stats.last_seen < cutoff_time + and not stats.is_blocked(current_time) + ] + + for uid in users_to_delete: + del self.users[uid] + + return len(users_to_delete) + + +# Глобальная статистика +spam_stats = SpamStatistics() + + +class AntiSpamMiddleware(BaseMiddleware): + """ + Умный антиспам с мгновенной блокировкой при флуде. + + Особенности v2: + - Мгновенная блокировка при экстремальном флуде (5+ сообщений за 2с) + - Детекция скорости отправки сообщений + - Адаптивная длительность блокировки + - Различает типы активности + - Бот никогда не банит сам себя + """ + + def __init__( + self, + rate_limit_text: int = 8, + rate_limit_forward: int = 20, + rate_limit_callback: int = 12, + rate_limit_media: int = 10, + + time_window: float = 10.0, + + warning_limit: int = 3, + base_block_duration: float = 120.0, + max_block_duration: float = 3600.0, + + whitelist_admins: bool = True, + progressive_blocking: bool = True, + enable_smart_detection: bool = True, + enable_reputation: bool = True, + log_all: bool = False + ): + super().__init__() + self.rate_limit_text = rate_limit_text + self.rate_limit_forward = rate_limit_forward + self.rate_limit_callback = rate_limit_callback + self.rate_limit_media = rate_limit_media + self.time_window = time_window + self.warning_limit = warning_limit + self.base_block_duration = base_block_duration + self.max_block_duration = max_block_duration + self.whitelist_admins = whitelist_admins + self.progressive_blocking = progressive_blocking + self.enable_smart_detection = enable_smart_detection + self.enable_reputation = enable_reputation + self.log_all = log_all + + def _extract_context(self, event: TelegramObject) -> MessageContext: + """Извлекает контекст из события""" + context = MessageContext() + + if isinstance(event, Message): + context.text = event.text or event.caption + context.is_forward = event.forward_date is not None + context.is_reply = event.reply_to_message is not None + context.is_command = bool(context.text and context.text.startswith('/')) + + if event.photo: + context.media_type = 'photo' + elif event.video: + context.media_type = 'video' + elif event.document: + context.media_type = 'document' + elif event.audio: + context.media_type = 'audio' + elif event.voice: + context.media_type = 'voice' + elif event.sticker: + context.media_type = 'sticker' + + elif isinstance(event, CallbackQuery): + context.callback_data = event.data + + return context + + def _get_effective_rate_limit(self, user_stats: UserSpamStats, context: MessageContext) -> int: + """Вычисляет эффективный лимит с учётом типа и репутации""" + if context.is_command: + return 999 # Команды не ограничиваем + elif context.callback_data: + base_limit = self.rate_limit_callback + elif context.is_forward: + base_limit = self.rate_limit_forward + elif context.media_type: + base_limit = self.rate_limit_media + else: + base_limit = self.rate_limit_text + + if self.enable_reputation: + base_limit = int(base_limit * user_stats.reputation) + + return max(3, base_limit) + + def _calculate_block_duration(self, warnings: int) -> float: + """Вычисляет длительность блокировки за накопленные варнинги""" + if not self.progressive_blocking: + return self.base_block_duration + + multiplier = 2 ** (warnings // self.warning_limit) + duration = self.base_block_duration * multiplier + + return min(duration, self.max_block_duration) + + @staticmethod + def _format_duration(seconds: float) -> str: + """Форматирует длительность""" + if seconds < 60: + return f"{int(seconds)} сек" + elif seconds < 3600: + return f"{int(seconds / 60)} мин" + else: + return f"{int(seconds / 3600)} час" + + async def __call__( + self, + handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], + event: TelegramObject, + data: Dict[str, Any] + ) -> Optional[Any]: + """Основная логика проверки""" + + # Пропускаем не-сообщения и не-callback + if not isinstance(event, (Message, CallbackQuery)): + return await handler(event, data) + + user_id = event.from_user.id if event.from_user else None + if user_id is None: + return await handler(event, data) + + # ✅ ИСПРАВЛЕНИЕ: пропускаем самого бота (предотвращает самобан) + bot = data.get("bot") + if bot and user_id == bot.id: + return await handler(event, data) + + user_str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}" + + # Whitelist для администраторов + if self.whitelist_admins and user_id in (settings.OWNER_ID + settings.ADMIN_ID): + if self.log_all: + logger.debug(f"Администратор {user_str} пропущен", log_type='ANTI_SPAM') + return await handler(event, data) + + current_time = time() + user_stats = spam_stats.get_user(user_id) + + # Проверка существующей блокировки + if user_stats.is_blocked(current_time): + remaining = user_stats.get_remaining_block_time(current_time) + spam_stats.total_blocked_requests += 1 + + logger.warning( + f"Запрос от заблокированного пользователя (осталось {self._format_duration(remaining)})", + log_type='ANTI_SPAM', + user=user_str + ) + + # Только для callback — отвечаем алертом, для сообщений молчим + if isinstance(event, CallbackQuery): + await event.answer( + f"🚫 Блокировка: {self._format_duration(remaining)}", + show_alert=True + ) + + return None + + # Извлекаем контекст сообщения + context = self._extract_context(event) + + # Добавляем запрос СНАЧАЛА — важно для детекции скорости флуда + user_stats.add_request(current_time, context) + + # Очищаем старые запросы за пределами временного окна + user_stats.clean_old_requests(current_time, self.time_window) + + # ========== КРИТИЧНО: МГНОВЕННАЯ ДЕТЕКЦИЯ ФЛУДА ========== + if self.enable_smart_detection: + spam_analysis = user_stats.detect_spam_patterns(self.time_window) + + if spam_analysis.get('is_spam') and spam_analysis.get('instant_block'): + block_duration = spam_analysis.get('block_duration', 300.0) + user_stats.block(current_time, block_duration) + user_stats.warnings = self.warning_limit + spam_stats.instant_blocks += 1 + + logger.error( + f"🚨 МГНОВЕННАЯ БЛОКИРОВКА! Причина: {spam_analysis['reason']}\n" + f" └─ {spam_analysis['details']}\n" + f" └─ Длительность: {self._format_duration(block_duration)}", + log_type='ANTI_SPAM', + user=user_str + ) + + block_message = ( + f"🚫 БЛОКИРОВКА ЗА ФЛУД!\n\n" + f"⚠️ {spam_analysis['details']}\n\n" + f"⏳ Длительность: {self._format_duration(block_duration)}\n" + f"💡 Не отправляйте сообщения слишком быстро!" + ) + + if isinstance(event, Message): + try: + await event.answer(block_message, parse_mode="HTML") + except Exception: + pass + elif isinstance(event, CallbackQuery): + await event.answer( + f"🚫 Блокировка: {self._format_duration(block_duration)}", + show_alert=True + ) + + return None + + # ========== ОБЫЧНАЯ ПРОВЕРКА ЛИМИТОВ ========== + effective_limit = self._get_effective_rate_limit(user_stats, context) + + relevant_requests = 0 + for req_context in user_stats.message_contexts: + if context.is_forward and req_context.is_forward: + relevant_requests += 1 + elif context.callback_data and req_context.callback_data: + relevant_requests += 1 + elif context.media_type and req_context.media_type: + relevant_requests += 1 + elif not (req_context.is_forward or req_context.callback_data or req_context.media_type or req_context.is_command): + relevant_requests += 1 + + if self.log_all: + logger.debug( + f"Rate: {relevant_requests}/{effective_limit} | rep: {user_stats.reputation:.2f}", + log_type='ANTI_SPAM', + user=user_str + ) + + if relevant_requests >= effective_limit: + user_stats.add_warning() + spam_stats.total_warnings_issued += 1 + + logger.warning( + f"Превышен лимит ({relevant_requests}/{effective_limit}). " + f"Предупреждение {user_stats.warnings}/{self.warning_limit}", + log_type='ANTI_SPAM', + user=user_str + ) + + if user_stats.warnings >= self.warning_limit: + block_duration = self._calculate_block_duration(user_stats.warnings) + user_stats.block(current_time, block_duration) + + logger.error( + f"Пользователь заблокирован на {self._format_duration(block_duration)} (варнинги). " + f"Всего блокировок: {user_stats.total_blocks}", + log_type='ANTI_SPAM', + user=user_str + ) + + block_message = ( + f"🚫 Вы заблокированы!\n\n" + f"⏳ Длительность: {self._format_duration(block_duration)}\n" + f"⚠️ Причина: Превышение лимита запросов" + ) + + if isinstance(event, Message): + try: + await event.answer(block_message, parse_mode="HTML") + except Exception: + pass + elif isinstance(event, CallbackQuery): + await event.answer( + f"🚫 Блокировка: {self._format_duration(block_duration)}", + show_alert=True + ) + + return None + + if isinstance(event, Message): + warning_message = ( + f"⚠️ Предупреждение {user_stats.warnings}/{self.warning_limit}\n\n" + f"Вы отправляете сообщения слишком часто!" + ) + try: + await event.answer(warning_message, parse_mode="HTML") + except Exception: + pass + + return None + + # Улучшаем репутацию за нормальное поведение + if self.enable_reputation and user_stats.total_requests % 10 == 0: + user_stats.improve_reputation() + + return await handler(event, data) + + +# ================= УПРАВЛЕНИЕ ================= + +async def reset_spam_warnings(user_id: int) -> bool: + """Сбрасывает предупреждения пользователя""" + if user_id in spam_stats.users: + spam_stats.users[user_id].warnings = 0 + spam_stats.users[user_id].blocked_until = None + logger.info(f"Предупреждения сброшены для id{user_id}", log_type='ANTI_SPAM') + return True + return False + + +async def unblock_user(user_id: int) -> bool: + """Разблокирует пользователя""" + if user_id in spam_stats.users: + stats = spam_stats.users[user_id] + if stats.blocked_until: + stats.blocked_until = None + stats.warnings = 0 + logger.info(f"Пользователь id{user_id} разблокирован вручную", log_type='ANTI_SPAM') + return True + return False + + +async def get_user_spam_info(user_id: int) -> Optional[Dict[str, Any]]: + """Получает информацию о спам-статистике пользователя""" + if user_id not in spam_stats.users: + return None + + stats = spam_stats.users[user_id] + current_time = time() + + return { + 'user_id': user_id, + 'warnings': stats.warnings, + 'reputation': stats.reputation, + 'is_blocked': stats.is_blocked(current_time), + 'blocked_until': datetime.fromtimestamp(stats.blocked_until) if stats.blocked_until else None, + 'remaining_block_time': stats.get_remaining_block_time(current_time), + 'total_requests': stats.total_requests, + 'total_blocks': stats.total_blocks, + 'first_seen': datetime.fromtimestamp(stats.first_seen) if stats.first_seen else None, + 'last_seen': datetime.fromtimestamp(stats.last_seen) if stats.last_seen else None + }