Files
PrimoGuardBot-/bot/handlers/commands/users/word.py

541 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Обработчики команд добавления и удаления банвордов
"""
from aiogram import Router
from aiogram.filters import Command
from aiogram.types import Message
from bot.filters.admin import IsAdmin
from configs import settings, COMMANDS
from database import get_manager
from database.models import BanWordType
from middleware.loggers import logger
from bot.utils.decorators import log_action
__all__ = ("router",)
router: Router = Router(name="manage_words_router")
# ================= ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ =================
def parse_args(text: str, command: str, min_args: int = 1, max_args: int = 2) -> tuple[bool, str | list]:
"""
Парсит аргументы команды.
Args:
text: Полный текст сообщения
command: Название команды
min_args: Минимальное количество аргументов
max_args: Максимальное количество аргументов
Returns:
(success, result): result это либо список аргументов, либо текст ошибки
"""
# Убираем команду из текста
parts = text.split(maxsplit=max_args)
if len(parts) < min_args + 1:
usage = f"/{command} &lt;слово&gt;" if min_args == 1 else f"/{command} &lt;слово&gt; &lt;минуты&gt;"
return False, f"❌ Использование: <code>{usage}</code>"
args = parts[1:]
# Валидация длины слова
if args and len(args[0]) < 2:
return False, "❌ Слово должно содержать минимум 2 символа"
if args and len(args[0]) > 100:
return False, "❌ Слово слишком длинное (максимум 100 символов)"
return True, args
def format_success_message(action: str, word: str, word_type: str, extra: str = "") -> str:
"""Форматирует сообщение об успехе"""
emoji_map = {
'добавлена': '',
'добавлен': '',
'добавлено': '',
'удалена': '🗑',
'удален': '🗑',
'удалено': '🗑'
}
emoji = emoji_map.get(action, '')
message = f"{emoji} <b>{word_type.capitalize()}</b> <code>{word}</code> {action}"
if extra:
message += f"\n{extra}"
return message
def format_time(minutes: int) -> str:
"""Форматирует время в читаемый вид"""
if minutes < 60:
return f"{minutes} мин"
elif minutes < 1440:
hours = minutes // 60
mins = minutes % 60
return f"{hours}ч {mins}м" if mins else f"{hours}ч"
else:
days = minutes // 1440
hours = (minutes % 1440) // 60
return f"{days}д {hours}ч" if hours else f"{days}д"
# ================= КОМАНДЫ ДОБАВЛЕНИЯ =================
@router.message(Command(*COMMANDS.get("addword", ["addword"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="ADD_WORD", log_args=True)
async def add_word_cmd(message: Message) -> None:
"""
Добавляет банворд-подстроку (постоянно).
Использование: /addword <слово>
"""
success, result = parse_args(message.text, "addword", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
added = await manager.add_banword(
word=word,
word_type=BanWordType.SUBSTRING,
added_by=message.from_user.id,
reason=f"Добавлено через команду"
)
if added:
text = format_success_message(
"добавлена",
word,
"подстрока",
"🔍 Тип проверки: простой поиск в тексте"
)
else:
text = f"⚠️ Подстрока <code>{word}</code> уже существует"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка добавления банворда: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("addlemma", ["addlemma"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="ADD_LEMMA", log_args=True)
async def add_lemma_cmd(message: Message) -> None:
"""
Добавляет банворд-лемму (постоянно).
Использование: /addlemma <слово>
"""
success, result = parse_args(message.text, "addlemma", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
added = await manager.add_banword(
word=word,
word_type=BanWordType.LEMMA,
added_by=message.from_user.id,
reason=f"Добавлено через команду"
)
if added:
text = format_success_message(
"добавлена",
word,
"лемма",
"🔤 Тип проверки: все формы слова (купить→куплю, купил, купишь...)"
)
else:
text = f"⚠️ Лемма <code>{word}</code> уже существует"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка добавления леммы: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("addpart", ["addpart"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="ADD_PART", log_args=True)
async def add_part_cmd(message: Message) -> None:
"""
Добавляет банворд-часть (постоянно).
Использование: /addpart <комбинация>
"""
success, result = parse_args(message.text, "addpart", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
added = await manager.add_banword(
word=word,
word_type=BanWordType.PART,
added_by=message.from_user.id,
reason=f"Добавлено через команду"
)
if added:
text = format_success_message(
"добавлена",
word,
"часть",
"🧩 Тип проверки: поиск без пробелов (обходит \"к у п и т ь\")"
)
else:
text = f"⚠️ Часть <code>{word}</code> уже существует"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка добавления части: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("addtempword", ["addtempword"]), prefix=settings.PREFIX, ignore_case=True),
IsAdmin())
@log_action(action_name="ADD_TEMP_WORD", log_args=True)
async def add_temp_word_cmd(message: Message) -> None:
"""
Добавляет временную банворд-подстроку.
Использование: /addtempword <слово> <минуты>
"""
success, result = parse_args(message.text, "addtempword", min_args=2, max_args=2)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
# Валидация минут
try:
minutes = int(result[1])
if minutes < 1 or minutes > 10080: # Максимум неделя
await message.answer("❌ Время должно быть от 1 минуты до 10080 минут (7 дней)", parse_mode="HTML")
return
except ValueError:
await message.answer("❌ Неверный формат времени. Укажите число минут", parse_mode="HTML")
return
manager = get_manager()
try:
added = await manager.add_temp_banword(
word=word,
word_type=BanWordType.SUBSTRING,
minutes=minutes,
added_by=message.from_user.id
)
if added:
time_str = format_time(minutes)
text = format_success_message(
"добавлена",
word,
"временная подстрока",
f"⏱ Автоматически удалится через {time_str}"
)
else:
text = f"⚠️ Временная подстрока <code>{word}</code> уже существует"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка добавления временного банворда: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("addtemplemma", ["addtemplemma"]), prefix=settings.PREFIX, ignore_case=True),
IsAdmin())
@log_action(action_name="ADD_TEMP_LEMMA", log_args=True)
async def add_temp_lemma_cmd(message: Message) -> None:
"""
Добавляет временную банворд-лемму.
Использование: /addtemplemma <слово> <минуты>
"""
success, result = parse_args(message.text, "addtemplemma", min_args=2, max_args=2)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
try:
minutes = int(result[1])
if minutes < 1 or minutes > 10080:
await message.answer("❌ Время должно быть от 1 минуты до 10080 минут (7 дней)", parse_mode="HTML")
return
except ValueError:
await message.answer("❌ Неверный формат времени. Укажите число минут", parse_mode="HTML")
return
manager = get_manager()
try:
added = await manager.add_temp_banword(
word=word,
word_type=BanWordType.LEMMA,
minutes=minutes,
added_by=message.from_user.id
)
if added:
time_str = format_time(minutes)
text = format_success_message(
"добавлена",
word,
"временная лемма",
f"⏱ Автоматически удалится через {time_str}"
)
else:
text = f"⚠️ Временная лемма <code>{word}</code> уже существует"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка добавления временной леммы: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("addexcept", ["addexcept"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="ADD_EXCEPTION", log_args=True)
async def add_exception_cmd(message: Message) -> None:
"""
Добавляет исключение в whitelist.
Использование: /addexcept <текст>
"""
success, result = parse_args(message.text, "addexcept", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
added = await manager.add_whitelist(
word=word,
added_by=message.from_user.id,
reason="Добавлено через команду"
)
if added:
text = format_success_message(
"добавлено",
word,
"исключение",
"✅ Сообщения с этим текстом не будут проверяться"
)
else:
text = f"⚠️ Исключение <code>{word}</code> уже существует"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка добавления исключения: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
# ================= КОМАНДЫ УДАЛЕНИЯ =================
@router.message(Command(*COMMANDS.get("remword", ["remword"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="REMOVE_WORD", log_args=True)
async def remove_word_cmd(message: Message) -> None:
"""
Удаляет банворд-подстроку.
Использование: /remword <слово>
"""
success, result = parse_args(message.text, "remword", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
removed = await manager.remove_banword(word=word, word_type=BanWordType.SUBSTRING)
if removed:
text = format_success_message("удалена", word, "подстрока")
else:
text = f"⚠️ Подстрока <code>{word}</code> не найдена"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка удаления банворда: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("remlemma", ["remlemma"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="REMOVE_LEMMA", log_args=True)
async def remove_lemma_cmd(message: Message) -> None:
"""Удаляет банворд-лемму"""
success, result = parse_args(message.text, "remlemma", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
removed = await manager.remove_banword(word=word, word_type=BanWordType.LEMMA)
if removed:
text = format_success_message("удалена", word, "лемма")
else:
text = f"⚠️ Лемма <code>{word}</code> не найдена"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка удаления леммы: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("rempart", ["rempart"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="REMOVE_PART", log_args=True)
async def remove_part_cmd(message: Message) -> None:
"""Удаляет банворд-часть"""
success, result = parse_args(message.text, "rempart", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
removed = await manager.remove_banword(word=word, word_type=BanWordType.PART)
if removed:
text = format_success_message("удалена", word, "часть")
else:
text = f"⚠️ Часть <code>{word}</code> не найдена"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка удаления части: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("remtempword", ["remtempword"]), prefix=settings.PREFIX, ignore_case=True),
IsAdmin())
@log_action(action_name="REMOVE_TEMP_WORD", log_args=True)
async def remove_temp_word_cmd(message: Message) -> None:
"""Удаляет временную подстроку"""
success, result = parse_args(message.text, "remtempword", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
removed = await manager.remove_temp_banword(word=word, word_type=BanWordType.SUBSTRING)
if removed:
text = format_success_message("удалена", word, "временная подстрока")
else:
text = f"⚠️ Временная подстрока <code>{word}</code> не найдена"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка удаления временного банворда: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("remtemplemma", ["remtemplemma"]), prefix=settings.PREFIX, ignore_case=True),
IsAdmin())
@log_action(action_name="REMOVE_TEMP_LEMMA", log_args=True)
async def remove_temp_lemma_cmd(message: Message) -> None:
"""Удаляет временную лемму"""
success, result = parse_args(message.text, "remtemplemma", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
removed = await manager.remove_temp_banword(word=word, word_type=BanWordType.LEMMA)
if removed:
text = format_success_message("удалена", word, "временная лемма")
else:
text = f"⚠️ Временная лемма <code>{word}</code> не найдена"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка удаления временной леммы: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
@router.message(Command(*COMMANDS.get("remexcept", ["remexcept"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="REMOVE_EXCEPTION", log_args=True)
async def remove_exception_cmd(message: Message) -> None:
"""Удаляет исключение из whitelist"""
success, result = parse_args(message.text, "remexcept", min_args=1, max_args=1)
if not success:
await message.answer(result, parse_mode="HTML")
return
word = result[0].lower().strip()
manager = get_manager()
try:
removed = await manager.remove_whitelist(word=word)
if removed:
text = format_success_message("удалено", word, "исключение")
else:
text = f"⚠️ Исключение <code>{word}</code> не найдено"
await message.answer(text, parse_mode="HTML")
except Exception as e:
logger.error(f"Ошибка удаления исключения: {e}", log_type="CMD", exc_info=True)
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")