From 91bae2612fb799d18250a93f86daef78f8f1f0cc Mon Sep 17 00:00:00 2001 From: Verum Date: Mon, 23 Feb 2026 14:36:26 +0700 Subject: [PATCH] =?UTF-8?q?=D0=9E=D1=81=D0=BD=D0=BE=D0=B2=D0=BD=D1=8B?= =?UTF-8?q?=D0=B5=20=D0=BA=D0=BE=D0=BC=D0=B0=D0=BD=D0=B4=D1=8B=20=D0=B4?= =?UTF-8?q?=D0=BB=D1=8F=20=D1=83=D0=BF=D1=80=D0=B0=D0=B2=D0=BB=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D1=8F=20=D0=B1=D0=B0=D0=BD=D0=BB=D0=B8=D1=81=D1=82=D0=BE?= =?UTF-8?q?=D0=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot/handlers/commands/users/word.py | 540 ++++++++++++++++++++++++++++ 1 file changed, 540 insertions(+) create mode 100644 bot/handlers/commands/users/word.py diff --git a/bot/handlers/commands/users/word.py b/bot/handlers/commands/users/word.py new file mode 100644 index 0000000..5a5d82a --- /dev/null +++ b/bot/handlers/commands/users/word.py @@ -0,0 +1,540 @@ +""" +Обработчики команд добавления и удаления банвордов +""" +from aiogram import Router +from aiogram.filters import Command +from aiogram.types import Message + +from bot.filters.admin import IsAdmin +from configs import settings, COMMANDS +from database import get_manager +from database.models import BanWordType +from middleware.loggers import logger +from bot.utils.decorators import log_action + +__all__ = ("router",) + +router: Router = Router(name="manage_words_router") + + +# ================= ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ================= + +def parse_args(text: str, command: str, min_args: int = 1, max_args: int = 2) -> tuple[bool, str | list]: + """ + Парсит аргументы команды. + + Args: + text: Полный текст сообщения + command: Название команды + min_args: Минимальное количество аргументов + max_args: Максимальное количество аргументов + + Returns: + (success, result): result это либо список аргументов, либо текст ошибки + """ + # Убираем команду из текста + parts = text.split(maxsplit=max_args) + + if len(parts) < min_args + 1: + usage = f"/{command} <слово>" if min_args == 1 else f"/{command} <слово> <минуты>" + return False, f"❌ Использование: {usage}" + + args = parts[1:] + + # Валидация длины слова + if args and len(args[0]) < 2: + return False, "❌ Слово должно содержать минимум 2 символа" + + if args and len(args[0]) > 100: + return False, "❌ Слово слишком длинное (максимум 100 символов)" + + return True, args + + +def format_success_message(action: str, word: str, word_type: str, extra: str = "") -> str: + """Форматирует сообщение об успехе""" + emoji_map = { + 'добавлена': '✅', + 'добавлен': '✅', + 'добавлено': '✅', + 'удалена': '🗑', + 'удален': '🗑', + 'удалено': '🗑' + } + + emoji = emoji_map.get(action, '✅') + + message = f"{emoji} {word_type.capitalize()} {word} {action}" + + if extra: + message += f"\n{extra}" + + return message + + +def format_time(minutes: int) -> str: + """Форматирует время в читаемый вид""" + if minutes < 60: + return f"{minutes} мин" + elif minutes < 1440: + hours = minutes // 60 + mins = minutes % 60 + return f"{hours}ч {mins}м" if mins else f"{hours}ч" + else: + days = minutes // 1440 + hours = (minutes % 1440) // 60 + return f"{days}д {hours}ч" if hours else f"{days}д" + + +# ================= КОМАНДЫ ДОБАВЛЕНИЯ ================= + +@router.message(Command(*COMMANDS.get("addword", ["addword"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin()) +@log_action(action_name="ADD_WORD", log_args=True) +async def add_word_cmd(message: Message) -> None: + """ + Добавляет банворд-подстроку (постоянно). + + Использование: /addword <слово> + """ + success, result = parse_args(message.text, "addword", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + added = await manager.add_banword( + word=word, + word_type=BanWordType.SUBSTRING, + added_by=message.from_user.id, + reason=f"Добавлено через команду" + ) + + if added: + text = format_success_message( + "добавлена", + word, + "подстрока", + "🔍 Тип проверки: простой поиск в тексте" + ) + else: + text = f"⚠️ Подстрока {word} уже существует" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка добавления банворда: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка добавления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("addlemma", ["addlemma"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin()) +@log_action(action_name="ADD_LEMMA", log_args=True) +async def add_lemma_cmd(message: Message) -> None: + """ + Добавляет банворд-лемму (постоянно). + + Использование: /addlemma <слово> + """ + success, result = parse_args(message.text, "addlemma", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + added = await manager.add_banword( + word=word, + word_type=BanWordType.LEMMA, + added_by=message.from_user.id, + reason=f"Добавлено через команду" + ) + + if added: + text = format_success_message( + "добавлена", + word, + "лемма", + "🔤 Тип проверки: все формы слова (купить→куплю, купил, купишь...)" + ) + else: + text = f"⚠️ Лемма {word} уже существует" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка добавления леммы: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка добавления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("addpart", ["addpart"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin()) +@log_action(action_name="ADD_PART", log_args=True) +async def add_part_cmd(message: Message) -> None: + """ + Добавляет банворд-часть (постоянно). + + Использование: /addpart <комбинация> + """ + success, result = parse_args(message.text, "addpart", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + added = await manager.add_banword( + word=word, + word_type=BanWordType.PART, + added_by=message.from_user.id, + reason=f"Добавлено через команду" + ) + + if added: + text = format_success_message( + "добавлена", + word, + "часть", + "🧩 Тип проверки: поиск без пробелов (обходит \"к у п и т ь\")" + ) + else: + text = f"⚠️ Часть {word} уже существует" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка добавления части: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка добавления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("addtempword", ["addtempword"]), prefix=settings.PREFIX, ignore_case=True), + IsAdmin()) +@log_action(action_name="ADD_TEMP_WORD", log_args=True) +async def add_temp_word_cmd(message: Message) -> None: + """ + Добавляет временную банворд-подстроку. + + Использование: /addtempword <слово> <минуты> + """ + success, result = parse_args(message.text, "addtempword", min_args=2, max_args=2) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + + # Валидация минут + try: + minutes = int(result[1]) + if minutes < 1 or minutes > 10080: # Максимум неделя + await message.answer("❌ Время должно быть от 1 минуты до 10080 минут (7 дней)", parse_mode="HTML") + return + except ValueError: + await message.answer("❌ Неверный формат времени. Укажите число минут", parse_mode="HTML") + return + + manager = get_manager() + + try: + added = await manager.add_temp_banword( + word=word, + word_type=BanWordType.SUBSTRING, + minutes=minutes, + added_by=message.from_user.id + ) + + if added: + time_str = format_time(minutes) + text = format_success_message( + "добавлена", + word, + "временная подстрока", + f"⏱ Автоматически удалится через {time_str}" + ) + else: + text = f"⚠️ Временная подстрока {word} уже существует" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка добавления временного банворда: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка добавления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("addtemplemma", ["addtemplemma"]), prefix=settings.PREFIX, ignore_case=True), + IsAdmin()) +@log_action(action_name="ADD_TEMP_LEMMA", log_args=True) +async def add_temp_lemma_cmd(message: Message) -> None: + """ + Добавляет временную банворд-лемму. + + Использование: /addtemplemma <слово> <минуты> + """ + success, result = parse_args(message.text, "addtemplemma", min_args=2, max_args=2) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + + try: + minutes = int(result[1]) + if minutes < 1 or minutes > 10080: + await message.answer("❌ Время должно быть от 1 минуты до 10080 минут (7 дней)", parse_mode="HTML") + return + except ValueError: + await message.answer("❌ Неверный формат времени. Укажите число минут", parse_mode="HTML") + return + + manager = get_manager() + + try: + added = await manager.add_temp_banword( + word=word, + word_type=BanWordType.LEMMA, + minutes=minutes, + added_by=message.from_user.id + ) + + if added: + time_str = format_time(minutes) + text = format_success_message( + "добавлена", + word, + "временная лемма", + f"⏱ Автоматически удалится через {time_str}" + ) + else: + text = f"⚠️ Временная лемма {word} уже существует" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка добавления временной леммы: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка добавления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("addexcept", ["addexcept"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin()) +@log_action(action_name="ADD_EXCEPTION", log_args=True) +async def add_exception_cmd(message: Message) -> None: + """ + Добавляет исключение в whitelist. + + Использование: /addexcept <текст> + """ + success, result = parse_args(message.text, "addexcept", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + added = await manager.add_whitelist( + word=word, + added_by=message.from_user.id, + reason="Добавлено через команду" + ) + + if added: + text = format_success_message( + "добавлено", + word, + "исключение", + "✅ Сообщения с этим текстом не будут проверяться" + ) + else: + text = f"⚠️ Исключение {word} уже существует" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка добавления исключения: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка добавления\n\nПопробуйте позже", parse_mode="HTML") + + +# ================= КОМАНДЫ УДАЛЕНИЯ ================= + +@router.message(Command(*COMMANDS.get("remword", ["remword"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin()) +@log_action(action_name="REMOVE_WORD", log_args=True) +async def remove_word_cmd(message: Message) -> None: + """ + Удаляет банворд-подстроку. + + Использование: /remword <слово> + """ + success, result = parse_args(message.text, "remword", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + removed = await manager.remove_banword(word=word, word_type=BanWordType.SUBSTRING) + + if removed: + text = format_success_message("удалена", word, "подстрока") + else: + text = f"⚠️ Подстрока {word} не найдена" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка удаления банворда: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка удаления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("remlemma", ["remlemma"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin()) +@log_action(action_name="REMOVE_LEMMA", log_args=True) +async def remove_lemma_cmd(message: Message) -> None: + """Удаляет банворд-лемму""" + success, result = parse_args(message.text, "remlemma", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + removed = await manager.remove_banword(word=word, word_type=BanWordType.LEMMA) + + if removed: + text = format_success_message("удалена", word, "лемма") + else: + text = f"⚠️ Лемма {word} не найдена" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка удаления леммы: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка удаления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("rempart", ["rempart"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin()) +@log_action(action_name="REMOVE_PART", log_args=True) +async def remove_part_cmd(message: Message) -> None: + """Удаляет банворд-часть""" + success, result = parse_args(message.text, "rempart", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + removed = await manager.remove_banword(word=word, word_type=BanWordType.PART) + + if removed: + text = format_success_message("удалена", word, "часть") + else: + text = f"⚠️ Часть {word} не найдена" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка удаления части: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка удаления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("remtempword", ["remtempword"]), prefix=settings.PREFIX, ignore_case=True), + IsAdmin()) +@log_action(action_name="REMOVE_TEMP_WORD", log_args=True) +async def remove_temp_word_cmd(message: Message) -> None: + """Удаляет временную подстроку""" + success, result = parse_args(message.text, "remtempword", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + removed = await manager.remove_temp_banword(word=word, word_type=BanWordType.SUBSTRING) + + if removed: + text = format_success_message("удалена", word, "временная подстрока") + else: + text = f"⚠️ Временная подстрока {word} не найдена" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка удаления временного банворда: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка удаления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("remtemplemma", ["remtemplemma"]), prefix=settings.PREFIX, ignore_case=True), + IsAdmin()) +@log_action(action_name="REMOVE_TEMP_LEMMA", log_args=True) +async def remove_temp_lemma_cmd(message: Message) -> None: + """Удаляет временную лемму""" + success, result = parse_args(message.text, "remtemplemma", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + removed = await manager.remove_temp_banword(word=word, word_type=BanWordType.LEMMA) + + if removed: + text = format_success_message("удалена", word, "временная лемма") + else: + text = f"⚠️ Временная лемма {word} не найдена" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка удаления временной леммы: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка удаления\n\nПопробуйте позже", parse_mode="HTML") + + +@router.message(Command(*COMMANDS.get("remexcept", ["remexcept"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin()) +@log_action(action_name="REMOVE_EXCEPTION", log_args=True) +async def remove_exception_cmd(message: Message) -> None: + """Удаляет исключение из whitelist""" + success, result = parse_args(message.text, "remexcept", min_args=1, max_args=1) + + if not success: + await message.answer(result, parse_mode="HTML") + return + + word = result[0].lower().strip() + manager = get_manager() + + try: + removed = await manager.remove_whitelist(word=word) + + if removed: + text = format_success_message("удалено", word, "исключение") + else: + text = f"⚠️ Исключение {word} не найдено" + + await message.answer(text, parse_mode="HTML") + + except Exception as e: + logger.error(f"Ошибка удаления исключения: {e}", log_type="CMD", exc_info=True) + await message.answer("❌ Ошибка удаления\n\nПопробуйте позже", parse_mode="HTML")