527 lines
19 KiB
Python
527 lines
19 KiB
Python
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
import re
|
||
from typing import Any
|
||
|
||
from aiogram import Bot, Dispatcher, F, Router
|
||
from aiogram.enums import ParseMode
|
||
from aiogram.exceptions import TelegramBadRequest
|
||
from aiogram.filters import Command, CommandStart
|
||
from aiogram.fsm.context import FSMContext
|
||
from aiogram.fsm.state import State, StatesGroup
|
||
from aiogram.fsm.storage.memory import MemoryStorage
|
||
from aiogram.types import CallbackQuery, Message
|
||
from aiogram.utils.keyboard import InlineKeyboardBuilder
|
||
|
||
from session_bot.config import load_actor_config, load_settings
|
||
from session_bot.render import build_channel_text
|
||
from session_bot.storage import JsonStateStorage
|
||
|
||
router = Router()
|
||
|
||
STATUS_CHOICES = {
|
||
"open": "Открыть",
|
||
"backstage": "Закулисье",
|
||
"delay": "Задержка",
|
||
"rest": "Антракт",
|
||
}
|
||
|
||
HELP_TEXT = (
|
||
"Команды\n"
|
||
"/start или /panel - открыть панель\n"
|
||
"/help - показать справку\n"
|
||
"/refresh - перерисовать пост в канале\n"
|
||
"/cancel - сбросить текущий ввод\n"
|
||
"/post - сохранить шаблон поста с плейсхолдером {{actors}}\n\n"
|
||
"Как пользоваться\n"
|
||
"1. Откройте панель.\n"
|
||
"2. Выберите своего актера.\n"
|
||
"3. Нажмите один из статусов. Он применится сразу с шаблонной фразой.\n"
|
||
"4. Если нужен свой текст, нажмите 'Своя фраза' и отправьте его.\n\n"
|
||
"Шаблон поста\n"
|
||
"В шаблоне должен быть {{actors}} - туда бот подставляет актерский блок.\n"
|
||
"Можно добавить {{hidden_link}} в начало, либо указать hidden_link_url в config/actors.json.\n"
|
||
"Шаблон лучше отправлять обычным текстом или реплаем на текстовый пост.\n"
|
||
"Точное восстановление исходного MarkdownV2 из пересланного оформленного поста Telegram не гарантируется."
|
||
)
|
||
|
||
|
||
class SessionForm(StatesGroup):
|
||
waiting_for_custom_phrase = State()
|
||
waiting_for_post_template = State()
|
||
|
||
|
||
def actor_operator_ids(actor: dict[str, Any]) -> set[int]:
|
||
if "operator_user_ids" in actor:
|
||
return {int(user_id) for user_id in actor["operator_user_ids"]}
|
||
return {int(actor["operator_user_id"])}
|
||
|
||
|
||
def build_actor_lookup(config: dict) -> dict[int, dict[str, Any]]:
|
||
lookup: dict[int, dict[str, Any]] = {}
|
||
for actor in config["actors"]:
|
||
for user_id in actor_operator_ids(actor):
|
||
lookup[user_id] = actor
|
||
return lookup
|
||
|
||
|
||
def is_allowed(user_id: int, actor_lookup: dict[int, dict[str, Any]], admin_ids: set[int]) -> bool:
|
||
return user_id in actor_lookup or user_id in admin_ids
|
||
|
||
|
||
def build_actor_keyboard(config: dict, user_id: int, admin_ids: set[int]) -> InlineKeyboardBuilder:
|
||
keyboard = InlineKeyboardBuilder()
|
||
for actor in config["actors"]:
|
||
if user_id in admin_ids or user_id in actor_operator_ids(actor):
|
||
keyboard.button(text=actor["button_text"], callback_data=f"actor:{actor['key']}")
|
||
keyboard.adjust(2)
|
||
return keyboard
|
||
|
||
|
||
def build_status_keyboard(actor_key: str) -> InlineKeyboardBuilder:
|
||
keyboard = InlineKeyboardBuilder()
|
||
for status_key, title in STATUS_CHOICES.items():
|
||
keyboard.button(text=title, callback_data=f"status:{actor_key}:{status_key}")
|
||
keyboard.button(text="Своя фраза", callback_data=f"custom:{actor_key}")
|
||
keyboard.button(text="⬅️ Назад", callback_data="nav:panel")
|
||
keyboard.adjust(2)
|
||
return keyboard
|
||
|
||
|
||
def build_back_to_panel_keyboard() -> InlineKeyboardBuilder:
|
||
keyboard = InlineKeyboardBuilder()
|
||
keyboard.button(text="К панели", callback_data="nav:panel")
|
||
return keyboard
|
||
|
||
|
||
def find_actor(app_config: dict, actor_key: str) -> dict[str, Any] | None:
|
||
return next((item for item in app_config["actors"] if item["key"] == actor_key), None)
|
||
|
||
|
||
def get_actor_runtime_state(actor: dict[str, Any], state_storage: JsonStateStorage) -> dict[str, Any]:
|
||
payload = state_storage.load()
|
||
return payload.get("actors", {}).get(actor["key"], {})
|
||
|
||
|
||
def extract_template_text(message: Message) -> str | None:
|
||
if message.text and message.text.startswith("/post "):
|
||
return message.text.split(maxsplit=1)[1]
|
||
|
||
source = message.reply_to_message or message
|
||
if source.text:
|
||
return source.md_text
|
||
return None
|
||
|
||
|
||
def normalize_template_placeholders(template: str) -> str:
|
||
normalized = template
|
||
normalized = re.sub(r"\\\{\\\{\s*actors\s*\\\}\\\}", "{{actors}}", normalized, flags=re.IGNORECASE)
|
||
normalized = re.sub(r"\{\{\s*actors\s*\}\}", "{{actors}}", normalized, flags=re.IGNORECASE)
|
||
normalized = re.sub(
|
||
r"\\\{\\\{\s*hidden_link\s*\\\}\\\}",
|
||
"{{hidden_link}}",
|
||
normalized,
|
||
flags=re.IGNORECASE,
|
||
)
|
||
normalized = re.sub(r"\{\{\s*hidden_link\s*\}\}", "{{hidden_link}}", normalized, flags=re.IGNORECASE)
|
||
return normalized
|
||
|
||
|
||
async def safe_edit_message(callback: CallbackQuery, text: str, reply_markup=None) -> None:
|
||
try:
|
||
await callback.message.edit_text(
|
||
text,
|
||
reply_markup=reply_markup,
|
||
disable_web_page_preview=True,
|
||
)
|
||
except TelegramBadRequest as exc:
|
||
if "message is not modified" not in str(exc).lower():
|
||
raise
|
||
|
||
|
||
async def show_panel(target: Message | CallbackQuery, user_id: int, app_config: dict, settings) -> None:
|
||
keyboard = build_actor_keyboard(app_config, user_id, settings.admin_ids)
|
||
text = "Выберите персонажа для смены статуса."
|
||
|
||
if isinstance(target, CallbackQuery):
|
||
await safe_edit_message(target, text, keyboard.as_markup())
|
||
await target.answer()
|
||
return
|
||
|
||
await target.answer(text, reply_markup=keyboard.as_markup())
|
||
|
||
|
||
async def show_actor_status_menu(
|
||
callback: CallbackQuery,
|
||
actor: dict[str, Any],
|
||
state_storage: JsonStateStorage,
|
||
) -> None:
|
||
runtime_state = get_actor_runtime_state(actor, state_storage)
|
||
current_status = runtime_state.get("status", actor.get("default_status", "backstage"))
|
||
current_phrase = runtime_state.get("phrase", actor.get("phrases", {}).get(current_status, ""))
|
||
|
||
lines = [
|
||
f"Выбран {actor['display_name']}.",
|
||
f"Текущий статус: {STATUS_CHOICES.get(current_status, current_status)}.",
|
||
]
|
||
if current_phrase:
|
||
lines.append(f"Текущая фраза: {current_phrase}")
|
||
|
||
await safe_edit_message(callback, "\n".join(lines), build_status_keyboard(actor["key"]).as_markup())
|
||
|
||
|
||
async def update_channel_post(bot: Bot, app_config: dict, state_storage: JsonStateStorage, settings) -> None:
|
||
state = state_storage.load()
|
||
text = build_channel_text(app_config, state)
|
||
await bot.edit_message_text(
|
||
chat_id=settings.channel_id,
|
||
message_id=settings.channel_message_id,
|
||
text=text,
|
||
parse_mode=ParseMode.MARKDOWN_V2,
|
||
disable_web_page_preview=False,
|
||
)
|
||
|
||
|
||
async def apply_status_update(
|
||
bot: Bot,
|
||
actor_key: str,
|
||
status_key: str,
|
||
phrase: str,
|
||
user_id: int,
|
||
app_config: dict,
|
||
state_storage: JsonStateStorage,
|
||
settings,
|
||
) -> None:
|
||
payload = state_storage.load()
|
||
payload.setdefault("actors", {})
|
||
payload["actors"][actor_key] = {
|
||
"status": status_key,
|
||
"phrase": phrase,
|
||
"updated_by": user_id,
|
||
}
|
||
state_storage.save(payload)
|
||
await update_channel_post(bot, app_config, state_storage, settings)
|
||
|
||
|
||
def save_post_template(state_storage: JsonStateStorage, template: str) -> None:
|
||
payload = state_storage.load()
|
||
payload["template"] = {"text": normalize_template_placeholders(template)}
|
||
state_storage.save(payload)
|
||
|
||
|
||
@router.message(CommandStart())
|
||
@router.message(Command("panel"))
|
||
async def start_handler(message: Message, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
|
||
await state.clear()
|
||
user_id = message.from_user.id
|
||
|
||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||
await message.answer("У вас нет доступа к этой панели.")
|
||
return
|
||
|
||
await show_panel(message, user_id, app_config, settings)
|
||
|
||
|
||
@router.message(Command("help"))
|
||
async def help_handler(message: Message) -> None:
|
||
await message.answer(HELP_TEXT, disable_web_page_preview=True)
|
||
|
||
|
||
@router.message(Command("cancel"))
|
||
async def cancel_handler(message: Message, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
|
||
await state.clear()
|
||
user_id = message.from_user.id
|
||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||
await message.answer("Состояние очищено.")
|
||
return
|
||
await show_panel(message, user_id, app_config, settings)
|
||
|
||
|
||
@router.message(Command("refresh"))
|
||
async def refresh_handler(
|
||
message: Message,
|
||
bot: Bot,
|
||
app_config: dict,
|
||
state_storage: JsonStateStorage,
|
||
actor_lookup: dict,
|
||
settings,
|
||
) -> None:
|
||
user_id = message.from_user.id
|
||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||
await message.answer("У вас нет доступа к обновлению.")
|
||
return
|
||
|
||
await update_channel_post(bot, app_config, state_storage, settings)
|
||
await message.answer("Сообщение канала обновлено.")
|
||
|
||
|
||
@router.message(Command("post"))
|
||
async def post_handler(
|
||
message: Message,
|
||
state: FSMContext,
|
||
state_storage: JsonStateStorage,
|
||
actor_lookup: dict,
|
||
settings,
|
||
) -> None:
|
||
user_id = message.from_user.id
|
||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||
await message.answer("У вас нет доступа к шаблонам поста.")
|
||
return
|
||
|
||
template = extract_template_text(message)
|
||
if template is not None:
|
||
normalized = normalize_template_placeholders(template)
|
||
save_post_template(state_storage, normalized)
|
||
await state.clear()
|
||
if "{{actors}}" not in normalized:
|
||
await message.answer(
|
||
"Шаблон сохранен, но в нем нет {{actors}}.\n"
|
||
"Плейсхолдер должен быть именно {{actors}} в любом регистре."
|
||
)
|
||
return
|
||
await message.answer("Шаблон поста сохранен.")
|
||
return
|
||
|
||
await state.set_state(SessionForm.waiting_for_post_template)
|
||
await message.answer(
|
||
"Перешлите или отправьте текст шаблона одним сообщением.\n"
|
||
"Внутри должен быть {{actors}}.\n"
|
||
"Для скрытой ссылки можно использовать {{hidden_link}}."
|
||
)
|
||
|
||
|
||
@router.callback_query(F.data == "nav:panel")
|
||
async def panel_callback(callback: CallbackQuery, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
|
||
await state.clear()
|
||
user_id = callback.from_user.id
|
||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||
await callback.answer("Нет доступа.", show_alert=True)
|
||
return
|
||
await show_panel(callback, user_id, app_config, settings)
|
||
|
||
|
||
@router.callback_query(F.data.startswith("actor:"))
|
||
async def actor_handler(
|
||
callback: CallbackQuery,
|
||
state: FSMContext,
|
||
settings,
|
||
actor_lookup: dict,
|
||
app_config: dict,
|
||
state_storage: JsonStateStorage,
|
||
) -> None:
|
||
await state.clear()
|
||
user_id = callback.from_user.id
|
||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||
await callback.answer("Нет доступа.", show_alert=True)
|
||
return
|
||
|
||
actor_key = callback.data.split(":", maxsplit=1)[1]
|
||
actor = find_actor(app_config, actor_key)
|
||
if actor is None:
|
||
await callback.answer("Персонаж не найден.", show_alert=True)
|
||
return
|
||
|
||
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
|
||
await callback.answer("Можно менять только свой статус.", show_alert=True)
|
||
return
|
||
|
||
await show_actor_status_menu(callback, actor, state_storage)
|
||
await callback.answer()
|
||
|
||
|
||
@router.callback_query(F.data.startswith("status:"))
|
||
async def status_handler(
|
||
callback: CallbackQuery,
|
||
bot: Bot,
|
||
settings,
|
||
actor_lookup: dict,
|
||
app_config: dict,
|
||
state_storage: JsonStateStorage,
|
||
) -> None:
|
||
user_id = callback.from_user.id
|
||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||
await callback.answer("Нет доступа.", show_alert=True)
|
||
return
|
||
|
||
_, actor_key, status_key = callback.data.split(":")
|
||
actor = find_actor(app_config, actor_key)
|
||
if actor is None:
|
||
await callback.answer("Персонаж не найден.", show_alert=True)
|
||
return
|
||
|
||
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
|
||
await callback.answer("Можно менять только свой статус.", show_alert=True)
|
||
return
|
||
|
||
try:
|
||
await apply_status_update(
|
||
bot=bot,
|
||
actor_key=actor_key,
|
||
status_key=status_key,
|
||
phrase=actor.get("phrases", {}).get(status_key, ""),
|
||
user_id=user_id,
|
||
app_config=app_config,
|
||
state_storage=state_storage,
|
||
settings=settings,
|
||
)
|
||
except TelegramBadRequest as exc:
|
||
logging.exception("Failed to edit channel message")
|
||
await callback.answer("Не удалось обновить сообщение канала.", show_alert=True)
|
||
await safe_edit_message(
|
||
callback,
|
||
f"Статус сохранен, но сообщение канала не обновилось:\n{exc}",
|
||
build_back_to_panel_keyboard().as_markup(),
|
||
)
|
||
return
|
||
|
||
await safe_edit_message(
|
||
callback,
|
||
f"Готово.\n{actor['display_name']} -> {STATUS_CHOICES[status_key]}",
|
||
build_back_to_panel_keyboard().as_markup(),
|
||
)
|
||
await callback.answer("Готово")
|
||
|
||
|
||
@router.callback_query(F.data.startswith("custom:"))
|
||
async def custom_phrase_handler(
|
||
callback: CallbackQuery,
|
||
state: FSMContext,
|
||
settings,
|
||
actor_lookup: dict,
|
||
app_config: dict,
|
||
state_storage: JsonStateStorage,
|
||
) -> None:
|
||
user_id = callback.from_user.id
|
||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||
await callback.answer("Нет доступа.", show_alert=True)
|
||
return
|
||
|
||
_, actor_key = callback.data.split(":")
|
||
actor = find_actor(app_config, actor_key)
|
||
if actor is None:
|
||
await callback.answer("Персонаж не найден.", show_alert=True)
|
||
return
|
||
|
||
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
|
||
await callback.answer("Можно менять только свой статус.", show_alert=True)
|
||
return
|
||
|
||
runtime_state = get_actor_runtime_state(actor, state_storage)
|
||
status_key = runtime_state.get("status", actor.get("default_status", "backstage"))
|
||
|
||
await state.set_state(SessionForm.waiting_for_custom_phrase)
|
||
await state.update_data(actor_key=actor_key, status_key=status_key)
|
||
await safe_edit_message(
|
||
callback,
|
||
(
|
||
f"Введите свою фразу для {actor['display_name']}.\n"
|
||
f"Текущий статус останется: {STATUS_CHOICES.get(status_key, status_key)}.\n"
|
||
"Если хотите сначала сменить статус, вернитесь назад."
|
||
),
|
||
build_status_keyboard(actor_key).as_markup(),
|
||
)
|
||
await callback.answer()
|
||
|
||
|
||
@router.message(SessionForm.waiting_for_custom_phrase)
|
||
async def phrase_handler(
|
||
message: Message,
|
||
state: FSMContext,
|
||
bot: Bot,
|
||
app_config: dict,
|
||
state_storage: JsonStateStorage,
|
||
settings,
|
||
) -> None:
|
||
if message.text is None:
|
||
await message.answer("Нужен текст сообщения.")
|
||
return
|
||
|
||
data = await state.get_data()
|
||
actor_key = data["actor_key"]
|
||
status_key = data["status_key"]
|
||
actor = find_actor(app_config, actor_key)
|
||
if actor is None:
|
||
await state.clear()
|
||
await message.answer("Персонаж не найден.")
|
||
return
|
||
|
||
phrase = message.text.strip()
|
||
if not phrase:
|
||
await message.answer("Фраза не должна быть пустой.")
|
||
return
|
||
|
||
try:
|
||
await apply_status_update(
|
||
bot=bot,
|
||
actor_key=actor_key,
|
||
status_key=status_key,
|
||
phrase=phrase,
|
||
user_id=message.from_user.id,
|
||
app_config=app_config,
|
||
state_storage=state_storage,
|
||
settings=settings,
|
||
)
|
||
except TelegramBadRequest as exc:
|
||
logging.exception("Failed to edit channel message")
|
||
await message.answer(f"Статус сохранен, но сообщение канала не обновилось:\n{exc}")
|
||
await state.clear()
|
||
return
|
||
|
||
await state.clear()
|
||
await message.answer(
|
||
f"Обновлено: {actor['display_name']} -> {STATUS_CHOICES[status_key].lower()}.",
|
||
reply_markup=build_back_to_panel_keyboard().as_markup(),
|
||
)
|
||
|
||
|
||
@router.message(SessionForm.waiting_for_post_template)
|
||
async def post_template_handler(
|
||
message: Message,
|
||
state: FSMContext,
|
||
state_storage: JsonStateStorage,
|
||
) -> None:
|
||
template = extract_template_text(message)
|
||
if template is None:
|
||
await message.answer("Нужен текстовый шаблон. Перешлите текстовый пост или отправьте текст.")
|
||
return
|
||
|
||
normalized = normalize_template_placeholders(template)
|
||
save_post_template(state_storage, normalized)
|
||
await state.clear()
|
||
|
||
if "{{actors}}" not in normalized:
|
||
await message.answer(
|
||
"Шаблон сохранен, но в нем нет {{actors}}.\n"
|
||
"Плейсхолдер должен быть именно {{actors}} в любом регистре."
|
||
)
|
||
return
|
||
|
||
await message.answer("Шаблон поста сохранен.")
|
||
|
||
|
||
def build_dispatcher(app_config: dict, settings, state_storage: JsonStateStorage) -> Dispatcher:
|
||
dispatcher = Dispatcher(storage=MemoryStorage())
|
||
dispatcher["app_config"] = app_config
|
||
dispatcher["actor_lookup"] = build_actor_lookup(app_config)
|
||
dispatcher["settings"] = settings
|
||
dispatcher["state_storage"] = state_storage
|
||
dispatcher.include_router(router)
|
||
return dispatcher
|
||
|
||
|
||
async def main() -> None:
|
||
logging.basicConfig(level=logging.INFO)
|
||
settings = load_settings()
|
||
app_config = load_actor_config(settings.config_path)
|
||
state_storage = JsonStateStorage(settings.state_path)
|
||
|
||
bot = Bot(token=settings.bot_token)
|
||
dispatcher = build_dispatcher(app_config, settings, state_storage)
|
||
await dispatcher.start_polling(bot)
|
||
|
||
|
||
def run() -> None:
|
||
asyncio.run(main())
|