diff --git a/bot/middlewares/spam_mdw.py b/bot/middlewares/spam_mdw.py
new file mode 100644
index 0000000..b7eb79c
--- /dev/null
+++ b/bot/middlewares/spam_mdw.py
@@ -0,0 +1,595 @@
+"""
+Умный middleware для защиты от спама с адаптивными лимитами
+ВЕРСИЯ 2.0: мгновенная блокировка при явном флуде
+"""
+from time import time
+from typing import Callable, Awaitable, Any, Dict, Optional
+from dataclasses import dataclass, field
+from datetime import datetime
+from collections import Counter
+
+from aiogram import BaseMiddleware
+from aiogram.types import TelegramObject, Message, CallbackQuery
+
+from middleware.loggers import logger
+from configs import settings
+
+__all__ = ('AntiSpamMiddleware', 'spam_stats')
+
+
+@dataclass
+class MessageContext:
+ """Контекст сообщения для умной детекции"""
+ text: Optional[str] = None
+ is_forward: bool = False
+ is_reply: bool = False
+ is_command: bool = False
+ media_type: Optional[str] = None
+ callback_data: Optional[str] = None
+ timestamp: float = field(default_factory=time)
+
+
+@dataclass
+class UserSpamStats:
+ """
+ Расширенная статистика спама для пользователя.
+ """
+ user_id: int
+ request_times: list[float] = field(default_factory=list)
+ message_contexts: list[MessageContext] = field(default_factory=list)
+ warnings: int = 0
+ blocked_until: Optional[float] = None
+ total_requests: int = 0
+ total_blocks: int = 0
+ first_seen: Optional[float] = None
+ last_seen: Optional[float] = None
+ reputation: float = 1.0 # Репутация пользователя (0.5 - 2.0)
+
+ def is_blocked(self, current_time: float) -> bool:
+ """Проверяет, заблокирован ли пользователь"""
+ if self.blocked_until is None:
+ return False
+
+ if current_time < self.blocked_until:
+ return True
+
+ # Разблокировка
+ self.blocked_until = None
+ self.warnings = max(0, self.warnings - 1) # Снижаем предупреждения
+ return False
+
+ def get_remaining_block_time(self, current_time: float) -> float:
+ """Возвращает оставшееся время блокировки"""
+ if self.blocked_until is None or current_time >= self.blocked_until:
+ return 0.0
+ return self.blocked_until - current_time
+
+ def clean_old_requests(self, current_time: float, time_window: float) -> None:
+ """Удаляет старые запросы за пределами временного окна"""
+ cutoff_time = current_time - time_window
+
+ new_times = []
+ new_contexts = []
+
+ for req_time, context in zip(self.request_times, self.message_contexts):
+ if req_time > cutoff_time:
+ new_times.append(req_time)
+ new_contexts.append(context)
+
+ self.request_times = new_times
+ self.message_contexts = new_contexts
+
+ def add_request(self, current_time: float, context: MessageContext) -> None:
+ """Добавляет новый запрос с контекстом"""
+ context.timestamp = current_time
+ self.request_times.append(current_time)
+ self.message_contexts.append(context)
+ self.total_requests += 1
+ self.last_seen = current_time
+
+ if self.first_seen is None:
+ self.first_seen = current_time
+
+ def add_warning(self) -> None:
+ """Добавляет предупреждение и снижает репутацию"""
+ self.warnings += 1
+ self.reputation = max(0.5, self.reputation - 0.1)
+
+ def improve_reputation(self) -> None:
+ """Улучшает репутацию за хорошее поведение"""
+ self.reputation = min(2.0, self.reputation + 0.05)
+
+ def block(self, current_time: float, duration: float) -> None:
+ """Блокирует пользователя"""
+ self.blocked_until = current_time + duration
+ self.total_blocks += 1
+ self.reputation = max(0.5, self.reputation - 0.3)
+
+ def detect_spam_patterns(self, time_window: float = 10.0) -> Dict[str, Any]:
+ """
+ Умная детекция спама на основе паттернов.
+ УЛУЧШЕНО: учитывает скорость отправки сообщений.
+
+ Returns:
+ Dict с результатами анализа
+ """
+ if len(self.message_contexts) < 3:
+ return {'is_spam': False, 'reason': None, 'severity': 0.0}
+
+ recent_contexts = self.message_contexts[-15:] # Последние 15 сообщений
+ current_time = time()
+
+ # 1. КРИТИЧНО: Экстремально быстрая отправка (флуд-бот)
+ very_recent = [ctx for ctx in recent_contexts if (current_time - ctx.timestamp) < 2.0]
+ if len(very_recent) >= 5:
+ return {
+ 'is_spam': True,
+ 'reason': 'extreme_flood',
+ 'severity': 1.0,
+ 'details': f"⚡ Экстремальный флуд: {len(very_recent)} сообщений за 2 секунды",
+ 'instant_block': True,
+ 'block_duration': 600.0
+ }
+
+ # 2. КРИТИЧНО: 8+ сообщений за 5 секунд => агрессивный флуд
+ recent_5s = [ctx for ctx in recent_contexts if (current_time - ctx.timestamp) < 5.0]
+ if len(recent_5s) >= 8:
+ return {
+ 'is_spam': True,
+ 'reason': 'aggressive_flood',
+ 'severity': 0.95,
+ 'details': f"🔥 Агрессивный флуд: {len(recent_5s)} сообщений за 5 секунд",
+ 'instant_block': True,
+ 'block_duration': 300.0
+ }
+
+ # 3. Медиа-флуд
+ media_contexts = [ctx for ctx in recent_contexts if ctx.media_type]
+ if len(media_contexts) >= 7:
+ media_recent = [ctx for ctx in media_contexts if (current_time - ctx.timestamp) < 5.0]
+ if len(media_recent) >= 6:
+ return {
+ 'is_spam': True,
+ 'reason': 'media_flood_fast',
+ 'severity': 0.9,
+ 'details': f"📸 Медиа-флуд: {len(media_recent)} файлов за 5 секунд",
+ 'instant_block': True,
+ 'block_duration': 240.0
+ }
+
+ return {
+ 'is_spam': True,
+ 'reason': 'media_flood',
+ 'severity': 0.7,
+ 'details': f"📸 Медиа-флуд: {len(media_contexts)} файлов подряд"
+ }
+
+ # 4. Проверка идентичных текстовых сообщений
+ texts = [ctx.text for ctx in recent_contexts if ctx.text and not ctx.is_command]
+ if texts:
+ text_counts = Counter(texts)
+ most_common_text, count = text_counts.most_common(1)[0]
+
+ if count >= 5:
+ return {
+ 'is_spam': True,
+ 'reason': 'identical_messages',
+ 'severity': 0.85,
+ 'details': f"📋 Повтор: '{most_common_text[:40]}...' ({count}x)",
+ 'instant_block': True,
+ 'block_duration': 180.0
+ }
+
+ # 5. Проверка спама callback кнопок
+ callbacks = [ctx.callback_data for ctx in recent_contexts if ctx.callback_data]
+ if callbacks:
+ callback_counts = Counter(callbacks)
+ most_common_callback, count = callback_counts.most_common(1)[0]
+
+ if count >= 10:
+ return {
+ 'is_spam': True,
+ 'reason': 'callback_spam',
+ 'severity': 0.8,
+ 'details': f"🔘 Спам кнопки: {count} нажатий",
+ 'instant_block': True,
+ 'block_duration': 120.0
+ }
+
+ return {'is_spam': False, 'reason': None, 'severity': 0.0}
+
+
+class SpamStatistics:
+ """Глобальная статистика по спаму"""
+
+ def __init__(self):
+ self.users: Dict[int, UserSpamStats] = {}
+ self.total_blocked_requests: int = 0
+ self.total_warnings_issued: int = 0
+ self.instant_blocks: int = 0
+
+ def get_user(self, user_id: int) -> UserSpamStats:
+ """Получает или создает статистику пользователя"""
+ if user_id not in self.users:
+ self.users[user_id] = UserSpamStats(user_id=user_id)
+ return self.users[user_id]
+
+ def get_top_spammers(self, limit: int = 10) -> list[tuple[int, int]]:
+ """Возвращает топ спамеров"""
+ sorted_users = sorted(
+ self.users.items(),
+ key=lambda x: x[1].total_blocks,
+ reverse=True
+ )
+ return [(uid, stats.total_blocks) for uid, stats in sorted_users[:limit]]
+
+ def get_stats_summary(self) -> Dict[str, Any]:
+ """Возвращает общую статистику"""
+ return {
+ 'total_users': len(self.users),
+ 'total_blocked_requests': self.total_blocked_requests,
+ 'total_warnings': self.total_warnings_issued,
+ 'instant_blocks': self.instant_blocks,
+ 'active_blocks': sum(
+ 1 for stats in self.users.values()
+ if stats.blocked_until and stats.blocked_until > time()
+ )
+ }
+
+ def cleanup(self, max_age: float = 86400.0) -> int:
+ """Удаляет старую статистику (24 часа по умолчанию)"""
+ current_time = time()
+ cutoff_time = current_time - max_age
+
+ users_to_delete = [
+ uid for uid, stats in self.users.items()
+ if stats.last_seen and stats.last_seen < cutoff_time
+ and not stats.is_blocked(current_time)
+ ]
+
+ for uid in users_to_delete:
+ del self.users[uid]
+
+ return len(users_to_delete)
+
+
+# Глобальная статистика
+spam_stats = SpamStatistics()
+
+
+class AntiSpamMiddleware(BaseMiddleware):
+ """
+ Умный антиспам с мгновенной блокировкой при флуде.
+
+ Особенности v2:
+ - Мгновенная блокировка при экстремальном флуде (5+ сообщений за 2с)
+ - Детекция скорости отправки сообщений
+ - Адаптивная длительность блокировки
+ - Различает типы активности
+ - Бот никогда не банит сам себя
+ """
+
+ def __init__(
+ self,
+ rate_limit_text: int = 8,
+ rate_limit_forward: int = 20,
+ rate_limit_callback: int = 12,
+ rate_limit_media: int = 10,
+
+ time_window: float = 10.0,
+
+ warning_limit: int = 3,
+ base_block_duration: float = 120.0,
+ max_block_duration: float = 3600.0,
+
+ whitelist_admins: bool = True,
+ progressive_blocking: bool = True,
+ enable_smart_detection: bool = True,
+ enable_reputation: bool = True,
+ log_all: bool = False
+ ):
+ super().__init__()
+ self.rate_limit_text = rate_limit_text
+ self.rate_limit_forward = rate_limit_forward
+ self.rate_limit_callback = rate_limit_callback
+ self.rate_limit_media = rate_limit_media
+ self.time_window = time_window
+ self.warning_limit = warning_limit
+ self.base_block_duration = base_block_duration
+ self.max_block_duration = max_block_duration
+ self.whitelist_admins = whitelist_admins
+ self.progressive_blocking = progressive_blocking
+ self.enable_smart_detection = enable_smart_detection
+ self.enable_reputation = enable_reputation
+ self.log_all = log_all
+
+ def _extract_context(self, event: TelegramObject) -> MessageContext:
+ """Извлекает контекст из события"""
+ context = MessageContext()
+
+ if isinstance(event, Message):
+ context.text = event.text or event.caption
+ context.is_forward = event.forward_date is not None
+ context.is_reply = event.reply_to_message is not None
+ context.is_command = bool(context.text and context.text.startswith('/'))
+
+ if event.photo:
+ context.media_type = 'photo'
+ elif event.video:
+ context.media_type = 'video'
+ elif event.document:
+ context.media_type = 'document'
+ elif event.audio:
+ context.media_type = 'audio'
+ elif event.voice:
+ context.media_type = 'voice'
+ elif event.sticker:
+ context.media_type = 'sticker'
+
+ elif isinstance(event, CallbackQuery):
+ context.callback_data = event.data
+
+ return context
+
+ def _get_effective_rate_limit(self, user_stats: UserSpamStats, context: MessageContext) -> int:
+ """Вычисляет эффективный лимит с учётом типа и репутации"""
+ if context.is_command:
+ return 999 # Команды не ограничиваем
+ elif context.callback_data:
+ base_limit = self.rate_limit_callback
+ elif context.is_forward:
+ base_limit = self.rate_limit_forward
+ elif context.media_type:
+ base_limit = self.rate_limit_media
+ else:
+ base_limit = self.rate_limit_text
+
+ if self.enable_reputation:
+ base_limit = int(base_limit * user_stats.reputation)
+
+ return max(3, base_limit)
+
+ def _calculate_block_duration(self, warnings: int) -> float:
+ """Вычисляет длительность блокировки за накопленные варнинги"""
+ if not self.progressive_blocking:
+ return self.base_block_duration
+
+ multiplier = 2 ** (warnings // self.warning_limit)
+ duration = self.base_block_duration * multiplier
+
+ return min(duration, self.max_block_duration)
+
+ @staticmethod
+ def _format_duration(seconds: float) -> str:
+ """Форматирует длительность"""
+ if seconds < 60:
+ return f"{int(seconds)} сек"
+ elif seconds < 3600:
+ return f"{int(seconds / 60)} мин"
+ else:
+ return f"{int(seconds / 3600)} час"
+
+ async def __call__(
+ self,
+ handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
+ event: TelegramObject,
+ data: Dict[str, Any]
+ ) -> Optional[Any]:
+ """Основная логика проверки"""
+
+ # Пропускаем не-сообщения и не-callback
+ if not isinstance(event, (Message, CallbackQuery)):
+ return await handler(event, data)
+
+ user_id = event.from_user.id if event.from_user else None
+ if user_id is None:
+ return await handler(event, data)
+
+ # ✅ ИСПРАВЛЕНИЕ: пропускаем самого бота (предотвращает самобан)
+ bot = data.get("bot")
+ if bot and user_id == bot.id:
+ return await handler(event, data)
+
+ user_str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}"
+
+ # Whitelist для администраторов
+ if self.whitelist_admins and user_id in (settings.OWNER_ID + settings.ADMIN_ID):
+ if self.log_all:
+ logger.debug(f"Администратор {user_str} пропущен", log_type='ANTI_SPAM')
+ return await handler(event, data)
+
+ current_time = time()
+ user_stats = spam_stats.get_user(user_id)
+
+ # Проверка существующей блокировки
+ if user_stats.is_blocked(current_time):
+ remaining = user_stats.get_remaining_block_time(current_time)
+ spam_stats.total_blocked_requests += 1
+
+ logger.warning(
+ f"Запрос от заблокированного пользователя (осталось {self._format_duration(remaining)})",
+ log_type='ANTI_SPAM',
+ user=user_str
+ )
+
+ # Только для callback — отвечаем алертом, для сообщений молчим
+ if isinstance(event, CallbackQuery):
+ await event.answer(
+ f"🚫 Блокировка: {self._format_duration(remaining)}",
+ show_alert=True
+ )
+
+ return None
+
+ # Извлекаем контекст сообщения
+ context = self._extract_context(event)
+
+ # Добавляем запрос СНАЧАЛА — важно для детекции скорости флуда
+ user_stats.add_request(current_time, context)
+
+ # Очищаем старые запросы за пределами временного окна
+ user_stats.clean_old_requests(current_time, self.time_window)
+
+ # ========== КРИТИЧНО: МГНОВЕННАЯ ДЕТЕКЦИЯ ФЛУДА ==========
+ if self.enable_smart_detection:
+ spam_analysis = user_stats.detect_spam_patterns(self.time_window)
+
+ if spam_analysis.get('is_spam') and spam_analysis.get('instant_block'):
+ block_duration = spam_analysis.get('block_duration', 300.0)
+ user_stats.block(current_time, block_duration)
+ user_stats.warnings = self.warning_limit
+ spam_stats.instant_blocks += 1
+
+ logger.error(
+ f"🚨 МГНОВЕННАЯ БЛОКИРОВКА! Причина: {spam_analysis['reason']}\n"
+ f" └─ {spam_analysis['details']}\n"
+ f" └─ Длительность: {self._format_duration(block_duration)}",
+ log_type='ANTI_SPAM',
+ user=user_str
+ )
+
+ block_message = (
+ f"🚫 БЛОКИРОВКА ЗА ФЛУД!\n\n"
+ f"⚠️ {spam_analysis['details']}\n\n"
+ f"⏳ Длительность: {self._format_duration(block_duration)}\n"
+ f"💡 Не отправляйте сообщения слишком быстро!"
+ )
+
+ if isinstance(event, Message):
+ try:
+ await event.answer(block_message, parse_mode="HTML")
+ except Exception:
+ pass
+ elif isinstance(event, CallbackQuery):
+ await event.answer(
+ f"🚫 Блокировка: {self._format_duration(block_duration)}",
+ show_alert=True
+ )
+
+ return None
+
+ # ========== ОБЫЧНАЯ ПРОВЕРКА ЛИМИТОВ ==========
+ effective_limit = self._get_effective_rate_limit(user_stats, context)
+
+ relevant_requests = 0
+ for req_context in user_stats.message_contexts:
+ if context.is_forward and req_context.is_forward:
+ relevant_requests += 1
+ elif context.callback_data and req_context.callback_data:
+ relevant_requests += 1
+ elif context.media_type and req_context.media_type:
+ relevant_requests += 1
+ elif not (req_context.is_forward or req_context.callback_data or req_context.media_type or req_context.is_command):
+ relevant_requests += 1
+
+ if self.log_all:
+ logger.debug(
+ f"Rate: {relevant_requests}/{effective_limit} | rep: {user_stats.reputation:.2f}",
+ log_type='ANTI_SPAM',
+ user=user_str
+ )
+
+ if relevant_requests >= effective_limit:
+ user_stats.add_warning()
+ spam_stats.total_warnings_issued += 1
+
+ logger.warning(
+ f"Превышен лимит ({relevant_requests}/{effective_limit}). "
+ f"Предупреждение {user_stats.warnings}/{self.warning_limit}",
+ log_type='ANTI_SPAM',
+ user=user_str
+ )
+
+ if user_stats.warnings >= self.warning_limit:
+ block_duration = self._calculate_block_duration(user_stats.warnings)
+ user_stats.block(current_time, block_duration)
+
+ logger.error(
+ f"Пользователь заблокирован на {self._format_duration(block_duration)} (варнинги). "
+ f"Всего блокировок: {user_stats.total_blocks}",
+ log_type='ANTI_SPAM',
+ user=user_str
+ )
+
+ block_message = (
+ f"🚫 Вы заблокированы!\n\n"
+ f"⏳ Длительность: {self._format_duration(block_duration)}\n"
+ f"⚠️ Причина: Превышение лимита запросов"
+ )
+
+ if isinstance(event, Message):
+ try:
+ await event.answer(block_message, parse_mode="HTML")
+ except Exception:
+ pass
+ elif isinstance(event, CallbackQuery):
+ await event.answer(
+ f"🚫 Блокировка: {self._format_duration(block_duration)}",
+ show_alert=True
+ )
+
+ return None
+
+ if isinstance(event, Message):
+ warning_message = (
+ f"⚠️ Предупреждение {user_stats.warnings}/{self.warning_limit}\n\n"
+ f"Вы отправляете сообщения слишком часто!"
+ )
+ try:
+ await event.answer(warning_message, parse_mode="HTML")
+ except Exception:
+ pass
+
+ return None
+
+ # Улучшаем репутацию за нормальное поведение
+ if self.enable_reputation and user_stats.total_requests % 10 == 0:
+ user_stats.improve_reputation()
+
+ return await handler(event, data)
+
+
+# ================= УПРАВЛЕНИЕ =================
+
+async def reset_spam_warnings(user_id: int) -> bool:
+ """Сбрасывает предупреждения пользователя"""
+ if user_id in spam_stats.users:
+ spam_stats.users[user_id].warnings = 0
+ spam_stats.users[user_id].blocked_until = None
+ logger.info(f"Предупреждения сброшены для id{user_id}", log_type='ANTI_SPAM')
+ return True
+ return False
+
+
+async def unblock_user(user_id: int) -> bool:
+ """Разблокирует пользователя"""
+ if user_id in spam_stats.users:
+ stats = spam_stats.users[user_id]
+ if stats.blocked_until:
+ stats.blocked_until = None
+ stats.warnings = 0
+ logger.info(f"Пользователь id{user_id} разблокирован вручную", log_type='ANTI_SPAM')
+ return True
+ return False
+
+
+async def get_user_spam_info(user_id: int) -> Optional[Dict[str, Any]]:
+ """Получает информацию о спам-статистике пользователя"""
+ if user_id not in spam_stats.users:
+ return None
+
+ stats = spam_stats.users[user_id]
+ current_time = time()
+
+ return {
+ 'user_id': user_id,
+ 'warnings': stats.warnings,
+ 'reputation': stats.reputation,
+ 'is_blocked': stats.is_blocked(current_time),
+ 'blocked_until': datetime.fromtimestamp(stats.blocked_until) if stats.blocked_until else None,
+ 'remaining_block_time': stats.get_remaining_block_time(current_time),
+ 'total_requests': stats.total_requests,
+ 'total_blocks': stats.total_blocks,
+ 'first_seen': datetime.fromtimestamp(stats.first_seen) if stats.first_seen else None,
+ 'last_seen': datetime.fromtimestamp(stats.last_seen) if stats.last_seen else None
+ }