"""
Обработчики команды статистики
"""
from datetime import datetime
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.types import Message, CallbackQuery, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from bot.filters.admin import IsAdmin
from configs import settings, COMMANDS
from database import get_manager
from middleware.loggers import logger
from bot.utils.decorators import log_action
__all__ = ("router",)
router: Router = Router(name="stats_router")
# ================= ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ =================
def format_number(num: int) -> str:
"""Форматирует большие числа с разделителями"""
return f"{num:,}".replace(",", " ")
def create_text_bar(value: int, max_value: int, length: int = 10) -> str:
"""Создает текстовую полоску прогресса"""
if max_value == 0:
return "░" * length
filled = int((value / max_value) * length)
filled = max(0, min(filled, length))
empty = length - filled
return "█" * filled + "░" * empty
def format_datetime(dt: datetime) -> str:
"""Форматирует datetime в читабельный формат"""
return dt.strftime("%d.%m.%Y %H:%M")
def format_time_remaining(minutes: int) -> str:
"""
Форматирует оставшееся время в читабельный формат.
Args:
minutes: Количество минут
Returns:
Отформатированная строка времени
"""
if minutes <= 0:
return "истёк"
elif minutes < 60:
return f"{minutes} мин"
elif minutes < 1440: # < 24 часов
hours = minutes // 60
mins = minutes % 60
if mins > 0:
return f"{hours}ч {mins}м"
return f"{hours}ч"
else: # >= 24 часов
days = minutes // 1440
hours = (minutes % 1440) // 60
if hours > 0:
return f"{days}д {hours}ч"
return f"{days}д"
def get_stats_keyboard():
"""Клавиатура для статистики"""
ikb = InlineKeyboardBuilder()
ikb.button(text="🔄 Обновить", callback_data="stats:refresh")
ikb.button(text="📊 Детали", callback_data="stats:details")
ikb.button(text="🏆 Топ-спамеры", callback_data="stats:top_spammers")
ikb.button(text="🔤 Топ-слова", callback_data="stats:top_words")
ikb.button(text="🚀 Назад", callback_data="start")
ikb.adjust(2, 2, 1)
return ikb.as_markup()
# ================= ОСНОВНАЯ СТАТИСТИКА =================
@router.callback_query(F.data == "stats:refresh")
@router.callback_query(F.data == "stats")
@router.message(Command(*COMMANDS.get("stats", ["stats"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="VIEW_STATS")
async def stats_cmd(update: Message | CallbackQuery) -> None:
"""
Показывает общую статистику работы бота.
Включает:
- Общее количество удалений
- Активные режимы
- Статистику банвордов
- Топ спамеров
Использование: /stats
"""
# Определяем тип update
if isinstance(update, CallbackQuery):
message = update.message
is_callback = True
else:
message = update
is_callback = False
manager = get_manager()
try:
# Получаем данные
stats = await manager.get_stats()
data = await manager.get_all_words_list()
top_spammers = await manager.get_top_spammers(limit=5)
# Проверяем активные режимы
is_silence = await manager.is_silence_active()
is_conflict = await manager.is_conflict_active()
# === ФОРМИРУЕМ ВЫВОД ===
output = "📊 СТАТИСТИКА PRIMOGUARD\n\n"
# Общая информация
total_deletions = stats.get('total_deletions', 0)
output += f"🗑 Всего удалений: {format_number(total_deletions)}\n\n"
# Активные режимы
if is_silence or is_conflict:
output += "🔴 АКТИВНЫЕ РЕЖИМЫ:\n\n"
if is_silence:
silence_until_str = await manager.repo.get_setting("silence_until")
silence_until = datetime.fromtimestamp(float(silence_until_str))
time_left_seconds = (silence_until - datetime.now()).total_seconds()
time_left_minutes = int(time_left_seconds / 60)
output += f"🔇 Режим тишины\n"
output += f"├─ ⏱ Осталось: {format_time_remaining(time_left_minutes)}\n"
output += f"└─ 🕐 До: {format_datetime(silence_until)}\n"
if is_conflict:
output += "│\n"
if is_conflict:
conflict_until_str = await manager.repo.get_setting("conflict_until")
conflict_until = datetime.fromtimestamp(float(conflict_until_str))
time_left_seconds = (conflict_until - datetime.now()).total_seconds()
time_left_minutes = int(time_left_seconds / 60)
conflict_words_count = len(data.get('conflict_substring', set()))
conflict_lemmas_count = len(data.get('conflict_lemma', set()))
total_conflict = conflict_words_count + conflict_lemmas_count
output += f"⚔️ Режим антиконфликта\n"
output += f"├─ ⏱ Осталось: {format_time_remaining(time_left_minutes)}\n"
output += f"├─ 🕐 До: {format_datetime(conflict_until)}\n"
output += f"└─ 📊 Правил: {total_conflict}\n"
output += "\n"
# Статистика правил
total_rules = (
len(data.get('substring', set())) +
len(data.get('lemma', set())) +
len(data.get('part', set())) +
len(data.get('temp_substring', set())) +
len(data.get('temp_lemma', set())) +
len(data.get('conflict_substring', set())) +
len(data.get('conflict_lemma', set()))
)
output += f"📋 Правила модерации:\n"
output += f"├─ Всего правил: {total_rules}\n"
output += f"├─ Постоянные: {len(data.get('substring', set())) + len(data.get('lemma', set())) + len(data.get('part', set()))}\n"
output += f"├─ Временные: {len(data.get('temp_substring', set())) + len(data.get('temp_lemma', set()))}\n"
output += f"├─ Конфликтные: {len(data.get('conflict_substring', set())) + len(data.get('conflict_lemma', set()))}\n"
output += f"└─ Исключения: {len(data.get('whitelist', set()))}\n\n"
# Топ-5 спамеров
if top_spammers:
output += "🏆 Топ-5 спамеров:\n"
max_count = top_spammers[0][1] if top_spammers else 1
for idx, (user_id, count) in enumerate(top_spammers, 1):
bar = create_text_bar(count, max_count, length=8)
output += f'{idx}. {user_id} — {count} [{bar}]\n'
output += "\n"
else:
output += "🏆 Топ-5 спамеров:\n"
output += "└─ Нет данных\n\n"
# Администраторы
admins_count = len(settings.OWNER_ID) + len(data.get('admins', set()))
output += f"👥 Администраторов: {admins_count}\n\n"
# Подсказка
output += "💡 Используйте кнопки для детальной информации"
# Клавиатура
keyboard = get_stats_keyboard()
# Отправка
if is_callback:
await message.edit_text(
text=output,
parse_mode="HTML",
reply_markup=keyboard
)
await update.answer("✅ Статистика обновлена")
else:
await message.answer(
text=output,
parse_mode="HTML",
reply_markup=keyboard
)
except Exception as e:
logger.error(f"Ошибка получения статистики: {e}", log_type="STATS")
error_text = "❌ Ошибка загрузки статистики\n\nПопробуйте позже"
if is_callback:
await update.answer("❌ Ошибка загрузки", show_alert=True)
else:
await message.answer(error_text, parse_mode="HTML")
# ================= ДЕТАЛЬНАЯ СТАТИСТИКА =================
@router.callback_query(F.data == "stats:details")
@log_action(action_name="VIEW_DETAILED_STATS")
async def stats_details_callback(callback: CallbackQuery) -> None:
"""Показывает детальную статистику"""
manager = get_manager()
try:
stats = await manager.get_stats()
data = await manager.get_all_words_list()
output = "📊 ДЕТАЛЬНАЯ СТАТИСТИКА\n\n"
# Подробная статистика удалений
total_deletions = stats.get('total_deletions', 0)
output += f"🗑 Удаления сообщений:\n"
output += f"├─ Всего: {format_number(total_deletions)}\n"
output += "\n"
# Активные режимы (детально)
is_silence = await manager.is_silence_active()
is_conflict = await manager.is_conflict_active()
if is_silence or is_conflict:
output += "🔴 Активные режимы:\n\n"
if is_silence:
silence_until_str = await manager.repo.get_setting("silence_until")
silence_until = datetime.fromtimestamp(float(silence_until_str))
time_left_seconds = (silence_until - datetime.now()).total_seconds()
time_left_minutes = int(time_left_seconds / 60)
output += f"🔇 Режим тишины:\n"
output += f"├─ Статус: ✅ Активен\n"
output += f"├─ Осталось: {format_time_remaining(time_left_minutes)}\n"
output += f"├─ Окончание: {format_datetime(silence_until)}\n"
output += f"└─ Эффект: Удаляются ВСЕ сообщения\n\n"
if is_conflict:
conflict_until_str = await manager.repo.get_setting("conflict_until")
conflict_until = datetime.fromtimestamp(float(conflict_until_str))
time_left_seconds = (conflict_until - datetime.now()).total_seconds()
time_left_minutes = int(time_left_seconds / 60)
conflict_words_count = len(data.get('conflict_substring', set()))
conflict_lemmas_count = len(data.get('conflict_lemma', set()))
output += f"⚔️ Режим антиконфликта:\n"
output += f"├─ Статус: ✅ Активен\n"
output += f"├─ Осталось: {format_time_remaining(time_left_minutes)}\n"
output += f"├─ Окончание: {format_datetime(conflict_until)}\n"
output += f"├─ Слов: {conflict_words_count}\n"
output += f"├─ Лемм: {conflict_lemmas_count}\n"
output += f"└─ Эффект: Обычные банворды отключены\n\n"
# Детальная статистика правил
output += f"📋 Правила модерации:\n\n"
output += f"🔴 Постоянные:\n"
output += f"├─ Подстроки: {len(data.get('substring', set()))}\n"
output += f"├─ Леммы: {len(data.get('lemma', set()))}\n"
output += f"└─ Части: {len(data.get('part', set()))}\n\n"
output += f"⏱ Временные:\n"
output += f"├─ Подстроки: {len(data.get('temp_substring', set()))}\n"
output += f"└─ Леммы: {len(data.get('temp_lemma', set()))}\n\n"
output += f"⚔️ Конфликтные:\n"
output += f"├─ Слова: {len(data.get('conflict_substring', set()))}\n"
output += f"└─ Леммы: {len(data.get('conflict_lemma', set()))}\n\n"
output += f"✅ Исключения: {len(data.get('whitelist', set()))}\n\n"
# Информация о кэше
cache_info = stats.get('cache_active', False)
cache_updated = stats.get('cache_updated_at', None)
output += f"💾 Кэш:\n"
output += f"├─ Статус: {'✅ Активен' if cache_info else '❌ Неактивен'}\n"
if cache_updated and isinstance(cache_updated, str):
try:
updated_dt = datetime.fromisoformat(cache_updated)
output += f"└─ Обновлён: {format_datetime(updated_dt)}\n"
except (ValueError, TypeError):
output += f"└─ Обновлён: недавно\n"
else:
output += f"└─ Не обновлялся\n"
# Кнопка возврата
ikb = InlineKeyboardBuilder()
ikb.button(text="◀️ Назад", callback_data="stats:refresh")
await callback.message.edit_text(
text=output,
parse_mode="HTML",
reply_markup=ikb.as_markup()
)
await callback.answer()
except Exception as e:
logger.error(f"Ошибка получения детальной статистики: {e}", log_type="STATS")
await callback.answer("❌ Ошибка загрузки", show_alert=True)
# ================= ТОП СПАМЕРОВ =================
@router.callback_query(F.data == "stats:top_spammers")
@log_action(action_name="VIEW_TOP_SPAMMERS")
async def stats_top_spammers_callback(callback: CallbackQuery) -> None:
"""Показывает топ-10 спамеров"""
manager = get_manager()
try:
top_spammers = await manager.get_top_spammers(limit=10)
output = "🏆 ТОП-10 СПАМЕРОВ\n\n"
if top_spammers:
max_count = top_spammers[0][1] if top_spammers else 1
for idx, (user_id, count) in enumerate(top_spammers, 1):
bar = create_text_bar(count, max_count, length=10)
# Эмодзи для топ-3
if idx == 1:
medal = "🥇"
elif idx == 2:
medal = "🥈"
elif idx == 3:
medal = "🥉"
else:
medal = f"{idx}."
output += f'{medal} {user_id}\n'
output += f" └─ {format_number(count)} удалений [{bar}]\n\n"
# Общая статистика
total_spammers = len(top_spammers)
total_deletions = sum(count for _, count in top_spammers)
output += f"📊 Статистика:\n"
output += f"├─ Всего пользователей: {total_spammers}\n"
output += f"└─ Всего удалений: {format_number(total_deletions)}\n\n"
output += "💡 ID можно использовать для проверки пользователя"
else:
output += "└─ Нет данных об удалениях\n\n"
output += "💡 Когда бот начнёт удалять сообщения, здесь появится статистика"
# Кнопка возврата
ikb = InlineKeyboardBuilder()
ikb.button(text="◀️ Назад", callback_data="stats:refresh")
await callback.message.edit_text(
text=output,
parse_mode="HTML",
reply_markup=ikb.as_markup()
)
await callback.answer()
except Exception as e:
logger.error(f"Ошибка получения топ спамеров: {e}", log_type="STATS")
await callback.answer("❌ Ошибка загрузки", show_alert=True)
# ================= ТОП СЛОВ =================
@router.callback_query(F.data == "stats:top_words")
async def stats_top_words_callback(callback: CallbackQuery) -> None:
"""Показывает топ-10 самых частых срабатываний"""
await callback.answer()
manager = get_manager()
# Получаем топ слов
top_words = await manager.get_top_words(limit=10)
if not top_words:
text = (
"🔤 ТОП-10 СРАБАТЫВАНИЙ ПО СЛОВАМ\n\n"
"📭 Статистика пока пуста\n\n"
"Срабатывания появятся после удаления\n"
"первых спам-сообщений."
)
else:
text = "🔤 ТОП-10 СРАБАТЫВАНИЙ ПО СЛОВАМ\n\n"
# Эмодзи для типов
type_emoji = {
"substring": "🔤",
"lemma": "📖",
"part": "🧩",
"silence": "🔇",
"conflict_substring": "⚔️",
"conflict_lemma": "⚔️"
}
for i, word_data in enumerate(top_words, 1):
word = word_data['word']
count = word_data['count']
word_type = word_data['type']
emoji = type_emoji.get(word_type, "❓")
# Медали для топ-3
medal = ""
if i == 1:
medal = "🥇 "
elif i == 2:
medal = "🥈 "
elif i == 3:
medal = "🥉 "
text += f"{medal}{i}. {emoji} {word} — {count} раз\n"
# Общая статистика
total = await manager.get_total_spam_count()
text += f"\n📊 Всего удалено: {total} сообщений"
# Кнопка назад
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="◀️ Назад", callback_data="stats:refresh")]
])
try:
await callback.message.edit_text(
text=text,
reply_markup=keyboard,
parse_mode="HTML"
)
except Exception as e:
logger.error(f"Ошибка показа топ-слов: {e}", log_type="ERROR")
await callback.answer("❌ Ошибка загрузки статистики", show_alert=True)
# ================= СТАТИСТИКА ПОЛЬЗОВАТЕЛЯ =================
@router.message(Command(*COMMANDS.get("userstats", ["userstats"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="VIEW_USER_STATS", log_args=True)
async def user_stats_cmd(message: Message) -> None:
"""
Показывает статистику конкретного пользователя.
Использование: /userstats
Пример: /userstats 123456789
"""
parts = message.text.split(maxsplit=1)
if len(parts) < 2:
await message.answer(
"❌ Использование: /userstats [ID]\n\n"
"Пример: /userstats 123456789",
parse_mode="HTML"
)
return
try:
user_id = int(parts[1].strip())
except ValueError:
await message.answer("❌ ID должен быть числом", parse_mode="HTML")
return
manager = get_manager()
try:
# Получаем статистику пользователя
user_spam_count = await manager.get_user_spam_count(user_id)
user_spam_stats = await manager.get_spam_stats(limit=10, user_id=user_id)
output = f"👤 СТАТИСТИКА ПОЛЬЗОВАТЕЛЯ\n\n"
output += f'🆔 ID: {user_id}\n\n'
if user_spam_count > 0:
output += f"🗑 Удалено сообщений: {format_number(user_spam_count)}\n\n"
if user_spam_stats:
output += f"📝 Последние удаления:\n"
for stat in user_spam_stats[:5]:
deleted_at = stat.deleted_at
matched_word = stat.matched_word or "неизвестно"
match_type = stat.match_type or "unknown"
output += f"├─ {format_datetime(deleted_at)}\n"
output += f"│ └─ Слово: {matched_word} ({match_type})\n"
output += "\n"
else:
output += "✅ Нет нарушений\n\n"
output += "Этот пользователь не нарушал правила чата"
await message.answer(output, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка получения статистики пользователя: {e}", log_type="STATS")
await message.answer("❌ Ошибка загрузки статистики", parse_mode="HTML")
# ================= СБРОС СТАТИСТИКИ =================
@router.message(Command(*COMMANDS.get("resetstats", ["resetstats"]), prefix=settings.PREFIX, ignore_case=True),
IsAdmin())
@log_action(action_name="RESET_STATS")
async def reset_stats_cmd(message: Message) -> None:
"""
Сбрасывает всю статистику удалений.
⚠️ ВНИМАНИЕ: Это действие необратимо!
Использование: /resetstats confirm
"""
parts = message.text.split(maxsplit=1)
if len(parts) < 2 or parts[1].lower() != "confirm":
await message.answer(
"⚠️ ВНИМАНИЕ!\n\n"
"Эта команда удалит ВСЮ статистику удалений:\n"
"• Счётчики удалений пользователей\n"
"• Историю удалённых сообщений\n"
"• Топ спамеров\n\n"
"Правила модерации НЕ будут удалены.\n\n"
"Для подтверждения используйте:\n"
"/resetstats confirm",
parse_mode="HTML"
)
return
manager = get_manager()
try:
# Сбрасываем статистику
deleted_count = await manager.reset_spam_stats()
if deleted_count > 0:
await message.answer(
f"✅ Статистика сброшена\n\n"
f"Удалено записей: {deleted_count}\n\n"
f"Новые данные начнут собираться\n"
f"с этого момента.",
parse_mode="HTML"
)
logger.warning(
f"Статистика сброшена пользователем {message.from_user.id}: "
f"удалено {deleted_count} записей",
log_type="STATS"
)
else:
await message.answer(
"ℹ️ Статистика уже пуста",
parse_mode="HTML"
)
except Exception as e:
logger.error(f"Ошибка сброса статистики: {e}", log_type="STATS")
await message.answer("❌ Ошибка сброса статистики", parse_mode="HTML")