diff --git a/bot/handlers/commands/admins/ban_cmd.py b/bot/handlers/commands/admins/ban_cmd.py new file mode 100644 index 0000000..9e5f369 --- /dev/null +++ b/bot/handlers/commands/admins/ban_cmd.py @@ -0,0 +1,258 @@ +from aiogram import Router +from aiogram.filters import Command +from aiogram.fsm.context import FSMContext +from aiogram.types import Message, User +from html import escape + +from bot.filters import IsAdmin +from bot.utils import status_clear +from configs import COMMANDS +from database import db + +# Настройки роутера +__all__ = ("router",) + +from middleware import logger + +CMD: str = "ban" +router: Router = Router(name=f"{CMD}_cmd_router") + + +@router.message(Command(*COMMANDS[CMD], ignore_case=True), IsAdmin()) +async def ban_user_cmd(message: Message, state: FSMContext) -> None: + """ + Команда /ban для блокировки пользователей. + Использование: /ban или ответ на сообщение пользователя + /ban + """ + await status_clear(update=message, state=state) + + try: + # Проверяем есть ли ответ на сообщение + if message.reply_to_message: + # Бан по ответу на сообщение + target_user: User | None = message.reply_to_message.from_user + if not target_user: + await message.answer("❌ Не удалось определить пользователя") + return + + target_user_id: int = target_user.id + target_username: str = target_user.username or target_user.full_name or f"ID{target_user_id}" + + # Проверяем, не пытаемся ли забанить бота + if target_user_id == message.bot.id: + await message.answer("❌ Нельзя заблокировать бота!") + return + + # Баним пользователя + success: bool = await _ban_user(target_user_id, target_username, message) + + if success: + safe_username: str = escape(target_username) + response_text = f"✅ Пользователь {safe_username} (ID: {target_user_id}) заблокирован!" + + # Пытаемся забанить в чате (если команда вызвана в группе/чате) + if message.chat.type in ["group", "supergroup"]: + try: + await message.bot.ban_chat_member( + chat_id=message.chat.id, + user_id=target_user_id + ) + response_text += "\n🚫 Пользователь исключен из чата." + except Exception as e: + logger.warning(f"Не удалось исключить пользователя из чата: {e}") + response_text += "\n⚠️ Не удалось исключить пользователя из чата." + + await message.answer( + text=response_text, + parse_mode=None # Отключаем разметку + ) + else: + await message.answer("❌ Не удалось заблокировать пользователя") + + else: + # Бан по ID пользователя + command_parts: list[str] = message.text.split() + if len(command_parts) < 2: + await message.answer( + "ℹ️ Использование команды:\n" + "• Ответьте на сообщение пользователя командой /ban\n" + "• Или укажите ID: /ban " + ) + return + + try: + target_user_id: int = int(command_parts[1]) + + # Проверяем, не пытаемся ли забанить бота + if target_user_id == message.bot.id: + await message.answer("❌ Нельзя заблокировать бота!") + return + + success: bool = await _ban_user(target_user_id, f"ID{target_user_id}", message) + + if success: + response_text = f"✅ Пользователь (ID: {target_user_id}) заблокирован!" + + # Пытаемся забанить в чате + if message.chat.type in ["group", "supergroup"]: + try: + await message.bot.ban_chat_member( + chat_id=message.chat.id, + user_id=target_user_id + ) + response_text += "\n🚫 Пользователь исключен из чата." + except Exception as e: + logger.warning(f"Не удалось исключить пользователя из чата: {e}") + response_text += "\n⚠️ Не удалось исключить пользователя из чата." + + await message.answer( + text=response_text, + parse_mode=None + ) + else: + await message.answer("❌ Пользователь не найден или уже заблокирован") + + except ValueError: + await message.answer("❌ Неверный формат ID пользователя") + + except Exception as e: + logger.error(f"Ошибка в команде /ban: {e}") + await message.answer( + "⚠️ Произошла непредвиденная ошибка при выполнении команды.\n" + "Попробуйте повторить действие позже или нажмите /start" + ) + + +async def _ban_user(user_id: int, username: str, message: Message) -> bool: + """ + Внутренняя функция для блокировки пользователя. + """ + try: + # Сначала проверяем существует ли пользователь + user: User | None = await db.get_user(user_id) + + if not user: + # Если пользователя нет - создаем его забаненным + await db.add_user( + user_id=user_id, + username=username, + full_name=username + ) + + # Баним пользователя + await db.ban_user(user_id) + + # Логируем действие + admin_username = message.from_user.username or message.from_user.full_name or f"ID{message.from_user.id}" + logger.info(f"🛑 Админ @{admin_username} заблокировал пользователя @{username} (ID: {user_id})") + + return True + + except Exception as e: + logger.error(f"❌ Ошибка при блокировке пользователя {user_id}: {e}") + return False + + +@router.message(Command("unban", ignore_case=True), IsAdmin()) +async def unban_user_cmd(message: Message, state: FSMContext) -> None: + """ + Команда /unban для разблокировки пользователей. + """ + await status_clear(update=message, state=state) + + try: + if message.reply_to_message: + target_user: User | None = message.reply_to_message.from_user + if not target_user: + await message.answer("❌ Не удалось определить пользователя") + return + + target_user_id: int = target_user.id + target_username: str = target_user.username or target_user.full_name or f"ID{target_user_id}" + else: + command_parts: list[str] = message.text.split() + if len(command_parts) < 2: + await message.answer( + "ℹ️ Использование команды:\n" + "• Ответьте на сообщение пользователя командой /unban\n" + "• Или укажите ID: /unban " + ) + return + + try: + target_user_id: int = int(command_parts[1]) + target_username: str = f"ID{target_user_id}" + except ValueError: + await message.answer("❌ Неверный формат ID пользователя") + return + + # Разбаниваем пользователя + await db.unban_user(target_user_id) + + # Логируем действие + admin_username: str = message.from_user.username or message.from_user.full_name or f"ID{message.from_user.id}" + logger.info(f"🔓 Админ @{admin_username} разблокировал пользователя @{target_username} (ID: {target_user_id})") + + # Экранируем специальные символы + safe_username: str = escape(target_username) + + response_text = f"✅ Пользователь {safe_username} (ID: {target_user_id}) разблокирован!" + + # Пытаемся разбанить в чате + if message.chat.type in ["group", "supergroup"]: + try: + await message.bot.unban_chat_member( + chat_id=message.chat.id, + user_id=target_user_id + ) + response_text += "\n👥 Пользователь может вернуться в чат." + except Exception as e: + logger.warning(f"Не удалось разблокировать пользователя в чате: {e}") + + await message.answer( + text=response_text, + parse_mode=None + ) + + except Exception as e: + logger.error(f"❌ Ошибка при разблокировке пользователя: {e}") + await message.answer("❌ Не удалось разблокировать пользователя") + + +@router.message(Command("banned_list", ignore_case=True), IsAdmin()) +async def banned_list_cmd(message: Message, state: FSMContext) -> None: + """ + Команда /banned_list для просмотра списка забаненных пользователей. + """ + await status_clear(update=message, state=state) + + try: + # Получаем всех пользователей включая забаненных + all_users: list[User] = await db.get_all_users(include_banned=True) + + # Фильтруем только забаненных + banned_users: list[User] = [user for user in all_users if getattr(user, 'status', None) == "banned"] + + if not banned_users: + await message.answer("📭 Список забаненных пользователей пуст") + return + + # Формируем сообщение со списком + banned_list: str = "🚫 Заблокированные пользователи:\n\n" + + for user in banned_users[:50]: # Ограничиваем вывод + username: str = f"@{user.username}" if getattr(user, 'username', None) else getattr(user, 'full_name', + 'Неизвестно') + # Экранируем специальные символы + safe_username = escape(username) + user_id = getattr(user, 'id', 'N/A') + banned_list += f"• {safe_username} (ID: {user_id})\n" + + if len(banned_users) > 50: + banned_list += f"\n... и еще {len(banned_users) - 50} пользователей" + + await message.answer(banned_list, parse_mode=None) + + except Exception as e: + logger.error(f"❌ Ошибка при получении списка забаненных: {e}") + await message.answer("❌ Не удалось получить список забаненных пользователей")