259 lines
12 KiB
Python
259 lines
12 KiB
Python
from aiogram import Router
|
||
from aiogram.filters import Command
|
||
from aiogram.fsm.context import FSMContext
|
||
from aiogram.types import Message, User
|
||
from html import escape
|
||
|
||
from bot.filters import IsAdmin
|
||
from bot.utils import status_clear
|
||
from configs import COMMANDS
|
||
from database import db
|
||
|
||
# Настройки роутера
|
||
__all__ = ("router",)
|
||
|
||
from middleware import logger
|
||
|
||
CMD: str = "ban"
|
||
router: Router = Router(name=f"{CMD}_cmd_router")
|
||
|
||
|
||
@router.message(Command(*COMMANDS[CMD], ignore_case=True), IsAdmin())
|
||
async def ban_user_cmd(message: Message, state: FSMContext) -> None:
|
||
"""
|
||
Команда /ban для блокировки пользователей.
|
||
Использование: /ban <user_id> или ответ на сообщение пользователя + /ban
|
||
"""
|
||
await status_clear(message=message, state=state)
|
||
|
||
try:
|
||
# Проверяем есть ли ответ на сообщение
|
||
if message.reply_to_message:
|
||
# Бан по ответу на сообщение
|
||
target_user: User | None = message.reply_to_message.from_user
|
||
if not target_user:
|
||
await message.answer("❌ Не удалось определить пользователя")
|
||
return
|
||
|
||
target_user_id: int = target_user.id
|
||
target_username: str = target_user.username or target_user.full_name or f"ID{target_user_id}"
|
||
|
||
# Проверяем, не пытаемся ли забанить бота
|
||
if target_user_id == message.bot.id:
|
||
await message.answer("❌ Нельзя заблокировать бота!")
|
||
return
|
||
|
||
# Баним пользователя
|
||
success: bool = await _ban_user(target_user_id, target_username, message)
|
||
|
||
if success:
|
||
safe_username: str = escape(target_username)
|
||
response_text = f"✅ Пользователь {safe_username} (ID: {target_user_id}) заблокирован!"
|
||
|
||
# Пытаемся забанить в чате (если команда вызвана в группе/чате)
|
||
if message.chat.type in ["group", "supergroup"]:
|
||
try:
|
||
await message.bot.ban_chat_member(
|
||
chat_id=message.chat.id,
|
||
user_id=target_user_id
|
||
)
|
||
response_text += "\n🚫 Пользователь исключен из чата."
|
||
except Exception as e:
|
||
logger.warning(f"Не удалось исключить пользователя из чата: {e}")
|
||
response_text += "\n⚠️ Не удалось исключить пользователя из чата."
|
||
|
||
await message.answer(
|
||
text=response_text,
|
||
parse_mode=None # Отключаем разметку
|
||
)
|
||
else:
|
||
await message.answer("❌ Не удалось заблокировать пользователя")
|
||
|
||
else:
|
||
# Бан по ID пользователя
|
||
command_parts: list[str] = message.text.split()
|
||
if len(command_parts) < 2:
|
||
await message.answer(
|
||
"ℹ️ Использование команды:\n"
|
||
"• Ответьте на сообщение пользователя командой /ban\n"
|
||
"• Или укажите ID: /ban <user_id>"
|
||
)
|
||
return
|
||
|
||
try:
|
||
target_user_id: int = int(command_parts[1])
|
||
|
||
# Проверяем, не пытаемся ли забанить бота
|
||
if target_user_id == message.bot.id:
|
||
await message.answer("❌ Нельзя заблокировать бота!")
|
||
return
|
||
|
||
success: bool = await _ban_user(target_user_id, f"ID{target_user_id}", message)
|
||
|
||
if success:
|
||
response_text = f"✅ Пользователь (ID: {target_user_id}) заблокирован!"
|
||
|
||
# Пытаемся забанить в чате
|
||
if message.chat.type in ["group", "supergroup"]:
|
||
try:
|
||
await message.bot.ban_chat_member(
|
||
chat_id=message.chat.id,
|
||
user_id=target_user_id
|
||
)
|
||
response_text += "\n🚫 Пользователь исключен из чата."
|
||
except Exception as e:
|
||
logger.warning(f"Не удалось исключить пользователя из чата: {e}")
|
||
response_text += "\n⚠️ Не удалось исключить пользователя из чата."
|
||
|
||
await message.answer(
|
||
text=response_text,
|
||
parse_mode=None
|
||
)
|
||
else:
|
||
await message.answer("❌ Пользователь не найден или уже заблокирован")
|
||
|
||
except ValueError:
|
||
await message.answer("❌ Неверный формат ID пользователя")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка в команде /ban: {e}")
|
||
await message.answer(
|
||
"⚠️ Произошла непредвиденная ошибка при выполнении команды.\n"
|
||
"Попробуйте повторить действие позже или нажмите /start"
|
||
)
|
||
|
||
|
||
async def _ban_user(user_id: int, username: str, message: Message) -> bool:
|
||
"""
|
||
Внутренняя функция для блокировки пользователя.
|
||
"""
|
||
try:
|
||
# Сначала проверяем существует ли пользователь
|
||
user: User | None = await db.get_user(user_id)
|
||
|
||
if not user:
|
||
# Если пользователя нет - создаем его забаненным
|
||
await db.add_user(
|
||
user_id=user_id,
|
||
username=username,
|
||
full_name=username
|
||
)
|
||
|
||
# Баним пользователя
|
||
await db.ban_user(user_id)
|
||
|
||
# Логируем действие
|
||
admin_username = message.from_user.username or message.from_user.full_name or f"ID{message.from_user.id}"
|
||
logger.info(f"🛑 Админ @{admin_username} заблокировал пользователя @{username} (ID: {user_id})")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при блокировке пользователя {user_id}: {e}")
|
||
return False
|
||
|
||
|
||
@router.message(Command("unban", ignore_case=True), IsAdmin())
|
||
async def unban_user_cmd(message: Message, state: FSMContext) -> None:
|
||
"""
|
||
Команда /unban для разблокировки пользователей.
|
||
"""
|
||
await status_clear(message=message, state=state)
|
||
|
||
try:
|
||
if message.reply_to_message:
|
||
target_user: User | None = message.reply_to_message.from_user
|
||
if not target_user:
|
||
await message.answer("❌ Не удалось определить пользователя")
|
||
return
|
||
|
||
target_user_id: int = target_user.id
|
||
target_username: str = target_user.username or target_user.full_name or f"ID{target_user_id}"
|
||
else:
|
||
command_parts: list[str] = message.text.split()
|
||
if len(command_parts) < 2:
|
||
await message.answer(
|
||
"ℹ️ Использование команды:\n"
|
||
"• Ответьте на сообщение пользователя командой /unban\n"
|
||
"• Или укажите ID: /unban <user_id>"
|
||
)
|
||
return
|
||
|
||
try:
|
||
target_user_id: int = int(command_parts[1])
|
||
target_username: str = f"ID{target_user_id}"
|
||
except ValueError:
|
||
await message.answer("❌ Неверный формат ID пользователя")
|
||
return
|
||
|
||
# Разбаниваем пользователя
|
||
await db.unban_user(target_user_id)
|
||
|
||
# Логируем действие
|
||
admin_username: str = message.from_user.username or message.from_user.full_name or f"ID{message.from_user.id}"
|
||
logger.info(f"🔓 Админ @{admin_username} разблокировал пользователя @{target_username} (ID: {target_user_id})")
|
||
|
||
# Экранируем специальные символы
|
||
safe_username: str = escape(target_username)
|
||
|
||
response_text = f"✅ Пользователь {safe_username} (ID: {target_user_id}) разблокирован!"
|
||
|
||
# Пытаемся разбанить в чате
|
||
if message.chat.type in ["group", "supergroup"]:
|
||
try:
|
||
await message.bot.unban_chat_member(
|
||
chat_id=message.chat.id,
|
||
user_id=target_user_id
|
||
)
|
||
response_text += "\n👥 Пользователь может вернуться в чат."
|
||
except Exception as e:
|
||
logger.warning(f"Не удалось разблокировать пользователя в чате: {e}")
|
||
|
||
await message.answer(
|
||
text=response_text,
|
||
parse_mode=None
|
||
)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при разблокировке пользователя: {e}")
|
||
await message.answer("❌ Не удалось разблокировать пользователя")
|
||
|
||
|
||
@router.message(Command("banned_list", ignore_case=True), IsAdmin())
|
||
async def banned_list_cmd(message: Message, state: FSMContext) -> None:
|
||
"""
|
||
Команда /banned_list для просмотра списка забаненных пользователей.
|
||
"""
|
||
await status_clear(message=message, state=state)
|
||
|
||
try:
|
||
# Получаем всех пользователей включая забаненных
|
||
all_users: list[User] = await db.get_all_users(include_banned=True)
|
||
|
||
# Фильтруем только забаненных
|
||
banned_users: list[User] = [user for user in all_users if getattr(user, 'status', None) == "banned"]
|
||
|
||
if not banned_users:
|
||
await message.answer("📭 Список забаненных пользователей пуст")
|
||
return
|
||
|
||
# Формируем сообщение со списком
|
||
banned_list: str = "🚫 Заблокированные пользователи:\n\n"
|
||
|
||
for user in banned_users[:50]: # Ограничиваем вывод
|
||
username: str = f"@{user.username}" if getattr(user, 'username', None) else getattr(user, 'full_name',
|
||
'Неизвестно')
|
||
# Экранируем специальные символы
|
||
safe_username = escape(username)
|
||
user_id = getattr(user, 'id', 'N/A')
|
||
banned_list += f"• {safe_username} (ID: {user_id})\n"
|
||
|
||
if len(banned_users) > 50:
|
||
banned_list += f"\n... и еще {len(banned_users) - 50} пользователей"
|
||
|
||
await message.answer(banned_list, parse_mode=None)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при получении списка забаненных: {e}")
|
||
await message.answer("❌ Не удалось получить список забаненных пользователей")
|