Files
2026-02-17 11:24:55 +07:00

82 lines
3.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from asyncio import create_task
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.types import Message
from aiogram.exceptions import TelegramBadRequest
from aiogram.fsm.context import FSMContext
from bot.core.bots import bot, BotInfo
from bot.filters import IsOwner
from bot.utils import status_clear, auto_delete_message, hidden_admins_message
from configs import COMMANDS
from middleware.loggers import logger
__all__ = ("router",)
# Ключ для команды
CMD: str = "all"
# Инициализация роутера
router: Router = Router(name=f"{CMD}_cmd_router")
@router.message(
F.text.lower().regexp(rf"^({'|'.join(COMMANDS[CMD])})\s?.*"), # ловим текст без префикса
F.chat.type.in_({"supergroup", "group"}),
IsOwner()
)
@router.message(
Command(*COMMANDS[CMD], prefix=BotInfo.prefix, ignore_case=True),
F.chat.type.in_({"supergroup", "group"}),
IsOwner()
)
async def notify_all_text(message: Message, state: FSMContext) -> None:
"""
Обработчик команды /all, /call и текстовых эквивалентов типа "Калл Привет всем".
Функционал:
1. Считывает весь текст после команды.
2. Формирует скрытое сообщение для администраторов.
3. Отправляет сообщение в чат.
4. Автоматически удаляет сообщение через неделю.
5. Пытается закрепить сообщение в чате.
Args:
message (Message): Объект входящего сообщения.
state (FSMContext): Контекст FSM, используется для очистки состояния.
"""
# Очистка состояния FSM перед выполнением команды
await status_clear(update=message, state=state)
# Извлечение текста после команды
parts: list[str] = message.text.split(" ", 1)
custom_text: str = parts[1] if len(parts) > 1 else "⚡ Внимание всем!"
# Формирование скрытого текста для администраторов
hidden_text: str = await hidden_admins_message(message=message, text=custom_text)
# Отправка сообщения в чат
sent_message: Message = await message.answer(hidden_text)
# Запуск асинхронной задачи по удалению сообщения через 7 дней
create_task(
auto_delete_message(
chat_id=message.chat.id,
message_id=sent_message.message_id,
delay=604800 # 7 дней в секундах
)
)
# Попытка закрепить сообщение и удалить "системное" сообщение о закреплении
try:
await bot.pin_chat_message(
chat_id=message.chat.id,
message_id=sent_message.message_id,
disable_notification=False
)
# Иногда Telegram создает дополнительное уведомление при закреплении
await bot.delete_message(chat_id=message.chat.id, message_id=sent_message.message_id + 1)
logger.debug(f"[ALL] Сообщение закреплено: {custom_text}")
except TelegramBadRequest as e:
logger.error(f"[ALL] Ошибка закрепления сообщения: {e}")