"""
Умный middleware для защиты от спама с адаптивными лимитами
"""
from time import time
from typing import Callable, Awaitable, Any, Dict, Optional
from dataclasses import dataclass, field
from datetime import datetime
from collections import Counter
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject, Message, CallbackQuery
from middleware.loggers import logger
from configs import settings
__all__ = ('AntiSpamMiddleware', 'spam_stats')
@dataclass
class MessageContext:
"""Контекст сообщения для умной детекции"""
text: Optional[str] = None
is_forward: bool = False
is_reply: bool = False
is_command: bool = False
media_type: Optional[str] = None
callback_data: Optional[str] = None
@dataclass
class UserSpamStats:
"""
Расширенная статистика спама для пользователя.
"""
user_id: int
request_times: list[float] = field(default_factory=list)
message_contexts: list[MessageContext] = field(default_factory=list)
warnings: int = 0
blocked_until: Optional[float] = None
total_requests: int = 0
total_blocks: int = 0
first_seen: Optional[float] = None
last_seen: Optional[float] = None
reputation: float = 1.0 # Репутация пользователя (0.5 - 2.0)
def is_blocked(self, current_time: float) -> bool:
"""Проверяет, заблокирован ли пользователь"""
if self.blocked_until is None:
return False
if current_time < self.blocked_until:
return True
# Разблокировка
self.blocked_until = None
self.warnings = max(0, self.warnings - 1) # Снижаем предупреждения, но не сбрасываем полностью
return False
def get_remaining_block_time(self, current_time: float) -> float:
"""Возвращает оставшееся время блокировки"""
if self.blocked_until is None or current_time >= self.blocked_until:
return 0.0
return self.blocked_until - current_time
def clean_old_requests(self, current_time: float, time_window: float) -> None:
"""Удаляет старые запросы за пределами временного окна"""
cutoff_time = current_time - time_window
# Удаляем старые запросы
new_times = []
new_contexts = []
for req_time, context in zip(self.request_times, self.message_contexts):
if req_time > cutoff_time:
new_times.append(req_time)
new_contexts.append(context)
self.request_times = new_times
self.message_contexts = new_contexts
def add_request(self, current_time: float, context: MessageContext) -> None:
"""Добавляет новый запрос с контекстом"""
self.request_times.append(current_time)
self.message_contexts.append(context)
self.total_requests += 1
self.last_seen = current_time
if self.first_seen is None:
self.first_seen = current_time
def add_warning(self) -> None:
"""Добавляет предупреждение и снижает репутацию"""
self.warnings += 1
self.reputation = max(0.5, self.reputation - 0.1)
def improve_reputation(self) -> None:
"""Улучшает репутацию за хорошее поведение"""
self.reputation = min(2.0, self.reputation + 0.05)
def block(self, current_time: float, duration: float) -> None:
"""Блокирует пользователя"""
self.blocked_until = current_time + duration
self.total_blocks += 1
self.reputation = max(0.5, self.reputation - 0.3)
def detect_spam_patterns(self) -> Dict[str, Any]:
"""
Умная детекция спама на основе паттернов.
Returns:
Dict с результатами анализа
"""
if len(self.message_contexts) < 3:
return {'is_spam': False, 'reason': None, 'severity': 0.0}
recent_contexts = self.message_contexts[-10:] # Последние 10 сообщений
# 1. Проверка идентичных текстовых сообщений
texts = [ctx.text for ctx in recent_contexts if ctx.text and not ctx.is_command]
if texts:
text_counts = Counter(texts)
most_common_text, count = text_counts.most_common(1)[0]
if count >= 5: # 5 одинаковых сообщений подряд
return {
'is_spam': True,
'reason': 'identical_messages',
'severity': 1.0,
'details': f"Повторяющееся сообщение: '{most_common_text[:50]}...'"
}
# 2. Проверка спама callback кнопок
callbacks = [ctx.callback_data for ctx in recent_contexts if ctx.callback_data]
if callbacks:
callback_counts = Counter(callbacks)
most_common_callback, count = callback_counts.most_common(1)[0]
if count >= 8: # 8 нажатий одной кнопки
return {
'is_spam': True,
'reason': 'callback_spam',
'severity': 0.8,
'details': f"Спам кнопки: {most_common_callback}"
}
# 3. Проверка флуда медиа
media_types = [ctx.media_type for ctx in recent_contexts if ctx.media_type]
if len(media_types) >= 7: # 7+ медиафайлов подряд
return {
'is_spam': True,
'reason': 'media_flood',
'severity': 0.6,
'details': f"Флуд медиа: {len(media_types)} файлов"
}
return {'is_spam': False, 'reason': None, 'severity': 0.0}
class SpamStatistics:
"""Глобальная статистика по спаму"""
def __init__(self):
self.users: Dict[int, UserSpamStats] = {}
self.total_blocked_requests: int = 0
self.total_warnings_issued: int = 0
def get_user(self, user_id: int) -> UserSpamStats:
"""Получает или создает статистику пользователя"""
if user_id not in self.users:
self.users[user_id] = UserSpamStats(user_id=user_id)
return self.users[user_id]
def get_top_spammers(self, limit: int = 10) -> list[tuple[int, int]]:
"""Возвращает топ спамеров"""
sorted_users = sorted(
self.users.items(),
key=lambda x: x[1].total_blocks,
reverse=True
)
return [(uid, stats.total_blocks) for uid, stats in sorted_users[:limit]]
def get_stats_summary(self) -> Dict[str, Any]:
"""Возвращает общую статистику"""
return {
'total_users': len(self.users),
'total_blocked_requests': self.total_blocked_requests,
'total_warnings': self.total_warnings_issued,
'active_blocks': sum(
1 for stats in self.users.values()
if stats.blocked_until and stats.blocked_until > time()
)
}
def cleanup(self, max_age: float = 86400.0) -> int:
"""Удаляет старую статистику (24 часа по умолчанию)"""
current_time = time()
cutoff_time = current_time - max_age
users_to_delete = [
uid for uid, stats in self.users.items()
if stats.last_seen and stats.last_seen < cutoff_time
and not stats.is_blocked(current_time)
]
for uid in users_to_delete:
del self.users[uid]
return len(users_to_delete)
# Глобальная статистика
spam_stats = SpamStatistics()
class AntiSpamMiddleware(BaseMiddleware):
"""
Умный антиспам с адаптивными лимитами.
Особенности:
- Различает типы активности (текст, форварды, команды, callback)
- Адаптивные лимиты в зависимости от типа сообщения
- Система репутации пользователей
- Умная детекция спам-паттернов
- Мягкое отношение к пересылкам и ответам
"""
def __init__(
self,
# Базовые лимиты
rate_limit_text: int = 8, # Текстовых сообщений за окно
rate_limit_forward: int = 20, # Пересылок за окно
rate_limit_callback: int = 10, # Нажатий кнопок за окно
rate_limit_media: int = 10, # Медиа за окно
time_window: float = 10.0, # Временное окно (секунды)
# Предупреждения и блокировки
warning_limit: int = 3,
block_duration: float = 120.0, # 2 минуты базовая блокировка
max_block_duration: float = 3600.0, # 1 час максимум
# Опции
whitelist_admins: bool = True,
progressive_blocking: bool = True,
enable_smart_detection: bool = True,
enable_reputation: bool = True,
log_all: bool = False
):
super().__init__()
self.rate_limit_text = rate_limit_text
self.rate_limit_forward = rate_limit_forward
self.rate_limit_callback = rate_limit_callback
self.rate_limit_media = rate_limit_media
self.time_window = time_window
self.warning_limit = warning_limit
self.block_duration = block_duration
self.max_block_duration = max_block_duration
self.whitelist_admins = whitelist_admins
self.progressive_blocking = progressive_blocking
self.enable_smart_detection = enable_smart_detection
self.enable_reputation = enable_reputation
self.log_all = log_all
def _extract_context(self, event: TelegramObject) -> MessageContext:
"""Извлекает контекст из события"""
context = MessageContext()
if isinstance(event, Message):
context.text = event.text or event.caption
context.is_forward = event.forward_date is not None
context.is_reply = event.reply_to_message is not None
context.is_command = bool(context.text and context.text.startswith('/'))
# Определяем тип медиа
if event.photo:
context.media_type = 'photo'
elif event.video:
context.media_type = 'video'
elif event.document:
context.media_type = 'document'
elif event.audio:
context.media_type = 'audio'
elif event.voice:
context.media_type = 'voice'
elif event.sticker:
context.media_type = 'sticker'
elif isinstance(event, CallbackQuery):
context.callback_data = event.data
return context
def _get_effective_rate_limit(self, user_stats: UserSpamStats, context: MessageContext) -> int:
"""Вычисляет эффективный лимит с учётом типа и репутации"""
# Базовый лимит по типу
if context.is_command:
return 999 # Команды не ограничиваем
elif context.callback_data:
base_limit = self.rate_limit_callback
elif context.is_forward:
base_limit = self.rate_limit_forward
elif context.media_type:
base_limit = self.rate_limit_media
else:
base_limit = self.rate_limit_text
# Применяем репутацию
if self.enable_reputation:
base_limit = int(base_limit * user_stats.reputation)
return max(3, base_limit) # Минимум 3 сообщения
def _calculate_block_duration(self, warnings: int) -> float:
"""Вычисляет длительность блокировки"""
if not self.progressive_blocking:
return self.block_duration
multiplier = 2 ** (warnings // self.warning_limit)
duration = self.block_duration * multiplier
return min(duration, self.max_block_duration)
@staticmethod
def _format_duration(seconds: float) -> str:
"""Форматирует длительность"""
if seconds < 60:
return f"{int(seconds)} сек"
elif seconds < 3600:
return f"{int(seconds / 60)} мин"
else:
return f"{int(seconds / 3600)} час"
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Optional[Any]:
"""Основная логика проверки"""
# Пропускаем не-сообщения и не-callback
if not isinstance(event, (Message, CallbackQuery)):
return await handler(event, data)
user_id = event.from_user.id if event.from_user else None
if user_id is None:
return await handler(event, data)
user_str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}"
# Whitelist для администраторов
if self.whitelist_admins and user_id in (settings.OWNER_ID + settings.ADMIN_ID):
if self.log_all:
logger.debug(f"Администратор {user_str} пропущен", log_type='ANTI_SPAM')
return await handler(event, data)
current_time = time()
user_stats = spam_stats.get_user(user_id)
# Проверка блокировки
if user_stats.is_blocked(current_time):
remaining = user_stats.get_remaining_block_time(current_time)
spam_stats.total_blocked_requests += 1
logger.warning(
f"Запрос от заблокированного пользователя (осталось {self._format_duration(remaining)})",
log_type='ANTI_SPAM',
user=user_str
)
block_message = (
f"🚫 Вы заблокированы за спам!\n\n"
f"⏳ Оставшееся время: {self._format_duration(remaining)}\n"
f"⚠️ Предупреждений: {user_stats.warnings}"
)
if isinstance(event, Message):
await event.answer(block_message, parse_mode="HTML")
elif isinstance(event, CallbackQuery):
await event.answer(
f"🚫 Заблокирован на {self._format_duration(remaining)}",
show_alert=True
)
return None
# Извлекаем контекст сообщения
context = self._extract_context(event)
# Очищаем старые запросы
user_stats.clean_old_requests(current_time, self.time_window)
# Умная детекция спам-паттернов
if self.enable_smart_detection:
spam_analysis = user_stats.detect_spam_patterns()
if spam_analysis['is_spam']:
user_stats.add_warning()
spam_stats.total_warnings_issued += 1
logger.warning(
f"Обнаружен спам-паттерн: {spam_analysis['reason']} - {spam_analysis['details']}",
log_type='ANTI_SPAM',
user=user_str
)
# Немедленная блокировка при явном спаме
if spam_analysis['severity'] >= 0.9:
block_duration = self._calculate_block_duration(user_stats.warnings)
user_stats.block(current_time, block_duration)
logger.error(
f"Пользователь заблокирован за спам: {spam_analysis['reason']}",
log_type='ANTI_SPAM',
user=user_str
)
block_message = (
f"🚫 Вы заблокированы за спам!\n\n"
f"⏳ Длительность: {self._format_duration(block_duration)}\n"
f"⚠️ Причина: {spam_analysis['details']}"
)
if isinstance(event, Message):
await event.answer(block_message, parse_mode="HTML")
elif isinstance(event, CallbackQuery):
await event.answer(
f"🚫 Блокировка: {spam_analysis['reason']}",
show_alert=True
)
return None
# Получаем эффективный лимит
effective_limit = self._get_effective_rate_limit(user_stats, context)
# Подсчитываем релевантные запросы
relevant_requests = 0
for req_context in user_stats.message_contexts:
if context.is_forward and req_context.is_forward:
relevant_requests += 1
elif context.callback_data and req_context.callback_data:
relevant_requests += 1
elif context.media_type and req_context.media_type:
relevant_requests += 1
elif not (req_context.is_forward or req_context.callback_data or req_context.media_type or req_context.is_command):
relevant_requests += 1
if self.log_all:
logger.debug(
f"Rate limit: {relevant_requests}/{effective_limit} (тип: {context.media_type or 'text'}, репутация: {user_stats.reputation:.2f})",
log_type='ANTI_SPAM',
user=user_str
)
# Проверка лимита
if relevant_requests >= effective_limit:
user_stats.add_warning()
spam_stats.total_warnings_issued += 1
logger.warning(
f"Превышен rate limit ({relevant_requests}/{effective_limit}). "
f"Предупреждение {user_stats.warnings}/{self.warning_limit}",
log_type='ANTI_SPAM',
user=user_str
)
# Блокировка при достижении лимита предупреждений
if user_stats.warnings >= self.warning_limit:
block_duration = self._calculate_block_duration(user_stats.warnings)
user_stats.block(current_time, block_duration)
logger.error(
f"Пользователь заблокирован на {self._format_duration(block_duration)}. "
f"Всего блокировок: {user_stats.total_blocks}",
log_type='ANTI_SPAM',
user=user_str
)
block_message = (
f"🚫 Вы заблокированы за спам!\n\n"
f"⏳ Длительность: {self._format_duration(block_duration)}\n"
f"⚠️ Причина: Превышение лимита запросов\n"
f"📊 Это блокировка #{user_stats.total_blocks}"
)
if isinstance(event, Message):
await event.answer(block_message, parse_mode="HTML")
elif isinstance(event, CallbackQuery):
await event.answer(
f"🚫 Блокировка на {self._format_duration(block_duration)}",
show_alert=True
)
return None
# Предупреждение
warning_message = (
f"⚠️ Предупреждение #{user_stats.warnings}\n\n"
f"Вы отправляете запросы слишком часто!\n"
f"Лимит: {effective_limit} запросов за {self._format_duration(self.time_window)}\n\n"
f"При {self.warning_limit} предупреждениях последует блокировка."
)
if isinstance(event, Message):
await event.answer(warning_message, parse_mode="HTML")
elif isinstance(event, CallbackQuery):
await event.answer(
f"⚠️ Предупреждение {user_stats.warnings}/{self.warning_limit}",
show_alert=True
)
return None
# Добавляем текущий запрос
user_stats.add_request(current_time, context)
# Улучшаем репутацию за нормальное поведение
if self.enable_reputation and user_stats.total_requests % 10 == 0:
user_stats.improve_reputation()
if self.log_all:
logger.debug(
f"Запрос разрешен. Всего: {user_stats.total_requests}, репутация: {user_stats.reputation:.2f}",
log_type='ANTI_SPAM',
user=user_str
)
return await handler(event, data)
# ================= УПРАВЛЕНИЕ =================
async def reset_spam_warnings(user_id: int) -> bool:
"""Сбрасывает предупреждения пользователя"""
if user_id in spam_stats.users:
spam_stats.users[user_id].warnings = 0
spam_stats.users[user_id].blocked_until = None
logger.info(f"Предупреждения сброшены для id{user_id}", log_type='ANTI_SPAM')
return True
return False
async def unblock_user(user_id: int) -> bool:
"""Разблокирует пользователя"""
if user_id in spam_stats.users:
stats = spam_stats.users[user_id]
if stats.blocked_until:
stats.blocked_until = None
stats.warnings = 0
logger.info(f"Пользователь id{user_id} разблокирован вручную", log_type='ANTI_SPAM')
return True
return False
async def get_user_spam_info(user_id: int) -> Optional[Dict[str, Any]]:
"""Получает информацию о спам-статистике пользователя"""
if user_id not in spam_stats.users:
return None
stats = spam_stats.users[user_id]
current_time = time()
return {
'user_id': user_id,
'warnings': stats.warnings,
'reputation': stats.reputation,
'is_blocked': stats.is_blocked(current_time),
'blocked_until': datetime.fromtimestamp(stats.blocked_until) if stats.blocked_until else None,
'remaining_block_time': stats.get_remaining_block_time(current_time),
'total_requests': stats.total_requests,
'total_blocks': stats.total_blocks,
'first_seen': datetime.fromtimestamp(stats.first_seen) if stats.first_seen else None,
'last_seen': datetime.fromtimestamp(stats.last_seen) if stats.last_seen else None
}