"""
Умный middleware для защиты от спама с адаптивными лимитами
ВЕРСИЯ 2.0: мгновенная блокировка при явном флуде
"""
from time import time
from typing import Callable, Awaitable, Any, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime
from collections import Counter
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject, Message, CallbackQuery
from middleware.loggers import logger
from configs import settings
__all__ = ('AntiSpamMiddleware', 'spam_stats')
@dataclass
class MessageContext:
"""Контекст сообщения для умной детекции"""
text: Optional[str] = None
is_forward: bool = False
is_reply: bool = False
is_command: bool = False
media_type: Optional[str] = None
callback_data: Optional[str] = None
timestamp: float = field(default_factory=time)
@dataclass
class UserSpamStats:
"""
Расширенная статистика спама для пользователя.
"""
user_id: int
request_times: list[float] = field(default_factory=list)
message_contexts: list[MessageContext] = field(default_factory=list)
warnings: int = 0
blocked_until: Optional[float] = None
total_requests: int = 0
total_blocks: int = 0
first_seen: Optional[float] = None
last_seen: Optional[float] = None
reputation: float = 1.0 # Репутация пользователя (0.5 - 2.0)
def is_blocked(self, current_time: float) -> bool:
"""Проверяет, заблокирован ли пользователь"""
if self.blocked_until is None:
return False
if current_time < self.blocked_until:
return True
# Разблокировка
self.blocked_until = None
self.warnings = max(0, self.warnings - 1) # Снижаем предупреждения
return False
def get_remaining_block_time(self, current_time: float) -> float:
"""Возвращает оставшееся время блокировки"""
if self.blocked_until is None or current_time >= self.blocked_until:
return 0.0
return self.blocked_until - current_time
def clean_old_requests(self, current_time: float, time_window: float) -> None:
"""Удаляет старые запросы за пределами временного окна"""
cutoff_time = current_time - time_window
new_times = []
new_contexts = []
for req_time, context in zip(self.request_times, self.message_contexts):
if req_time > cutoff_time:
new_times.append(req_time)
new_contexts.append(context)
self.request_times = new_times
self.message_contexts = new_contexts
def add_request(self, current_time: float, context: MessageContext) -> None:
"""Добавляет новый запрос с контекстом"""
context.timestamp = current_time
self.request_times.append(current_time)
self.message_contexts.append(context)
self.total_requests += 1
self.last_seen = current_time
if self.first_seen is None:
self.first_seen = current_time
def add_warning(self) -> None:
"""Добавляет предупреждение и снижает репутацию"""
self.warnings += 1
self.reputation = max(0.5, self.reputation - 0.1)
def improve_reputation(self) -> None:
"""Улучшает репутацию за хорошее поведение"""
self.reputation = min(2.0, self.reputation + 0.05)
def block(self, current_time: float, duration: float) -> None:
"""Блокирует пользователя"""
self.blocked_until = current_time + duration
self.total_blocks += 1
self.reputation = max(0.5, self.reputation - 0.3)
def detect_spam_patterns(self, time_window: float = 10.0) -> Dict[str, Any]:
"""
Умная детекция спама на основе паттернов.
УЛУЧШЕНО: учитывает скорость отправки сообщений.
Returns:
Dict с результатами анализа
"""
if len(self.message_contexts) < 3:
return {'is_spam': False, 'reason': None, 'severity': 0.0}
recent_contexts = self.message_contexts[-15:] # Последние 15 сообщений
current_time = time()
# 1. КРИТИЧНО: Экстремально быстрая отправка (флуд-бот)
very_recent = [ctx for ctx in recent_contexts if (current_time - ctx.timestamp) < 2.0]
if len(very_recent) >= 5:
return {
'is_spam': True,
'reason': 'extreme_flood',
'severity': 1.0,
'details': f"⚡ Экстремальный флуд: {len(very_recent)} сообщений за 2 секунды",
'instant_block': True,
'block_duration': 600.0
}
# 2. КРИТИЧНО: 8+ сообщений за 5 секунд => агрессивный флуд
recent_5s = [ctx for ctx in recent_contexts if (current_time - ctx.timestamp) < 5.0]
if len(recent_5s) >= 8:
return {
'is_spam': True,
'reason': 'aggressive_flood',
'severity': 0.95,
'details': f"🔥 Агрессивный флуд: {len(recent_5s)} сообщений за 5 секунд",
'instant_block': True,
'block_duration': 300.0
}
# 3. Медиа-флуд
media_contexts = [ctx for ctx in recent_contexts if ctx.media_type]
if len(media_contexts) >= 7:
media_recent = [ctx for ctx in media_contexts if (current_time - ctx.timestamp) < 5.0]
if len(media_recent) >= 6:
return {
'is_spam': True,
'reason': 'media_flood_fast',
'severity': 0.9,
'details': f"📸 Медиа-флуд: {len(media_recent)} файлов за 5 секунд",
'instant_block': True,
'block_duration': 240.0
}
return {
'is_spam': True,
'reason': 'media_flood',
'severity': 0.7,
'details': f"📸 Медиа-флуд: {len(media_contexts)} файлов подряд"
}
# 4. Проверка идентичных текстовых сообщений
texts = [ctx.text for ctx in recent_contexts if ctx.text and not ctx.is_command]
if texts:
text_counts = Counter(texts)
most_common_text, count = text_counts.most_common(1)[0]
if count >= 5:
return {
'is_spam': True,
'reason': 'identical_messages',
'severity': 0.85,
'details': f"📋 Повтор: '{most_common_text[:40]}...' ({count}x)",
'instant_block': True,
'block_duration': 180.0
}
# 5. Проверка спама callback кнопок
callbacks = [ctx.callback_data for ctx in recent_contexts if ctx.callback_data]
if callbacks:
callback_counts = Counter(callbacks)
most_common_callback, count = callback_counts.most_common(1)[0]
if count >= 10:
return {
'is_spam': True,
'reason': 'callback_spam',
'severity': 0.8,
'details': f"🔘 Спам кнопки: {count} нажатий",
'instant_block': True,
'block_duration': 120.0
}
return {'is_spam': False, 'reason': None, 'severity': 0.0}
class SpamStatistics:
"""Глобальная статистика по спаму"""
def __init__(self):
self.users: Dict[int, UserSpamStats] = {}
self.total_blocked_requests: int = 0
self.total_warnings_issued: int = 0
self.instant_blocks: int = 0
def get_user(self, user_id: int) -> UserSpamStats:
"""Получает или создает статистику пользователя"""
if user_id not in self.users:
self.users[user_id] = UserSpamStats(user_id=user_id)
return self.users[user_id]
def get_top_spammers(self, limit: int = 10) -> list[tuple[int, int]]:
"""Возвращает топ спамеров"""
sorted_users = sorted(
self.users.items(),
key=lambda x: x[1].total_blocks,
reverse=True
)
return [(uid, stats.total_blocks) for uid, stats in sorted_users[:limit]]
def get_stats_summary(self) -> Dict[str, Any]:
"""Возвращает общую статистику"""
return {
'total_users': len(self.users),
'total_blocked_requests': self.total_blocked_requests,
'total_warnings': self.total_warnings_issued,
'instant_blocks': self.instant_blocks,
'active_blocks': sum(
1 for stats in self.users.values()
if stats.blocked_until and stats.blocked_until > time()
)
}
def cleanup(self, max_age: float = 86400.0) -> int:
"""Удаляет старую статистику (24 часа по умолчанию)"""
current_time = time()
cutoff_time = current_time - max_age
users_to_delete = [
uid for uid, stats in self.users.items()
if stats.last_seen and stats.last_seen < cutoff_time
and not stats.is_blocked(current_time)
]
for uid in users_to_delete:
del self.users[uid]
return len(users_to_delete)
# Глобальная статистика
spam_stats = SpamStatistics()
class AntiSpamMiddleware(BaseMiddleware):
"""
Умный антиспам с мгновенной блокировкой при флуде.
Особенности v2:
- Мгновенная блокировка при экстремальном флуде (5+ сообщений за 2с)
- Детекция скорости отправки сообщений
- Адаптивная длительность блокировки
- Различает типы активности
- Бот никогда не банит сам себя
"""
def __init__(
self,
rate_limit_text: int = 8,
rate_limit_forward: int = 20,
rate_limit_callback: int = 12,
rate_limit_media: int = 10,
time_window: float = 10.0,
warning_limit: int = 3,
base_block_duration: float = 120.0,
max_block_duration: float = 3600.0,
whitelist_admins: bool = True,
progressive_blocking: bool = True,
enable_smart_detection: bool = True,
enable_reputation: bool = True,
log_all: bool = False
):
super().__init__()
self.rate_limit_text = rate_limit_text
self.rate_limit_forward = rate_limit_forward
self.rate_limit_callback = rate_limit_callback
self.rate_limit_media = rate_limit_media
self.time_window = time_window
self.warning_limit = warning_limit
self.base_block_duration = base_block_duration
self.max_block_duration = max_block_duration
self.whitelist_admins = whitelist_admins
self.progressive_blocking = progressive_blocking
self.enable_smart_detection = enable_smart_detection
self.enable_reputation = enable_reputation
self.log_all = log_all
def _extract_context(self, event: TelegramObject) -> MessageContext:
"""Извлекает контекст из события"""
context = MessageContext()
if isinstance(event, Message):
context.text = event.text or event.caption
context.is_forward = event.forward_date is not None
context.is_reply = event.reply_to_message is not None
context.is_command = bool(context.text and context.text.startswith('/'))
if event.photo:
context.media_type = 'photo'
elif event.video:
context.media_type = 'video'
elif event.document:
context.media_type = 'document'
elif event.audio:
context.media_type = 'audio'
elif event.voice:
context.media_type = 'voice'
elif event.sticker:
context.media_type = 'sticker'
elif isinstance(event, CallbackQuery):
context.callback_data = event.data
return context
def _get_effective_rate_limit(self, user_stats: UserSpamStats, context: MessageContext) -> int:
"""Вычисляет эффективный лимит с учётом типа и репутации"""
if context.is_command:
return 999 # Команды не ограничиваем
elif context.callback_data:
base_limit = self.rate_limit_callback
elif context.is_forward:
base_limit = self.rate_limit_forward
elif context.media_type:
base_limit = self.rate_limit_media
else:
base_limit = self.rate_limit_text
if self.enable_reputation:
base_limit = int(base_limit * user_stats.reputation)
return max(3, base_limit)
def _calculate_block_duration(self, warnings: int) -> float:
"""Вычисляет длительность блокировки за накопленные варнинги"""
if not self.progressive_blocking:
return self.base_block_duration
multiplier = 2 ** (warnings // self.warning_limit)
duration = self.base_block_duration * multiplier
return min(duration, self.max_block_duration)
@staticmethod
def _format_duration(seconds: float) -> str:
"""Форматирует длительность"""
if seconds < 60:
return f"{int(seconds)} сек"
elif seconds < 3600:
return f"{int(seconds / 60)} мин"
else:
return f"{int(seconds / 3600)} час"
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Optional[Any]:
"""Основная логика проверки"""
# Пропускаем не-сообщения и не-callback
if not isinstance(event, (Message, CallbackQuery)):
return await handler(event, data)
user_id = event.from_user.id if event.from_user else None
if user_id is None:
return await handler(event, data)
# ✅ ИСПРАВЛЕНИЕ: пропускаем самого бота (предотвращает самобан)
bot = data.get("bot")
if bot and user_id == bot.id:
return await handler(event, data)
user_str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}"
# Whitelist для администраторов
if self.whitelist_admins and user_id in (settings.OWNER_ID + settings.ADMIN_ID):
if self.log_all:
logger.debug(f"Администратор {user_str} пропущен", log_type='ANTI_SPAM')
return await handler(event, data)
current_time = time()
user_stats = spam_stats.get_user(user_id)
# Проверка существующей блокировки
if user_stats.is_blocked(current_time):
remaining = user_stats.get_remaining_block_time(current_time)
spam_stats.total_blocked_requests += 1
logger.warning(
f"Запрос от заблокированного пользователя (осталось {self._format_duration(remaining)})",
log_type='ANTI_SPAM',
user=user_str
)
# Только для callback — отвечаем алертом, для сообщений молчим
if isinstance(event, CallbackQuery):
await event.answer(
f"🚫 Блокировка: {self._format_duration(remaining)}",
show_alert=True
)
return None
# Извлекаем контекст сообщения
context = self._extract_context(event)
# Добавляем запрос СНАЧАЛА — важно для детекции скорости флуда
user_stats.add_request(current_time, context)
# Очищаем старые запросы за пределами временного окна
user_stats.clean_old_requests(current_time, self.time_window)
# ========== КРИТИЧНО: МГНОВЕННАЯ ДЕТЕКЦИЯ ФЛУДА ==========
if self.enable_smart_detection:
spam_analysis = user_stats.detect_spam_patterns(self.time_window)
if spam_analysis.get('is_spam') and spam_analysis.get('instant_block'):
block_duration = spam_analysis.get('block_duration', 300.0)
user_stats.block(current_time, block_duration)
user_stats.warnings = self.warning_limit
spam_stats.instant_blocks += 1
logger.error(
f"🚨 МГНОВЕННАЯ БЛОКИРОВКА! Причина: {spam_analysis['reason']}\n"
f" └─ {spam_analysis['details']}\n"
f" └─ Длительность: {self._format_duration(block_duration)}",
log_type='ANTI_SPAM',
user=user_str
)
block_message = (
f"🚫 БЛОКИРОВКА ЗА ФЛУД!\n\n"
f"⚠️ {spam_analysis['details']}\n\n"
f"⏳ Длительность: {self._format_duration(block_duration)}\n"
f"💡 Не отправляйте сообщения слишком быстро!"
)
if isinstance(event, Message):
try:
await event.answer(block_message, parse_mode="HTML")
except Exception:
pass
elif isinstance(event, CallbackQuery):
await event.answer(
f"🚫 Блокировка: {self._format_duration(block_duration)}",
show_alert=True
)
return None
# ========== ОБЫЧНАЯ ПРОВЕРКА ЛИМИТОВ ==========
effective_limit = self._get_effective_rate_limit(user_stats, context)
relevant_requests = 0
for req_context in user_stats.message_contexts:
if context.is_forward and req_context.is_forward:
relevant_requests += 1
elif context.callback_data and req_context.callback_data:
relevant_requests += 1
elif context.media_type and req_context.media_type:
relevant_requests += 1
elif not (req_context.is_forward or req_context.callback_data or req_context.media_type or req_context.is_command):
relevant_requests += 1
if self.log_all:
logger.debug(
f"Rate: {relevant_requests}/{effective_limit} | rep: {user_stats.reputation:.2f}",
log_type='ANTI_SPAM',
user=user_str
)
if relevant_requests >= effective_limit:
user_stats.add_warning()
spam_stats.total_warnings_issued += 1
logger.warning(
f"Превышен лимит ({relevant_requests}/{effective_limit}). "
f"Предупреждение {user_stats.warnings}/{self.warning_limit}",
log_type='ANTI_SPAM',
user=user_str
)
if user_stats.warnings >= self.warning_limit:
block_duration = self._calculate_block_duration(user_stats.warnings)
user_stats.block(current_time, block_duration)
logger.error(
f"Пользователь заблокирован на {self._format_duration(block_duration)} (варнинги). "
f"Всего блокировок: {user_stats.total_blocks}",
log_type='ANTI_SPAM',
user=user_str
)
block_message = (
f"🚫 Вы заблокированы!\n\n"
f"⏳ Длительность: {self._format_duration(block_duration)}\n"
f"⚠️ Причина: Превышение лимита запросов"
)
if isinstance(event, Message):
try:
await event.answer(block_message, parse_mode="HTML")
except Exception:
pass
elif isinstance(event, CallbackQuery):
await event.answer(
f"🚫 Блокировка: {self._format_duration(block_duration)}",
show_alert=True
)
return None
if isinstance(event, Message):
warning_message = (
f"⚠️ Предупреждение {user_stats.warnings}/{self.warning_limit}\n\n"
f"Вы отправляете сообщения слишком часто!"
)
try:
await event.answer(warning_message, parse_mode="HTML")
except Exception:
pass
return None
# Улучшаем репутацию за нормальное поведение
if self.enable_reputation and user_stats.total_requests % 10 == 0:
user_stats.improve_reputation()
return await handler(event, data)
# ================= УПРАВЛЕНИЕ =================
async def reset_spam_warnings(user_id: int) -> bool:
"""Сбрасывает предупреждения пользователя"""
if user_id in spam_stats.users:
spam_stats.users[user_id].warnings = 0
spam_stats.users[user_id].blocked_until = None
logger.info(f"Предупреждения сброшены для id{user_id}", log_type='ANTI_SPAM')
return True
return False
async def unblock_user(user_id: int) -> bool:
"""Разблокирует пользователя"""
if user_id in spam_stats.users:
stats = spam_stats.users[user_id]
if stats.blocked_until:
stats.blocked_until = None
stats.warnings = 0
logger.info(f"Пользователь id{user_id} разблокирован вручную", log_type='ANTI_SPAM')
return True
return False
async def get_user_spam_info(user_id: int) -> Optional[Dict[str, Any]]:
"""Получает информацию о спам-статистике пользователя"""
if user_id not in spam_stats.users:
return None
stats = spam_stats.users[user_id]
current_time = time()
return {
'user_id': user_id,
'warnings': stats.warnings,
'reputation': stats.reputation,
'is_blocked': stats.is_blocked(current_time),
'blocked_until': datetime.fromtimestamp(stats.blocked_until) if stats.blocked_until else None,
'remaining_block_time': stats.get_remaining_block_time(current_time),
'total_requests': stats.total_requests,
'total_blocks': stats.total_blocks,
'first_seen': datetime.fromtimestamp(stats.first_seen) if stats.first_seen else None,
'last_seen': datetime.fromtimestamp(stats.last_seen) if stats.last_seen else None
}