Files
PrimoGuardBot/bot/templates/message_callback.py

836 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Универсальные шаблоны для отправки сообщений
"""
from typing import Union, Optional, List, Dict, Callable
from pathlib import Path
from contextlib import suppress
from aiogram import Bot
from aiogram.fsm.context import FSMContext
from aiogram.types import (
Message,
CallbackQuery,
InlineKeyboardMarkup,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
FSInputFile,
InputMediaPhoto,
InputMediaVideo,
InputMediaAudio,
InputMediaDocument,
BufferedInputFile
)
from aiogram.utils.keyboard import InlineKeyboardBuilder, ReplyKeyboardBuilder
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
from aiogram.enums import ParseMode, ChatAction
from middleware.loggers import logger
from ..utils.state_utils import safe_answer_callback
from ..utils.auto_delete import auto_delete_manager
__all__ = (
'msg',
'msg_photo',
'msg_video',
'msg_document',
'msg_audio',
'msg_voice',
'msg_media_group',
'edit_msg',
'delete_msg',
'forward_msg',
'send_action',
'markups',
'MessageTemplate',
'batch_send'
)
class MessageTemplate:
"""
Класс для хранения шаблонов сообщений.
Example:
```python
# Создание шаблона
welcome = MessageTemplate(
text="👋 Привет, {name}! Добро пожаловать в {chat}",
parse_mode=ParseMode.HTML
)
# Использование
await welcome.send(
message,
name=user.first_name,
chat=chat.title
)
```
"""
def __init__(
self,
text: str,
parse_mode: Optional[str] = ParseMode.HTML,
disable_web_page_preview: bool = False,
markup: Optional[Union[InlineKeyboardBuilder, InlineKeyboardMarkup]] = None
):
self.text = text
self.parse_mode = parse_mode
self.disable_web_page_preview = disable_web_page_preview
self.markup = markup
def format(self, **kwargs) -> str:
"""Форматирует текст с подстановкой переменных"""
return self.text.format(**kwargs)
async def send(
self,
target: Union[Message, CallbackQuery, int],
bot: Optional[Bot] = None,
**format_kwargs
) -> Optional[Message]:
"""
Отправляет сообщение по шаблону.
Args:
target: Куда отправить (Message, CallbackQuery или chat_id)
bot: Экземпляр бота (если target это chat_id)
**format_kwargs: Переменные для форматирования
"""
text = self.format(**format_kwargs)
if isinstance(target, int):
# Отправка по chat_id
if not bot:
raise ValueError("Bot instance required for chat_id")
return await bot.send_message(
chat_id=target,
text=text,
parse_mode=self.parse_mode,
disable_web_page_preview=self.disable_web_page_preview,
reply_markup=markups(self.markup)
)
else:
# Отправка через Message/CallbackQuery
return await msg(
target,
text=text,
parse_mode=self.parse_mode,
disable_web_page_preview=self.disable_web_page_preview,
markup=self.markup
)
# ================= MARKUP UTILS =================
def markups(
markup: Union[
InlineKeyboardBuilder,
ReplyKeyboardBuilder,
InlineKeyboardMarkup,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
None
] = None,
resize_keyboard: bool = True,
one_time_keyboard: bool = False
) -> Optional[Union[InlineKeyboardMarkup, ReplyKeyboardMarkup, ReplyKeyboardRemove]]:
"""
Конвертирует builder в готовый markup.
Args:
markup: Builder или готовая клавиатура
resize_keyboard: Автоматический размер (для ReplyKeyboard)
one_time_keyboard: Скрыть после нажатия (для ReplyKeyboard)
Returns:
Готовый markup или None
Example:
>> builder = InlineKeyboardBuilder()
>> builder.button(text="Test", callback_data="test")
>> keyboard = markups(builder)
"""
if markup is None:
return None
if isinstance(markup, InlineKeyboardBuilder):
return markup.as_markup()
if isinstance(markup, ReplyKeyboardBuilder):
return markup.as_markup(
resize_keyboard=resize_keyboard,
one_time_keyboard=one_time_keyboard
)
if isinstance(markup, (InlineKeyboardMarkup, ReplyKeyboardMarkup, ReplyKeyboardRemove)):
return markup
return None
# ================= TEXT MESSAGES =================
async def msg(
update: Union[Message, CallbackQuery],
text: str,
state: Optional[FSMContext] = None,
markup: Union[InlineKeyboardBuilder, InlineKeyboardMarkup, None] = None,
parse_mode: Optional[str] = ParseMode.HTML,
disable_web_page_preview: bool = False,
answer_callback: bool = True,
state_clear: bool = False,
edit_if_possible: bool = True,
delete_previous: bool = False,
auto_delete: Optional[int] = None,
disable_notification: bool = False,
protect_content: bool = False,
show_typing: bool = False,
log: bool = False
) -> Optional[Message]:
"""
Универсальная отправка/редактирование текстового сообщения.
Args:
update: Message или CallbackQuery
text: Текст сообщения
state: FSM контекст
markup: Клавиатура
parse_mode: Режим парсинга (HTML, Markdown, None)
disable_web_page_preview: Отключить предпросмотр ссылок
answer_callback: Ответить на callback
state_clear: Очистить состояние
edit_if_possible: Попытаться отредактировать (для callback)
delete_previous: Удалить предыдущее сообщение перед отправкой
auto_delete: Автоудаление через N секунд
disable_notification: Без звука
protect_content: Защита от пересылки
show_typing: Показать "печатает"
log: Логировать отправку
Returns:
Отправленное сообщение
Example:
>> # Простая отправка
>> await msg(message, "Привет!")
>> # С клавиатурой и автоудалением
>> builder = InlineKeyboardBuilder()
>> builder.button(text="OK", callback_data="ok")
>> await msg(
... callback,
... "Сообщение удалится через 10 секунд",
... markup=builder,
... auto_delete=10
... )
"""
# Получаем message объект
message = update.message if isinstance(update, CallbackQuery) else update
if not message:
logger.warning("Невозможно получить message объект", log_type='MESSAGE')
return None
# Показываем typing если нужно
if show_typing:
await send_action(message, ChatAction.TYPING)
# Удаляем предыдущее сообщение если нужно
if delete_previous:
with suppress(TelegramBadRequest, TelegramForbiddenError):
await message.delete()
keyboard = markups(markup)
try:
if edit_if_possible and isinstance(update, CallbackQuery):
try:
sent_message = await message.edit_text(
text=text,
reply_markup=keyboard,
parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview
)
if log:
logger.debug(
f"Сообщение отредактировано: {message.message_id}",
log_type='MESSAGE'
)
except (TelegramBadRequest, TelegramForbiddenError):
sent_message = await message.answer(
text=text,
reply_markup=keyboard,
parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview,
disable_notification=disable_notification,
protect_content=protect_content
)
else:
sent_message = await message.answer(
text=text,
reply_markup=keyboard,
parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview,
disable_notification=disable_notification,
protect_content=protect_content
)
except (TelegramBadRequest, TelegramForbiddenError):
# Отправка нового сообщения
try:
sent_message = await message.answer(
text=text,
reply_markup=keyboard,
parse_mode=parse_mode,
disable_web_page_preview=disable_web_page_preview,
disable_notification=disable_notification,
protect_content=protect_content
)
if log:
logger.debug(
f"Сообщение отправлено: {sent_message.message_id}",
log_type='MESSAGE'
)
except Exception as e:
logger.error(f"Ошибка отправки сообщения: {e}", log_type='MESSAGE')
return None
# Отвечаем на callback
if answer_callback and isinstance(update, CallbackQuery):
await safe_answer_callback(update)
# Очищаем состояние
if state_clear and state:
await state.clear()
# Планируем автоудаление
if auto_delete and sent_message:
await auto_delete_manager.schedule(
bot=message.bot,
chat_id=sent_message.chat.id,
message_id=sent_message.message_id,
delay=auto_delete,
reason="template_auto_delete"
)
return sent_message
# ================= MEDIA MESSAGES =================
async def msg_photo(
update: Union[Message, CallbackQuery],
photo: Union[str, Path, FSInputFile, BufferedInputFile],
caption: Optional[str] = None,
state: Optional[FSMContext] = None,
markup: Union[InlineKeyboardBuilder, InlineKeyboardMarkup, None] = None,
parse_mode: Optional[str] = ParseMode.HTML,
answer_callback: bool = True,
state_clear: bool = False,
edit_if_possible: bool = True,
auto_delete: Optional[int] = None,
has_spoiler: bool = False,
log: bool = False
) -> Optional[Message]:
"""
Универсальная отправка/редактирование фото.
Args:
update: Message или CallbackQuery
photo: Путь к файлу, FSInputFile или BufferedInputFile
caption: Подпись к фото
state: FSM контекст
markup: Клавиатура
parse_mode: Режим парсинга
answer_callback: Ответить на callback
state_clear: Очистить состояние
edit_if_possible: Попытаться отредактировать
auto_delete: Автоудаление через N секунд
has_spoiler: Спойлер
log: Логировать
Returns:
Отправленное сообщение
Example:
>> await msg_photo(
... message,
... photo="assets/welcome.jpg",
... caption="Добро пожаловать!",
... auto_delete=30
... )
"""
message = update.message if isinstance(update, CallbackQuery) else update
if not message:
return None
# Конвертируем путь в FSInputFile
if isinstance(photo, (str, Path)):
photo = FSInputFile(photo)
keyboard = markups(markup)
try:
# Попытка редактирования медиа
if edit_if_possible and isinstance(update, CallbackQuery):
media = InputMediaPhoto(
media=photo,
caption=caption,
parse_mode=parse_mode,
has_spoiler=has_spoiler
)
await message.edit_media(
media=media,
reply_markup=keyboard
)
sent_message = message
if log:
logger.debug("Фото отредактировано", log_type='MESSAGE')
else:
raise TelegramBadRequest
except (TelegramBadRequest, TelegramForbiddenError):
# Отправка нового фото
try:
sent_message = await message.answer_photo(
photo=photo,
caption=caption,
reply_markup=keyboard,
parse_mode=parse_mode,
has_spoiler=has_spoiler
)
if log:
logger.debug("Фото отправлено", log_type='MESSAGE')
except Exception as e:
logger.error(f"Ошибка отправки фото: {e}", log_type='MESSAGE')
return None
if answer_callback and isinstance(update, CallbackQuery):
await safe_answer_callback(update)
if state_clear and state:
await state.clear()
if auto_delete and sent_message:
await auto_delete_manager.schedule(
bot=message.bot,
chat_id=sent_message.chat.id,
message_id=sent_message.message_id,
delay=auto_delete
)
return sent_message
async def msg_video(
update: Union[Message, CallbackQuery],
video: Union[str, Path, FSInputFile, BufferedInputFile],
caption: Optional[str] = None,
**kwargs
) -> Optional[Message]:
"""
Отправка видео.
Поддерживает те же параметры что и msg_photo.
"""
message = update.message if isinstance(update, CallbackQuery) else update
if not message:
return None
if isinstance(video, (str, Path)):
video = FSInputFile(video)
try:
sent = await message.answer_video(
video=video,
caption=caption,
parse_mode=kwargs.get('parse_mode', ParseMode.HTML),
reply_markup=markups(kwargs.get('markup'))
)
if kwargs.get('answer_callback') and isinstance(update, CallbackQuery):
await safe_answer_callback(update)
if kwargs.get('state_clear') and kwargs.get('state'):
await kwargs['state'].clear()
if kwargs.get('auto_delete'):
await auto_delete_manager.schedule(
bot=message.bot,
chat_id=sent.chat.id,
message_id=sent.message_id,
delay=kwargs['auto_delete']
)
return sent
except Exception as e:
logger.error(f"Ошибка отправки видео: {e}", log_type='MESSAGE')
return None
async def msg_document(
update: Union[Message, CallbackQuery],
document: Union[str, Path, FSInputFile, BufferedInputFile],
caption: Optional[str] = None,
filename: Optional[str] = None,
**kwargs
) -> Optional[Message]:
"""
Отправка документа.
Args:
filename: Имя файла для отображения
:param filename:
:param caption:
:param document:
:param update:
"""
message = update.message if isinstance(update, CallbackQuery) else update
if not message:
return None
if isinstance(document, (str, Path)):
document = FSInputFile(document, filename=filename)
try:
sent = await message.answer_document(
document=document,
caption=caption,
parse_mode=kwargs.get('parse_mode', ParseMode.HTML),
reply_markup=markups(kwargs.get('markup'))
)
if kwargs.get('answer_callback') and isinstance(update, CallbackQuery):
await safe_answer_callback(update)
if kwargs.get('state_clear') and kwargs.get('state'):
await kwargs['state'].clear()
if kwargs.get('auto_delete'):
await auto_delete_manager.schedule(
bot=message.bot,
chat_id=sent.chat.id,
message_id=sent.message_id,
delay=kwargs['auto_delete']
)
return sent
except Exception as e:
logger.error(f"Ошибка отправки документа: {e}", log_type='MESSAGE')
return None
async def msg_audio(
update: Union[Message, CallbackQuery],
audio: Union[str, Path, FSInputFile, BufferedInputFile],
caption: Optional[str] = None,
**kwargs
) -> Optional[Message]:
"""Отправка аудио"""
message = update.message if isinstance(update, CallbackQuery) else update
if not message:
return None
if isinstance(audio, (str, Path)):
audio = FSInputFile(audio)
try:
return await message.answer_audio(
audio=audio,
caption=caption,
parse_mode=kwargs.get('parse_mode', ParseMode.HTML),
reply_markup=markups(kwargs.get('markup'))
)
except Exception as e:
logger.error(f"Ошибка отправки аудио: {e}", log_type='MESSAGE')
return None
async def msg_voice(
update: Union[Message, CallbackQuery],
voice: Union[str, Path, FSInputFile, BufferedInputFile],
caption: Optional[str] = None,
**kwargs
) -> Optional[Message]:
"""Отправка голосового сообщения"""
message = update.message if isinstance(update, CallbackQuery) else update
if not message:
return None
if isinstance(voice, (str, Path)):
voice = FSInputFile(voice)
try:
return await message.answer_voice(
voice=voice,
caption=caption,
parse_mode=kwargs.get('parse_mode', ParseMode.HTML),
reply_markup=markups(kwargs.get('markup'))
)
except Exception as e:
logger.error(f"Ошибка отправки голосового: {e}", log_type='MESSAGE')
return None
async def msg_media_group(
message: Message,
media: List[Union[InputMediaPhoto, InputMediaVideo, InputMediaAudio, InputMediaDocument]],
caption: Optional[str] = None
) -> Optional[List[Message]]:
"""
Отправка media group (альбом).
Args:
message: Объект сообщения
media: Список медиа
caption: Подпись (будет добавлена к первому элементу)
Returns:
Список отправленных сообщений
Example:
>> media = [
... InputMediaPhoto(media=FSInputFile("photo1.jpg")),
... InputMediaPhoto(media=FSInputFile("photo2.jpg")),
... InputMediaVideo(media=FSInputFile("video.mp4"))
... ]
>> await msg_media_group(message, media, caption="Альбом")
"""
if not media:
return None
# Добавляем подпись к первому элементу
if caption and media:
media[0].caption = caption
try:
return await message.answer_media_group(media=media)
except Exception as e:
logger.error(f"Ошибка отправки media group: {e}", log_type='MESSAGE')
return None
# ================= MESSAGE ACTIONS =================
async def edit_msg(
message: Message,
text: Optional[str] = None,
caption: Optional[str] = None,
markup: Optional[InlineKeyboardMarkup] = None,
parse_mode: Optional[str] = ParseMode.HTML
) -> bool:
"""
Безопасное редактирование сообщения.
Returns:
bool: True если успешно отредактировано
"""
try:
if text:
await message.edit_text(
text=text,
reply_markup=markup,
parse_mode=parse_mode
)
elif caption:
await message.edit_caption(
caption=caption,
reply_markup=markup,
parse_mode=parse_mode
)
else:
await message.edit_reply_markup(reply_markup=markup)
return True
except (TelegramBadRequest, TelegramForbiddenError) as e:
logger.debug(f"Не удалось отредактировать сообщение: {e}", log_type='MESSAGE')
return False
async def delete_msg(
message: Message,
delay: Optional[int] = None
) -> bool:
"""
Безопасное удаление сообщения.
Args:
message: Сообщение для удаления
delay: Задержка перед удалением (секунды)
Returns:
bool: True если успешно удалено
"""
if delay:
await auto_delete_manager.schedule(
bot=message.bot,
chat_id=message.chat.id,
message_id=message.message_id,
delay=delay
)
return True
try:
await message.delete()
return True
except (TelegramBadRequest, TelegramForbiddenError):
return False
async def forward_msg(
message: Message,
to_chat_id: int,
disable_notification: bool = False,
protect_content: bool = False
) -> Optional[Message]:
"""
Пересылка сообщения.
Args:
message: Исходное сообщение
to_chat_id: ID чата куда переслать
disable_notification: Без звука
protect_content: Защита от пересылки
Returns:
Пересланное сообщение
"""
try:
return await message.forward(
chat_id=to_chat_id,
disable_notification=disable_notification,
protect_content=protect_content
)
except Exception as e:
logger.error(f"Ошибка пересылки сообщения: {e}", log_type='MESSAGE')
return None
async def send_action(
message: Message,
action: ChatAction = ChatAction.TYPING
) -> bool:
"""
Отправка chat action (печатает, загружает фото и т.д.).
Args:
message: Объект сообщения
action: Тип действия
Returns:
bool: True если успешно
Example:
>> await send_action(message, ChatAction.TYPING)
>> await send_action(message, ChatAction.UPLOAD_PHOTO)
"""
try:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action=action
)
return True
except:
return False
# ================= BATCH SENDING =================
async def batch_send(
bot: Bot,
chat_ids: List[int],
text: str,
markup: Optional[InlineKeyboardMarkup] = None,
parse_mode: Optional[str] = ParseMode.HTML,
disable_notification: bool = False,
on_success: Optional[Callable] = None,
on_error: Optional[Callable] = None
) -> Dict[str, int]:
"""
Массовая рассылка сообщений.
Args:
bot: Экземпляр бота
chat_ids: Список ID чатов
text: Текст сообщения
markup: Клавиатура
parse_mode: Режим парсинга
disable_notification: Без звука
on_success: Callback при успехе (chat_id)
on_error: Callback при ошибке (chat_id, error)
Returns:
Dict со статистикой: {'success': N, 'failed': N}
Example:
>> stats = await batch_send(
... bot,
... [123, 456, 789],
... "Важное объявление!"
... )
>> print(f"Отправлено: {stats['success']}")
"""
import asyncio
success_count = 0
failed_count = 0
for chat_id in chat_ids:
try:
await bot.send_message(
chat_id=chat_id,
text=text,
reply_markup=markup,
parse_mode=parse_mode,
disable_notification=disable_notification
)
success_count += 1
if on_success:
await on_success(chat_id)
except Exception as e:
failed_count += 1
if on_error:
await on_error(chat_id, e)
logger.warning(
f"Не удалось отправить сообщение в чат {chat_id}: {e}",
log_type='BATCH'
)
# Небольшая задержка чтобы избежать rate limit
await asyncio.sleep(0.05)
logger.info(
f"Рассылка завершена: успешно={success_count}, ошибок={failed_count}",
log_type='BATCH'
)
return {
'success': success_count,
'failed': failed_count
}