Files
PrimoWorldsBot/BotLibrary/samples/user_cmd_class.py
2025-02-28 13:49:14 +07:00

398 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# BotCode/routers/commands/user_cmd_class.py
# Класс-шаблон для создания новых команд
import inspect
from aiogram import Router, types, F
from aiogram.enums import ChatAction
from aiogram.filters import Command
from aiogram.types import InputMediaPhoto, InputMediaVideo, InputMediaDocument
from typing import Optional, Callable
from BotLibrary import Logs, valid_url, username
from ProjectsFiles import BotVar
from SQLite3 import base_sql
# Настройки экспорта в модули
__all__ = ("CommandHandler",)
class CommandHandler:
def __init__(self, name: str, keywords: list, func: Optional[list[Callable]] = None, text_msg=None, chat_action: bool = False,
description: str = "Описание команды", tg_links: bool = False,
keyboard=None, prefix=BotVar.prefix, callbackdata: list = None, only_admin: bool = False,
ignore_case: bool = True, activate_keywoards: bool = True,
activate_commands: bool = True, activate_callback: bool = True,
media: str = "message", path_to_media=None, parse_mode: str = BotVar.parse_mode,
disable_notification: bool = BotVar.disable_notification, protect: bool = BotVar.protect_content):
self.router = Router(name=f"{name}_router")
self.name = name
self.log_type = name.capitalize()
self.description = description
self.keywords = keywords
self.text_msg = text_msg
self.parse_mode = parse_mode
self.keyboard = keyboard
self.chat_action = chat_action
self.disable_notification = disable_notification
self.media = media.lower()
self.protect = protect
self.only_admin = only_admin
self.func = func
# Поддержка до 10 медиафайлов через список
if path_to_media is None:
self.path_to_media = []
elif isinstance(path_to_media, (str, types.FSInputFile)):
self.path_to_media = [path_to_media]
elif isinstance(path_to_media, list):
self.path_to_media = path_to_media[:10] # Ограничение до 10 элементов
self.tg_links = tg_links
if callbackdata == "keywords":
self.callbackdata = keywords
else:
self.callbackdata = callbackdata
# Привязываем хэндлер к роутеру
if activate_commands:
self.router.message(Command(*keywords, prefix=prefix, ignore_case=ignore_case))(self.handler)
if activate_keywoards:
self.router.message(F.text.lower().in_(keywords))(self.handler)
if activate_callback and self.callbackdata:
self.router.message(F.text.lower().in_(self.callbackdata))(self.handler)
async def handler(self, message: types.Message):
"""Основной хэндлер команды."""
try:
# Извлекаем текст после команды
command_text = message.text[len(message.text.split()[0]):].strip() # Убираем команду из текста
args = command_text.split() # Разделяем команду на аргументы
# Проверка на выполнение дополнительной функции (если она есть)
if self.func: # Проверяем, что функция не None
# Выполняем все функции из списка, передавая команду
for func_item in self.func:
# Используем inspect для получения информации о функции
if callable(func_item):
signature = inspect.signature(func_item)
# Если функция ожидает аргументы
if len(signature.parameters) > 0:
await func_item(message, *args) # Передаем аргументы функции
else:
await func_item(message) # Если функция не требует аргументов, просто вызываем её
# Обрабатываем текстовое сообщение
if callable(self.text_msg):
text = self.text_msg()
else:
text = self.text_msg
# Обрабатываем tg_links
if self.tg_links:
text = text.replace("<users>", str(message.from_user.id))
# Обрабатываем текстовое сообщение
if callable(self.text_msg):
text = self.text_msg()
else:
text = self.text_msg
# Обрабатываем tg_links
if self.tg_links:
text = text.replace("<users>", str(message.from_user.id))
Logs.info(log_type=self.log_type, user=username(message), text=f"использовал(а) команду /{self.name}")
await base_sql(message)
# Обрабатываем текстовое сообщение
if callable(self.text_msg):
text = self.text_msg()
else:
text = self.text_msg
# Обрабатываем tg_links
if self.tg_links:
text = text.replace("<users>", str(message.from_user.id))
if self.media == "message":
await message.reply(
text=text,
reply_markup=self.keyboard() if self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
if self.chat_action:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.TYPING,
)
elif self.media == "photo" and len(self.path_to_media) > 1:
# Отправка медиагруппы для фотографий
media_group = []
for media_path in self.path_to_media:
url = valid_url(media_path)
if url:
media_group.append(InputMediaPhoto(media=media_path))
else:
media_group.append(InputMediaPhoto(media=types.FSInputFile(path=media_path)))
# Добавляем подпись и клавиатуру к последнему элементу
media_group[-1].caption = text
media_group[-1].parse_mode = self.parse_mode
await message.reply_media_group(
media=media_group,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
# Отправка клавиатуры отдельным сообщением, если есть
if self.keyboard:
await message.reply(
text=" ",
reply_markup=self.keyboard(),
disable_notification=self.disable_notification,
)
if self.chat_action:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.UPLOAD_PHOTO,
)
elif self.media == "video" and len(self.path_to_media) > 1:
# Отправка медиагруппы для видео
media_group = []
for media_path in self.path_to_media:
url = valid_url(media_path)
if url:
media_group.append(InputMediaVideo(media=media_path))
else:
media_group.append(InputMediaVideo(media=types.FSInputFile(path=media_path)))
# Добавляем подпись и клавиатуру к последнему элементу
media_group[-1].caption = text
media_group[-1].parse_mode = self.parse_mode
await message.reply_media_group(
media=media_group,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
# Отправка клавиатуры отдельным сообщением, если есть
if self.keyboard:
await message.reply(
text=" ",
reply_markup=self.keyboard(),
disable_notification=self.disable_notification,
)
if self.chat_action:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.UPLOAD_VIDEO,
)
elif self.media == "file" and len(self.path_to_media) > 1:
# Отправка медиагруппы для файлов
media_group = []
for media_path in self.path_to_media:
url = valid_url(media_path)
if url:
media_group.append(InputMediaDocument(media=media_path))
else:
media_group.append(InputMediaDocument(media=types.FSInputFile(path=media_path)))
# Добавляем подпись и клавиатуру к последнему элементу
media_group[-1].caption = text
media_group[-1].parse_mode = self.parse_mode
await message.reply_media_group(
media=media_group,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
# Отправка клавиатуры отдельным сообщением, если есть
if self.keyboard:
await message.reply(
text=" ",
reply_markup=self.keyboard(),
disable_notification=self.disable_notification,
)
if self.chat_action:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.UPLOAD_DOCUMENT,
)
else:
# Одиночное медиа или другие типы
for idx, media_path in enumerate(self.path_to_media):
is_last = idx == len(self.path_to_media) - 1
url = valid_url(media_path)
if self.media == "photo":
if url:
await message.reply_photo(
photo=media_path,
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
else:
await message.reply_photo(
photo=types.FSInputFile(path=media_path),
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
if self.chat_action and is_last:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.UPLOAD_PHOTO,
)
elif self.media == "gif":
if url:
await message.reply_animation(
animation=media_path,
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
else:
await message.reply_animation(
animation=types.FSInputFile(path=media_path),
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
if self.chat_action and is_last:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.UPLOAD_VIDEO,
)
elif self.media == "video":
if url:
await message.reply_video(
video=media_path,
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
else:
await message.reply_video(
video=types.FSInputFile(path=media_path),
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
if self.chat_action and is_last:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.UPLOAD_VIDEO,
)
elif self.media == "videonote":
if url:
await message.reply_video_note(
video_note=media_path,
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
else:
await message.reply_video_note(
video_note=types.FSInputFile(path=media_path),
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
if self.chat_action and is_last:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.UPLOAD_VIDEO_NOTE,
)
elif self.media == "audio":
if url:
await message.reply_audio(
audio=media_path,
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
else:
await message.reply_audio(
audio=types.FSInputFile(path=media_path),
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
if self.chat_action and is_last:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.UPLOAD_VOICE,
)
elif self.media == "file":
if url:
await message.reply_document(
document=media_path,
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
else:
await message.reply_document(
document=types.FSInputFile(path=media_path),
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
if self.chat_action and is_last:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.UPLOAD_DOCUMENT,
)
elif self.media == "dice":
await message.reply_dice(
emoji="🎲", # Эмодзи кубика как стандартное значение, если нет URL
caption=text if is_last else None,
reply_markup=self.keyboard() if is_last and self.keyboard else None,
parse_mode=self.parse_mode,
disable_notification=self.disable_notification,
protect_content=self.protect,
)
if self.chat_action and is_last:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=ChatAction.CHOOSE_STICKER,
)
# Проверка на ошибку
except Exception as e:
Logs.error(log_type=self.log_type, user=username(message), text=f"Ошибка команды: {e}")