Основные команды для управления банлистом
This commit is contained in:
540
bot/handlers/commands/users/word.py
Normal file
540
bot/handlers/commands/users/word.py
Normal file
@@ -0,0 +1,540 @@
|
||||
"""
|
||||
Обработчики команд добавления и удаления банвордов
|
||||
"""
|
||||
from aiogram import Router
|
||||
from aiogram.filters import Command
|
||||
from aiogram.types import Message
|
||||
|
||||
from bot.filters.admin import IsAdmin
|
||||
from configs import settings, COMMANDS
|
||||
from database import get_manager
|
||||
from database.models import BanWordType
|
||||
from middleware.loggers import logger
|
||||
from bot.utils.decorators import log_action
|
||||
|
||||
__all__ = ("router",)
|
||||
|
||||
router: Router = Router(name="manage_words_router")
|
||||
|
||||
|
||||
# ================= ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ =================
|
||||
|
||||
def parse_args(text: str, command: str, min_args: int = 1, max_args: int = 2) -> tuple[bool, str | list]:
|
||||
"""
|
||||
Парсит аргументы команды.
|
||||
|
||||
Args:
|
||||
text: Полный текст сообщения
|
||||
command: Название команды
|
||||
min_args: Минимальное количество аргументов
|
||||
max_args: Максимальное количество аргументов
|
||||
|
||||
Returns:
|
||||
(success, result): result это либо список аргументов, либо текст ошибки
|
||||
"""
|
||||
# Убираем команду из текста
|
||||
parts = text.split(maxsplit=max_args)
|
||||
|
||||
if len(parts) < min_args + 1:
|
||||
usage = f"/{command} <слово>" if min_args == 1 else f"/{command} <слово> <минуты>"
|
||||
return False, f"❌ Использование: <code>{usage}</code>"
|
||||
|
||||
args = parts[1:]
|
||||
|
||||
# Валидация длины слова
|
||||
if args and len(args[0]) < 2:
|
||||
return False, "❌ Слово должно содержать минимум 2 символа"
|
||||
|
||||
if args and len(args[0]) > 100:
|
||||
return False, "❌ Слово слишком длинное (максимум 100 символов)"
|
||||
|
||||
return True, args
|
||||
|
||||
|
||||
def format_success_message(action: str, word: str, word_type: str, extra: str = "") -> str:
|
||||
"""Форматирует сообщение об успехе"""
|
||||
emoji_map = {
|
||||
'добавлена': '✅',
|
||||
'добавлен': '✅',
|
||||
'добавлено': '✅',
|
||||
'удалена': '🗑',
|
||||
'удален': '🗑',
|
||||
'удалено': '🗑'
|
||||
}
|
||||
|
||||
emoji = emoji_map.get(action, '✅')
|
||||
|
||||
message = f"{emoji} <b>{word_type.capitalize()}</b> <code>{word}</code> {action}"
|
||||
|
||||
if extra:
|
||||
message += f"\n{extra}"
|
||||
|
||||
return message
|
||||
|
||||
|
||||
def format_time(minutes: int) -> str:
|
||||
"""Форматирует время в читаемый вид"""
|
||||
if minutes < 60:
|
||||
return f"{minutes} мин"
|
||||
elif minutes < 1440:
|
||||
hours = minutes // 60
|
||||
mins = minutes % 60
|
||||
return f"{hours}ч {mins}м" if mins else f"{hours}ч"
|
||||
else:
|
||||
days = minutes // 1440
|
||||
hours = (minutes % 1440) // 60
|
||||
return f"{days}д {hours}ч" if hours else f"{days}д"
|
||||
|
||||
|
||||
# ================= КОМАНДЫ ДОБАВЛЕНИЯ =================
|
||||
|
||||
@router.message(Command(*COMMANDS.get("addword", ["addword"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
|
||||
@log_action(action_name="ADD_WORD", log_args=True)
|
||||
async def add_word_cmd(message: Message) -> None:
|
||||
"""
|
||||
Добавляет банворд-подстроку (постоянно).
|
||||
|
||||
Использование: /addword <слово>
|
||||
"""
|
||||
success, result = parse_args(message.text, "addword", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
added = await manager.add_banword(
|
||||
word=word,
|
||||
word_type=BanWordType.SUBSTRING,
|
||||
added_by=message.from_user.id,
|
||||
reason=f"Добавлено через команду"
|
||||
)
|
||||
|
||||
if added:
|
||||
text = format_success_message(
|
||||
"добавлена",
|
||||
word,
|
||||
"подстрока",
|
||||
"🔍 Тип проверки: простой поиск в тексте"
|
||||
)
|
||||
else:
|
||||
text = f"⚠️ Подстрока <code>{word}</code> уже существует"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка добавления банворда: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("addlemma", ["addlemma"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
|
||||
@log_action(action_name="ADD_LEMMA", log_args=True)
|
||||
async def add_lemma_cmd(message: Message) -> None:
|
||||
"""
|
||||
Добавляет банворд-лемму (постоянно).
|
||||
|
||||
Использование: /addlemma <слово>
|
||||
"""
|
||||
success, result = parse_args(message.text, "addlemma", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
added = await manager.add_banword(
|
||||
word=word,
|
||||
word_type=BanWordType.LEMMA,
|
||||
added_by=message.from_user.id,
|
||||
reason=f"Добавлено через команду"
|
||||
)
|
||||
|
||||
if added:
|
||||
text = format_success_message(
|
||||
"добавлена",
|
||||
word,
|
||||
"лемма",
|
||||
"🔤 Тип проверки: все формы слова (купить→куплю, купил, купишь...)"
|
||||
)
|
||||
else:
|
||||
text = f"⚠️ Лемма <code>{word}</code> уже существует"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка добавления леммы: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("addpart", ["addpart"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
|
||||
@log_action(action_name="ADD_PART", log_args=True)
|
||||
async def add_part_cmd(message: Message) -> None:
|
||||
"""
|
||||
Добавляет банворд-часть (постоянно).
|
||||
|
||||
Использование: /addpart <комбинация>
|
||||
"""
|
||||
success, result = parse_args(message.text, "addpart", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
added = await manager.add_banword(
|
||||
word=word,
|
||||
word_type=BanWordType.PART,
|
||||
added_by=message.from_user.id,
|
||||
reason=f"Добавлено через команду"
|
||||
)
|
||||
|
||||
if added:
|
||||
text = format_success_message(
|
||||
"добавлена",
|
||||
word,
|
||||
"часть",
|
||||
"🧩 Тип проверки: поиск без пробелов (обходит \"к у п и т ь\")"
|
||||
)
|
||||
else:
|
||||
text = f"⚠️ Часть <code>{word}</code> уже существует"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка добавления части: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("addtempword", ["addtempword"]), prefix=settings.PREFIX, ignore_case=True),
|
||||
IsAdmin())
|
||||
@log_action(action_name="ADD_TEMP_WORD", log_args=True)
|
||||
async def add_temp_word_cmd(message: Message) -> None:
|
||||
"""
|
||||
Добавляет временную банворд-подстроку.
|
||||
|
||||
Использование: /addtempword <слово> <минуты>
|
||||
"""
|
||||
success, result = parse_args(message.text, "addtempword", min_args=2, max_args=2)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
|
||||
# Валидация минут
|
||||
try:
|
||||
minutes = int(result[1])
|
||||
if minutes < 1 or minutes > 10080: # Максимум неделя
|
||||
await message.answer("❌ Время должно быть от 1 минуты до 10080 минут (7 дней)", parse_mode="HTML")
|
||||
return
|
||||
except ValueError:
|
||||
await message.answer("❌ Неверный формат времени. Укажите число минут", parse_mode="HTML")
|
||||
return
|
||||
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
added = await manager.add_temp_banword(
|
||||
word=word,
|
||||
word_type=BanWordType.SUBSTRING,
|
||||
minutes=minutes,
|
||||
added_by=message.from_user.id
|
||||
)
|
||||
|
||||
if added:
|
||||
time_str = format_time(minutes)
|
||||
text = format_success_message(
|
||||
"добавлена",
|
||||
word,
|
||||
"временная подстрока",
|
||||
f"⏱ Автоматически удалится через {time_str}"
|
||||
)
|
||||
else:
|
||||
text = f"⚠️ Временная подстрока <code>{word}</code> уже существует"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка добавления временного банворда: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("addtemplemma", ["addtemplemma"]), prefix=settings.PREFIX, ignore_case=True),
|
||||
IsAdmin())
|
||||
@log_action(action_name="ADD_TEMP_LEMMA", log_args=True)
|
||||
async def add_temp_lemma_cmd(message: Message) -> None:
|
||||
"""
|
||||
Добавляет временную банворд-лемму.
|
||||
|
||||
Использование: /addtemplemma <слово> <минуты>
|
||||
"""
|
||||
success, result = parse_args(message.text, "addtemplemma", min_args=2, max_args=2)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
|
||||
try:
|
||||
minutes = int(result[1])
|
||||
if minutes < 1 or minutes > 10080:
|
||||
await message.answer("❌ Время должно быть от 1 минуты до 10080 минут (7 дней)", parse_mode="HTML")
|
||||
return
|
||||
except ValueError:
|
||||
await message.answer("❌ Неверный формат времени. Укажите число минут", parse_mode="HTML")
|
||||
return
|
||||
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
added = await manager.add_temp_banword(
|
||||
word=word,
|
||||
word_type=BanWordType.LEMMA,
|
||||
minutes=minutes,
|
||||
added_by=message.from_user.id
|
||||
)
|
||||
|
||||
if added:
|
||||
time_str = format_time(minutes)
|
||||
text = format_success_message(
|
||||
"добавлена",
|
||||
word,
|
||||
"временная лемма",
|
||||
f"⏱ Автоматически удалится через {time_str}"
|
||||
)
|
||||
else:
|
||||
text = f"⚠️ Временная лемма <code>{word}</code> уже существует"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка добавления временной леммы: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("addexcept", ["addexcept"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
|
||||
@log_action(action_name="ADD_EXCEPTION", log_args=True)
|
||||
async def add_exception_cmd(message: Message) -> None:
|
||||
"""
|
||||
Добавляет исключение в whitelist.
|
||||
|
||||
Использование: /addexcept <текст>
|
||||
"""
|
||||
success, result = parse_args(message.text, "addexcept", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
added = await manager.add_whitelist(
|
||||
word=word,
|
||||
added_by=message.from_user.id,
|
||||
reason="Добавлено через команду"
|
||||
)
|
||||
|
||||
if added:
|
||||
text = format_success_message(
|
||||
"добавлено",
|
||||
word,
|
||||
"исключение",
|
||||
"✅ Сообщения с этим текстом не будут проверяться"
|
||||
)
|
||||
else:
|
||||
text = f"⚠️ Исключение <code>{word}</code> уже существует"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка добавления исключения: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка добавления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
# ================= КОМАНДЫ УДАЛЕНИЯ =================
|
||||
|
||||
@router.message(Command(*COMMANDS.get("remword", ["remword"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
|
||||
@log_action(action_name="REMOVE_WORD", log_args=True)
|
||||
async def remove_word_cmd(message: Message) -> None:
|
||||
"""
|
||||
Удаляет банворд-подстроку.
|
||||
|
||||
Использование: /remword <слово>
|
||||
"""
|
||||
success, result = parse_args(message.text, "remword", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
removed = await manager.remove_banword(word=word, word_type=BanWordType.SUBSTRING)
|
||||
|
||||
if removed:
|
||||
text = format_success_message("удалена", word, "подстрока")
|
||||
else:
|
||||
text = f"⚠️ Подстрока <code>{word}</code> не найдена"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка удаления банворда: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("remlemma", ["remlemma"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
|
||||
@log_action(action_name="REMOVE_LEMMA", log_args=True)
|
||||
async def remove_lemma_cmd(message: Message) -> None:
|
||||
"""Удаляет банворд-лемму"""
|
||||
success, result = parse_args(message.text, "remlemma", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
removed = await manager.remove_banword(word=word, word_type=BanWordType.LEMMA)
|
||||
|
||||
if removed:
|
||||
text = format_success_message("удалена", word, "лемма")
|
||||
else:
|
||||
text = f"⚠️ Лемма <code>{word}</code> не найдена"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка удаления леммы: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("rempart", ["rempart"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
|
||||
@log_action(action_name="REMOVE_PART", log_args=True)
|
||||
async def remove_part_cmd(message: Message) -> None:
|
||||
"""Удаляет банворд-часть"""
|
||||
success, result = parse_args(message.text, "rempart", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
removed = await manager.remove_banword(word=word, word_type=BanWordType.PART)
|
||||
|
||||
if removed:
|
||||
text = format_success_message("удалена", word, "часть")
|
||||
else:
|
||||
text = f"⚠️ Часть <code>{word}</code> не найдена"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка удаления части: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("remtempword", ["remtempword"]), prefix=settings.PREFIX, ignore_case=True),
|
||||
IsAdmin())
|
||||
@log_action(action_name="REMOVE_TEMP_WORD", log_args=True)
|
||||
async def remove_temp_word_cmd(message: Message) -> None:
|
||||
"""Удаляет временную подстроку"""
|
||||
success, result = parse_args(message.text, "remtempword", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
removed = await manager.remove_temp_banword(word=word, word_type=BanWordType.SUBSTRING)
|
||||
|
||||
if removed:
|
||||
text = format_success_message("удалена", word, "временная подстрока")
|
||||
else:
|
||||
text = f"⚠️ Временная подстрока <code>{word}</code> не найдена"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка удаления временного банворда: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("remtemplemma", ["remtemplemma"]), prefix=settings.PREFIX, ignore_case=True),
|
||||
IsAdmin())
|
||||
@log_action(action_name="REMOVE_TEMP_LEMMA", log_args=True)
|
||||
async def remove_temp_lemma_cmd(message: Message) -> None:
|
||||
"""Удаляет временную лемму"""
|
||||
success, result = parse_args(message.text, "remtemplemma", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
removed = await manager.remove_temp_banword(word=word, word_type=BanWordType.LEMMA)
|
||||
|
||||
if removed:
|
||||
text = format_success_message("удалена", word, "временная лемма")
|
||||
else:
|
||||
text = f"⚠️ Временная лемма <code>{word}</code> не найдена"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка удаления временной леммы: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
|
||||
|
||||
@router.message(Command(*COMMANDS.get("remexcept", ["remexcept"]), prefix=settings.PREFIX, ignore_case=True), IsAdmin())
|
||||
@log_action(action_name="REMOVE_EXCEPTION", log_args=True)
|
||||
async def remove_exception_cmd(message: Message) -> None:
|
||||
"""Удаляет исключение из whitelist"""
|
||||
success, result = parse_args(message.text, "remexcept", min_args=1, max_args=1)
|
||||
|
||||
if not success:
|
||||
await message.answer(result, parse_mode="HTML")
|
||||
return
|
||||
|
||||
word = result[0].lower().strip()
|
||||
manager = get_manager()
|
||||
|
||||
try:
|
||||
removed = await manager.remove_whitelist(word=word)
|
||||
|
||||
if removed:
|
||||
text = format_success_message("удалено", word, "исключение")
|
||||
else:
|
||||
text = f"⚠️ Исключение <code>{word}</code> не найдено"
|
||||
|
||||
await message.answer(text, parse_mode="HTML")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка удаления исключения: {e}", log_type="CMD", exc_info=True)
|
||||
await message.answer("❌ <b>Ошибка удаления</b>\n\nПопробуйте позже", parse_mode="HTML")
|
||||
Reference in New Issue
Block a user