from __future__ import annotations import asyncio import logging import re from typing import Any from aiogram import Bot, Dispatcher, F, Router from aiogram.enums import ParseMode from aiogram.exceptions import TelegramBadRequest from aiogram.filters import Command, CommandStart from aiogram.fsm.context import FSMContext from aiogram.fsm.state import State, StatesGroup from aiogram.fsm.storage.memory import MemoryStorage from aiogram.types import CallbackQuery, Message from aiogram.utils.keyboard import InlineKeyboardBuilder from session_bot.config import load_actor_config, load_settings from session_bot.render import build_channel_text from session_bot.storage import JsonStateStorage router = Router() STATUS_CHOICES = { "open": "Открыть", "backstage": "Закулисье", "delay": "Задержка", "rest": "Антракт", } HELP_TEXT = ( "Команды\n" "/start или /panel - открыть панель\n" "/help - показать справку\n" "/refresh - перерисовать пост в канале\n" "/cancel - сбросить текущий ввод\n" "/post - сохранить шаблон поста с плейсхолдером {{actors}}\n\n" "Как пользоваться\n" "1. Откройте панель.\n" "2. Выберите своего актера.\n" "3. Нажмите один из статусов. Он применится сразу с шаблонной фразой.\n" "4. Если нужен свой текст, нажмите 'Своя фраза' и отправьте его.\n\n" "Шаблон поста\n" "В шаблоне должен быть {{actors}} - туда бот подставляет актерский блок.\n" "Можно добавить {{hidden_link}} в начало, либо указать hidden_link_url в config/actors.json.\n" "Шаблон лучше отправлять обычным текстом или реплаем на текстовый пост.\n" "Точное восстановление исходного MarkdownV2 из пересланного оформленного поста Telegram не гарантируется." ) class SessionForm(StatesGroup): waiting_for_custom_phrase = State() waiting_for_post_template = State() def actor_operator_ids(actor: dict[str, Any]) -> set[int]: if "operator_user_ids" in actor: return {int(user_id) for user_id in actor["operator_user_ids"]} return {int(actor["operator_user_id"])} def build_actor_lookup(config: dict) -> dict[int, dict[str, Any]]: lookup: dict[int, dict[str, Any]] = {} for actor in config["actors"]: for user_id in actor_operator_ids(actor): lookup[user_id] = actor return lookup def is_allowed(user_id: int, actor_lookup: dict[int, dict[str, Any]], admin_ids: set[int]) -> bool: return user_id in actor_lookup or user_id in admin_ids def build_actor_keyboard(config: dict, user_id: int, admin_ids: set[int]) -> InlineKeyboardBuilder: keyboard = InlineKeyboardBuilder() for actor in config["actors"]: if user_id in admin_ids or user_id in actor_operator_ids(actor): keyboard.button(text=actor["button_text"], callback_data=f"actor:{actor['key']}") keyboard.adjust(2) return keyboard def build_status_keyboard(actor_key: str) -> InlineKeyboardBuilder: keyboard = InlineKeyboardBuilder() for status_key, title in STATUS_CHOICES.items(): keyboard.button(text=title, callback_data=f"status:{actor_key}:{status_key}") keyboard.button(text="Своя фраза", callback_data=f"custom:{actor_key}") keyboard.button(text="⬅️ Назад", callback_data="nav:panel") keyboard.adjust(2) return keyboard def build_back_to_panel_keyboard() -> InlineKeyboardBuilder: keyboard = InlineKeyboardBuilder() keyboard.button(text="К панели", callback_data="nav:panel") return keyboard def find_actor(app_config: dict, actor_key: str) -> dict[str, Any] | None: return next((item for item in app_config["actors"] if item["key"] == actor_key), None) def get_actor_runtime_state(actor: dict[str, Any], state_storage: JsonStateStorage) -> dict[str, Any]: payload = state_storage.load() return payload.get("actors", {}).get(actor["key"], {}) def extract_template_text(message: Message) -> str | None: if message.text and message.text.startswith("/post "): return message.text.split(maxsplit=1)[1] source = message.reply_to_message or message if source.html_text: return source.html_text if source.text: return source.text return None def normalize_template_placeholders(template: str) -> str: normalized = template normalized = re.sub(r"\\\{\\\{\s*actors\s*\\\}\\\}", "{{actors}}", normalized, flags=re.IGNORECASE) normalized = re.sub(r"\{\{\s*actors\s*\}\}", "{{actors}}", normalized, flags=re.IGNORECASE) normalized = re.sub( r"\\\{\\\{\s*hidden_link\s*\\\}\\\}", "{{hidden_link}}", normalized, flags=re.IGNORECASE, ) normalized = re.sub(r"\{\{\s*hidden_link\s*\}\}", "{{hidden_link}}", normalized, flags=re.IGNORECASE) return normalized async def safe_edit_message(callback: CallbackQuery, text: str, reply_markup=None) -> None: try: await callback.message.edit_text( text, reply_markup=reply_markup, disable_web_page_preview=True, ) except TelegramBadRequest as exc: if "message is not modified" not in str(exc).lower(): raise async def show_panel(target: Message | CallbackQuery, user_id: int, app_config: dict, settings) -> None: keyboard = build_actor_keyboard(app_config, user_id, settings.admin_ids) text = "Выберите персонажа для смены статуса." if isinstance(target, CallbackQuery): await safe_edit_message(target, text, keyboard.as_markup()) await target.answer() return await target.answer(text, reply_markup=keyboard.as_markup()) async def show_actor_status_menu( callback: CallbackQuery, actor: dict[str, Any], state_storage: JsonStateStorage, ) -> None: runtime_state = get_actor_runtime_state(actor, state_storage) current_status = runtime_state.get("status", actor.get("default_status", "backstage")) current_phrase = runtime_state.get("phrase", actor.get("phrases", {}).get(current_status, "")) lines = [ f"Выбран {actor['display_name']}.", f"Текущий статус: {STATUS_CHOICES.get(current_status, current_status)}.", ] if current_phrase: lines.append(f"Текущая фраза: {current_phrase}") await safe_edit_message(callback, "\n".join(lines), build_status_keyboard(actor["key"]).as_markup()) async def update_channel_post(bot: Bot, app_config: dict, state_storage: JsonStateStorage, settings) -> None: state = state_storage.load() text = build_channel_text(app_config, state) await bot.edit_message_text( chat_id=settings.channel_id, message_id=settings.channel_message_id, text=text, parse_mode=ParseMode.HTML, disable_web_page_preview=False, ) async def apply_status_update( bot: Bot, actor_key: str, status_key: str, phrase: str, user_id: int, app_config: dict, state_storage: JsonStateStorage, settings, ) -> None: payload = state_storage.load() payload.setdefault("actors", {}) payload["actors"][actor_key] = { "status": status_key, "phrase": phrase, "updated_by": user_id, } state_storage.save(payload) await update_channel_post(bot, app_config, state_storage, settings) def save_post_template(state_storage: JsonStateStorage, template: str) -> None: payload = state_storage.load() payload["template"] = {"text": normalize_template_placeholders(template)} state_storage.save(payload) def count_actor_placeholders(template: str) -> int: return normalize_template_placeholders(template).count("{{actors}}") @router.message(CommandStart()) @router.message(Command("panel")) async def start_handler(message: Message, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None: await state.clear() user_id = message.from_user.id if not is_allowed(user_id, actor_lookup, settings.admin_ids): await message.answer("У вас нет доступа к этой панели.") return await show_panel(message, user_id, app_config, settings) @router.message(Command("help")) async def help_handler(message: Message) -> None: await message.answer(HELP_TEXT, disable_web_page_preview=True) @router.message(Command("cancel")) async def cancel_handler(message: Message, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None: await state.clear() user_id = message.from_user.id if not is_allowed(user_id, actor_lookup, settings.admin_ids): await message.answer("Состояние очищено.") return await show_panel(message, user_id, app_config, settings) @router.message(Command("refresh")) async def refresh_handler( message: Message, bot: Bot, app_config: dict, state_storage: JsonStateStorage, actor_lookup: dict, settings, ) -> None: user_id = message.from_user.id if not is_allowed(user_id, actor_lookup, settings.admin_ids): await message.answer("У вас нет доступа к обновлению.") return await update_channel_post(bot, app_config, state_storage, settings) await message.answer("Сообщение канала обновлено.") @router.message(Command("post")) async def post_handler( message: Message, state: FSMContext, state_storage: JsonStateStorage, actor_lookup: dict, settings, ) -> None: user_id = message.from_user.id if not is_allowed(user_id, actor_lookup, settings.admin_ids): await message.answer("У вас нет доступа к шаблонам поста.") return template = extract_template_text(message) if template is not None: normalized = normalize_template_placeholders(template) save_post_template(state_storage, normalized) await state.clear() placeholder_count = count_actor_placeholders(normalized) if placeholder_count == 0: await message.answer( "Шаблон сохранен, но в нем нет {{actors}}.\n" "Плейсхолдер должен быть именно {{actors}} в любом регистре." ) return if placeholder_count > 1: await message.answer( "Шаблон сохранен, но в нем несколько {{actors}}.\n" "Нужен только один общий {{actors}} на месте всего блока актеров, иначе список будет дублироваться." ) return if message.text and message.text.startswith("/post "): await message.answer( "Шаблон сохранен.\n" "Но если нужна разметка, ссылки и premium emoji, лучше использовать /post ответом на уже оформленное сообщение, а не вставлять шаблон текстом после команды." ) return await message.answer("Шаблон поста сохранен.") return await state.set_state(SessionForm.waiting_for_post_template) await message.answer( "Перешлите или отправьте текст шаблона одним сообщением.\n" "Внутри должен быть {{actors}}.\n" "Для скрытой ссылки можно использовать {{hidden_link}}." ) @router.callback_query(F.data == "nav:panel") async def panel_callback(callback: CallbackQuery, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None: await state.clear() user_id = callback.from_user.id if not is_allowed(user_id, actor_lookup, settings.admin_ids): await callback.answer("Нет доступа.", show_alert=True) return await show_panel(callback, user_id, app_config, settings) @router.callback_query(F.data.startswith("actor:")) async def actor_handler( callback: CallbackQuery, state: FSMContext, settings, actor_lookup: dict, app_config: dict, state_storage: JsonStateStorage, ) -> None: await state.clear() user_id = callback.from_user.id if not is_allowed(user_id, actor_lookup, settings.admin_ids): await callback.answer("Нет доступа.", show_alert=True) return actor_key = callback.data.split(":", maxsplit=1)[1] actor = find_actor(app_config, actor_key) if actor is None: await callback.answer("Персонаж не найден.", show_alert=True) return if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor): await callback.answer("Можно менять только свой статус.", show_alert=True) return await show_actor_status_menu(callback, actor, state_storage) await callback.answer() @router.callback_query(F.data.startswith("status:")) async def status_handler( callback: CallbackQuery, bot: Bot, settings, actor_lookup: dict, app_config: dict, state_storage: JsonStateStorage, ) -> None: user_id = callback.from_user.id if not is_allowed(user_id, actor_lookup, settings.admin_ids): await callback.answer("Нет доступа.", show_alert=True) return _, actor_key, status_key = callback.data.split(":") actor = find_actor(app_config, actor_key) if actor is None: await callback.answer("Персонаж не найден.", show_alert=True) return if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor): await callback.answer("Можно менять только свой статус.", show_alert=True) return try: await apply_status_update( bot=bot, actor_key=actor_key, status_key=status_key, phrase=actor.get("phrases", {}).get(status_key, ""), user_id=user_id, app_config=app_config, state_storage=state_storage, settings=settings, ) except TelegramBadRequest as exc: logging.exception("Failed to edit channel message") await callback.answer("Не удалось обновить сообщение канала.", show_alert=True) await safe_edit_message( callback, f"Статус сохранен, но сообщение канала не обновилось:\n{exc}", build_back_to_panel_keyboard().as_markup(), ) return await safe_edit_message( callback, f"Готово.\n{actor['display_name']} -> {STATUS_CHOICES[status_key]}", build_back_to_panel_keyboard().as_markup(), ) await callback.answer("Готово") @router.callback_query(F.data.startswith("custom:")) async def custom_phrase_handler( callback: CallbackQuery, state: FSMContext, settings, actor_lookup: dict, app_config: dict, state_storage: JsonStateStorage, ) -> None: user_id = callback.from_user.id if not is_allowed(user_id, actor_lookup, settings.admin_ids): await callback.answer("Нет доступа.", show_alert=True) return _, actor_key = callback.data.split(":") actor = find_actor(app_config, actor_key) if actor is None: await callback.answer("Персонаж не найден.", show_alert=True) return if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor): await callback.answer("Можно менять только свой статус.", show_alert=True) return runtime_state = get_actor_runtime_state(actor, state_storage) status_key = runtime_state.get("status", actor.get("default_status", "backstage")) await state.set_state(SessionForm.waiting_for_custom_phrase) await state.update_data(actor_key=actor_key, status_key=status_key) await safe_edit_message( callback, ( f"Введите свою фразу для {actor['display_name']}.\n" f"Текущий статус останется: {STATUS_CHOICES.get(status_key, status_key)}.\n" "Если хотите сначала сменить статус, вернитесь назад." ), build_status_keyboard(actor_key).as_markup(), ) await callback.answer() @router.message(SessionForm.waiting_for_custom_phrase) async def phrase_handler( message: Message, state: FSMContext, bot: Bot, app_config: dict, state_storage: JsonStateStorage, settings, ) -> None: if message.text is None: await message.answer("Нужен текст сообщения.") return data = await state.get_data() actor_key = data["actor_key"] status_key = data["status_key"] actor = find_actor(app_config, actor_key) if actor is None: await state.clear() await message.answer("Персонаж не найден.") return phrase = message.text.strip() if not phrase: await message.answer("Фраза не должна быть пустой.") return try: await apply_status_update( bot=bot, actor_key=actor_key, status_key=status_key, phrase=phrase, user_id=message.from_user.id, app_config=app_config, state_storage=state_storage, settings=settings, ) except TelegramBadRequest as exc: logging.exception("Failed to edit channel message") await message.answer(f"Статус сохранен, но сообщение канала не обновилось:\n{exc}") await state.clear() return await state.clear() await message.answer( f"Обновлено: {actor['display_name']} -> {STATUS_CHOICES[status_key].lower()}.", reply_markup=build_back_to_panel_keyboard().as_markup(), ) @router.message(SessionForm.waiting_for_post_template) async def post_template_handler( message: Message, state: FSMContext, state_storage: JsonStateStorage, ) -> None: template = extract_template_text(message) if template is None: await message.answer("Нужен текстовый шаблон. Перешлите текстовый пост или отправьте текст.") return normalized = normalize_template_placeholders(template) save_post_template(state_storage, normalized) await state.clear() placeholder_count = count_actor_placeholders(normalized) if placeholder_count == 0: await message.answer( "Шаблон сохранен, но в нем нет {{actors}}.\n" "Плейсхолдер должен быть именно {{actors}} в любом регистре." ) return if placeholder_count > 1: await message.answer( "Шаблон сохранен, но в нем несколько {{actors}}.\n" "Нужен только один общий {{actors}} на месте всего блока актеров, иначе список будет дублироваться." ) return await message.answer("Шаблон поста сохранен.") def build_dispatcher(app_config: dict, settings, state_storage: JsonStateStorage) -> Dispatcher: dispatcher = Dispatcher(storage=MemoryStorage()) dispatcher["app_config"] = app_config dispatcher["actor_lookup"] = build_actor_lookup(app_config) dispatcher["settings"] = settings dispatcher["state_storage"] = state_storage dispatcher.include_router(router) return dispatcher async def main() -> None: logging.basicConfig(level=logging.INFO) settings = load_settings() app_config = load_actor_config(settings.config_path) state_storage = JsonStateStorage(settings.state_path) bot = Bot(token=settings.bot_token) dispatcher = build_dispatcher(app_config, settings, state_storage) await dispatcher.start_polling(bot) def run() -> None: asyncio.run(main())