from __future__ import annotations
import asyncio
import logging
import re
from typing import Any
from aiogram import Bot, Dispatcher, F, Router
from aiogram.enums import ParseMode
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters import Command, CommandStart
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import CallbackQuery, LinkPreviewOptions, Message
from aiogram.utils.keyboard import InlineKeyboardBuilder
from session_bot.config import load_actor_config, load_settings
from session_bot.render import build_channel_text
from session_bot.storage import JsonStateStorage
router = Router()
STATUS_CHOICES = {
"open": "Открыть",
"backstage": "Закулисье",
"delay": "Задержка",
"rest": "Антракт",
}
HELP_TEXT = (
"Команды\n"
"/start или /panel - открыть панель\n"
"/help - показать справку\n"
"/refresh - перерисовать пост в канале\n"
"/cancel - сбросить текущий ввод\n"
"/post - сохранить шаблон поста\n"
"/template_dump - показать HTML из сообщения-реплая\n\n"
"Шаблон поста\n"
"Можно использовать один общий {{actors}} для всего блока актеров.\n"
"Или точечные плейсхолдеры: {{actor:liebe}}, {{actor:mari}}.\n"
"Можно добавить {{hidden_link}} в начало, либо задать HIDDEN_LINK_URL в .env.\n"
"Если нужна готовая Telegram-разметка, используйте /post ответом на уже оформленное сообщение."
)
class SessionForm(StatesGroup):
waiting_for_custom_phrase = State()
waiting_for_post_template = State()
def actor_operator_ids(actor: dict[str, Any]) -> set[int]:
if "operator_user_ids" in actor:
return {int(user_id) for user_id in actor["operator_user_ids"]}
return {int(actor["operator_user_id"])}
def build_actor_lookup(config: dict) -> dict[int, dict[str, Any]]:
lookup: dict[int, dict[str, Any]] = {}
for actor in config["actors"]:
for user_id in actor_operator_ids(actor):
lookup[user_id] = actor
return lookup
def is_allowed(user_id: int, actor_lookup: dict[int, dict[str, Any]], admin_ids: set[int]) -> bool:
return user_id in actor_lookup or user_id in admin_ids
def build_actor_keyboard(config: dict, user_id: int, admin_ids: set[int]) -> InlineKeyboardBuilder:
keyboard = InlineKeyboardBuilder()
for actor in config["actors"]:
if user_id in admin_ids or user_id in actor_operator_ids(actor):
keyboard.button(text=actor["button_text"], callback_data=f"actor:{actor['key']}")
keyboard.adjust(2)
return keyboard
def build_status_keyboard(actor_key: str) -> InlineKeyboardBuilder:
keyboard = InlineKeyboardBuilder()
for status_key, title in STATUS_CHOICES.items():
keyboard.button(text=title, callback_data=f"status:{actor_key}:{status_key}")
keyboard.button(text="Своя фраза", callback_data=f"custom:{actor_key}")
keyboard.button(text="⬅️ Назад", callback_data="nav:panel")
keyboard.adjust(2)
return keyboard
def build_back_to_actor_keyboard(actor_key: str) -> InlineKeyboardBuilder:
keyboard = InlineKeyboardBuilder()
keyboard.button(text="⬅️ Назад", callback_data=f"actor:{actor_key}")
return keyboard
def build_back_to_panel_keyboard() -> InlineKeyboardBuilder:
keyboard = InlineKeyboardBuilder()
keyboard.button(text="К панели", callback_data="nav:panel")
return keyboard
def find_actor(app_config: dict, actor_key: str) -> dict[str, Any] | None:
return next((item for item in app_config["actors"] if item["key"] == actor_key), None)
def get_actor_runtime_state(actor: dict[str, Any], state_storage: JsonStateStorage) -> dict[str, Any]:
payload = state_storage.load()
return payload.get("actors", {}).get(actor["key"], {})
def extract_template_text(message: Message) -> str | None:
if message.text and message.text.startswith("/post "):
return message.text.split(maxsplit=1)[1]
source = message.reply_to_message or message
if source.html_text:
return source.html_text
if source.text:
return source.text
return None
def normalize_template_placeholders(template: str) -> str:
normalized = template
normalized = re.sub(r"\\\{\\\{\s*actors\s*\\\}\\\}", "{{actors}}", normalized, flags=re.IGNORECASE)
normalized = re.sub(r"\{\{\s*actors\s*\}\}", "{{actors}}", normalized, flags=re.IGNORECASE)
normalized = re.sub(
r"\\\{\\\{\s*actor\s*:\s*([a-z0-9_\-]+)\s*\\\}\\\}",
lambda match: f"{{{{actor:{match.group(1).lower()}}}}}",
normalized,
flags=re.IGNORECASE,
)
normalized = re.sub(
r"\{\{\s*actor\s*:\s*([a-z0-9_\-]+)\s*\}\}",
lambda match: f"{{{{actor:{match.group(1).lower()}}}}}",
normalized,
flags=re.IGNORECASE,
)
normalized = re.sub(r"\\\{\\\{\s*hidden_link\s*\\\}\\\}", "{{hidden_link}}", normalized, flags=re.IGNORECASE)
normalized = re.sub(r"\{\{\s*hidden_link\s*\}\}", "{{hidden_link}}", normalized, flags=re.IGNORECASE)
return normalized
def sanitize_template_html(template: str) -> str:
# Some forwarded/copied premium emoji arrive as X.
# Bots often cannot reuse those identifiers, so keep only the visible fallback text.
return re.sub(
r"]*>(.*?)",
lambda match: match.group(1) or "",
template,
flags=re.IGNORECASE | re.DOTALL,
)
def validate_template_structure(template: str) -> str | None:
normalized = normalize_template_placeholders(template)
common_count = normalized.count("{{actors}}")
specific_count = len(re.findall(r"\{\{actor:[a-z0-9_\-]+\}\}", normalized, flags=re.IGNORECASE))
if common_count == 0 and specific_count == 0:
return "Шаблон сохранен, но в нем нет {{actors}} или {{actor:key}}."
if common_count > 1:
return "Шаблон сохранен, но в нем несколько {{actors}}. Нужен только один общий {{actors}}."
return None
async def safe_edit_message(callback: CallbackQuery, text: str, reply_markup=None) -> None:
try:
await callback.message.edit_text(
text,
reply_markup=reply_markup,
disable_web_page_preview=True,
)
except TelegramBadRequest as exc:
if "message is not modified" not in str(exc).lower():
raise
async def show_panel(target: Message | CallbackQuery, user_id: int, app_config: dict, settings) -> None:
keyboard = build_actor_keyboard(app_config, user_id, settings.admin_ids)
text = "Выберите персонажа для смены статуса."
if isinstance(target, CallbackQuery):
await safe_edit_message(target, text, keyboard.as_markup())
await target.answer()
return
await target.answer(text, reply_markup=keyboard.as_markup())
async def show_actor_status_menu(callback: CallbackQuery, actor: dict[str, Any], state_storage: JsonStateStorage) -> None:
runtime_state = get_actor_runtime_state(actor, state_storage)
current_status = runtime_state.get("status", actor.get("default_status", "backstage"))
current_phrase = runtime_state.get("phrase", actor.get("phrases", {}).get(current_status, ""))
lines = [
f"Выбран {actor['display_name']}.",
f"Текущий статус: {STATUS_CHOICES.get(current_status, current_status)}.",
]
if current_phrase:
lines.append(f"Текущая фраза: {current_phrase}")
await safe_edit_message(callback, "\n".join(lines), build_status_keyboard(actor["key"]).as_markup())
async def update_channel_post(bot: Bot, app_config: dict, state_storage: JsonStateStorage, settings) -> None:
state = state_storage.load()
text = build_channel_text(app_config, state)
link_preview_options = None
if app_config.get("hidden_link_url", "").strip():
link_preview_options = LinkPreviewOptions(
url=app_config["hidden_link_url"].strip(),
show_above_text=True,
)
try:
await bot.edit_message_text(
chat_id=settings.channel_id,
message_id=settings.channel_message_id,
text=text,
parse_mode=ParseMode.HTML,
link_preview_options=link_preview_options,
)
except TelegramBadRequest as exc:
if "message is not modified" in str(exc).lower():
return
if "Invalid custom emoji identifier specified" not in str(exc):
raise
template = state.get("template", {}).get("text", "")
if not template:
raise
state["template"]["text"] = sanitize_template_html(template)
state_storage.save(state)
fallback_text = build_channel_text(app_config, state)
await bot.edit_message_text(
chat_id=settings.channel_id,
message_id=settings.channel_message_id,
text=fallback_text,
parse_mode=ParseMode.HTML,
link_preview_options=link_preview_options,
)
async def apply_status_update(
bot: Bot,
actor_key: str,
status_key: str,
phrase: str,
user_id: int,
app_config: dict,
state_storage: JsonStateStorage,
settings,
) -> None:
payload = state_storage.load()
payload.setdefault("actors", {})
payload["actors"][actor_key] = {
"status": status_key,
"phrase": phrase,
"updated_by": user_id,
}
state_storage.save(payload)
await update_channel_post(bot, app_config, state_storage, settings)
def save_post_template(state_storage: JsonStateStorage, template: str) -> None:
payload = state_storage.load()
payload["template"] = {"text": sanitize_template_html(normalize_template_placeholders(template))}
state_storage.save(payload)
@router.message(CommandStart())
@router.message(Command("panel"))
async def start_handler(message: Message, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
await state.clear()
user_id = message.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await message.answer("У вас нет доступа к этой панели.")
return
await show_panel(message, user_id, app_config, settings)
@router.message(Command("help"))
async def help_handler(message: Message) -> None:
await message.answer(HELP_TEXT)
@router.message(Command("cancel"))
async def cancel_handler(message: Message, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
await state.clear()
user_id = message.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await message.answer("Состояние очищено.")
return
await show_panel(message, user_id, app_config, settings)
@router.message(Command("refresh"))
async def refresh_handler(
message: Message,
bot: Bot,
app_config: dict,
state_storage: JsonStateStorage,
actor_lookup: dict,
settings,
) -> None:
user_id = message.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await message.answer("У вас нет доступа к обновлению.")
return
await update_channel_post(bot, app_config, state_storage, settings)
await message.answer("Сообщение канала обновлено.")
@router.message(Command("template_dump"))
async def template_dump_handler(message: Message, actor_lookup: dict, settings) -> None:
user_id = message.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await message.answer("У вас нет доступа к шаблонам поста.")
return
if message.reply_to_message is None:
await message.answer("Используйте /template_dump ответом на оформленное сообщение.")
return
source = message.reply_to_message
if source.html_text:
await message.answer(source.html_text)
return
await message.answer("У сообщения нет HTML-представления, которое можно извлечь.")
@router.message(Command("post"))
async def post_handler(
message: Message,
state: FSMContext,
state_storage: JsonStateStorage,
actor_lookup: dict,
settings,
) -> None:
user_id = message.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await message.answer("У вас нет доступа к шаблонам поста.")
return
template = extract_template_text(message)
if template is not None:
normalized = normalize_template_placeholders(template)
save_post_template(state_storage, normalized)
await state.clear()
validation_error = validate_template_structure(normalized)
if validation_error:
await message.answer(validation_error)
return
if message.text and message.text.startswith("/post "):
await message.answer(
"Шаблон сохранен.\n"
"Но если нужна готовая разметка, ссылки и premium emoji, лучше использовать /post ответом на уже оформленное сообщение."
)
return
await message.answer("Шаблон поста сохранен.")
return
await state.set_state(SessionForm.waiting_for_post_template)
await message.answer(
"Перешлите или отправьте шаблон одним сообщением.\n"
"Используйте {{actors}} для общего блока или {{actor:key}} для конкретного актера.\n"
"Для скрытой ссылки можно использовать {{hidden_link}}."
)
@router.callback_query(F.data == "nav:panel")
async def panel_callback(callback: CallbackQuery, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
await state.clear()
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
await show_panel(callback, user_id, app_config, settings)
@router.callback_query(F.data.startswith("actor:"))
async def actor_handler(
callback: CallbackQuery,
state: FSMContext,
settings,
actor_lookup: dict,
app_config: dict,
state_storage: JsonStateStorage,
) -> None:
await state.clear()
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
actor_key = callback.data.split(":", maxsplit=1)[1]
actor = find_actor(app_config, actor_key)
if actor is None:
await callback.answer("Персонаж не найден.", show_alert=True)
return
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
await callback.answer("Можно менять только свой статус.", show_alert=True)
return
await show_actor_status_menu(callback, actor, state_storage)
await callback.answer()
@router.callback_query(F.data.startswith("status:"))
async def status_handler(
callback: CallbackQuery,
bot: Bot,
settings,
actor_lookup: dict,
app_config: dict,
state_storage: JsonStateStorage,
) -> None:
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
_, actor_key, status_key = callback.data.split(":")
actor = find_actor(app_config, actor_key)
if actor is None:
await callback.answer("Персонаж не найден.", show_alert=True)
return
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
await callback.answer("Можно менять только свой статус.", show_alert=True)
return
try:
await apply_status_update(
bot=bot,
actor_key=actor_key,
status_key=status_key,
phrase=actor.get("phrases", {}).get(status_key, ""),
user_id=user_id,
app_config=app_config,
state_storage=state_storage,
settings=settings,
)
except TelegramBadRequest as exc:
logging.exception("Failed to edit channel message")
await callback.answer("Не удалось обновить сообщение канала.", show_alert=True)
await safe_edit_message(
callback,
f"Статус сохранен, но сообщение канала не обновилось:\n{exc}",
build_back_to_panel_keyboard().as_markup(),
)
return
await safe_edit_message(
callback,
f"Готово.\n{actor['display_name']} -> {STATUS_CHOICES[status_key]}",
build_back_to_panel_keyboard().as_markup(),
)
await callback.answer("Готово")
@router.callback_query(F.data.startswith("custom:"))
async def custom_phrase_handler(
callback: CallbackQuery,
state: FSMContext,
settings,
actor_lookup: dict,
app_config: dict,
state_storage: JsonStateStorage,
) -> None:
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
_, actor_key = callback.data.split(":")
actor = find_actor(app_config, actor_key)
if actor is None:
await callback.answer("Персонаж не найден.", show_alert=True)
return
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
await callback.answer("Можно менять только свой статус.", show_alert=True)
return
runtime_state = get_actor_runtime_state(actor, state_storage)
status_key = runtime_state.get("status", actor.get("default_status", "backstage"))
await state.set_state(SessionForm.waiting_for_custom_phrase)
await state.update_data(actor_key=actor_key, status_key=status_key)
await safe_edit_message(
callback,
(
f"Введите свою фразу для {actor['display_name']}.\n"
f"Текущий статус останется: {STATUS_CHOICES.get(status_key, status_key)}."
),
build_back_to_actor_keyboard(actor_key).as_markup(),
)
await callback.answer()
@router.message(SessionForm.waiting_for_custom_phrase)
async def phrase_handler(
message: Message,
state: FSMContext,
bot: Bot,
app_config: dict,
state_storage: JsonStateStorage,
settings,
) -> None:
if message.text is None:
await message.answer("Нужен текст сообщения.")
return
data = await state.get_data()
actor_key = data["actor_key"]
status_key = data["status_key"]
actor = find_actor(app_config, actor_key)
if actor is None:
await state.clear()
await message.answer("Персонаж не найден.")
return
phrase = message.text.strip()
if not phrase:
await message.answer("Фраза не должна быть пустой.")
return
try:
await apply_status_update(
bot=bot,
actor_key=actor_key,
status_key=status_key,
phrase=phrase,
user_id=message.from_user.id,
app_config=app_config,
state_storage=state_storage,
settings=settings,
)
except TelegramBadRequest as exc:
logging.exception("Failed to edit channel message")
await message.answer(f"Статус сохранен, но сообщение канала не обновилось:\n{exc}")
await state.clear()
return
await state.clear()
await message.answer(
f"Обновлено: {actor['display_name']} -> {STATUS_CHOICES[status_key].lower()}.",
reply_markup=build_back_to_panel_keyboard().as_markup(),
)
@router.message(SessionForm.waiting_for_post_template)
async def post_template_handler(
message: Message,
state: FSMContext,
state_storage: JsonStateStorage,
) -> None:
template = extract_template_text(message)
if template is None:
await message.answer("Нужен текстовый шаблон. Перешлите текстовый пост или отправьте текст.")
return
normalized = normalize_template_placeholders(template)
save_post_template(state_storage, normalized)
await state.clear()
validation_error = validate_template_structure(normalized)
if validation_error:
await message.answer(validation_error)
return
await message.answer("Шаблон поста сохранен.")
def build_dispatcher(app_config: dict, settings, state_storage: JsonStateStorage) -> Dispatcher:
dispatcher = Dispatcher(storage=MemoryStorage())
dispatcher["app_config"] = app_config
dispatcher["actor_lookup"] = build_actor_lookup(app_config)
dispatcher["settings"] = settings
dispatcher["state_storage"] = state_storage
dispatcher.include_router(router)
return dispatcher
async def main() -> None:
logging.basicConfig(level=logging.INFO)
settings = load_settings()
app_config = load_actor_config(settings.config_path)
if settings.hidden_link_url:
app_config["hidden_link_url"] = settings.hidden_link_url
if settings.hidden_link_char:
app_config["hidden_link_char"] = settings.hidden_link_char
state_storage = JsonStateStorage(settings.state_path)
bot = Bot(token=settings.bot_token)
dispatcher = build_dispatcher(app_config, settings, state_storage)
await dispatcher.start_polling(bot)
def run() -> None:
asyncio.run(main())