Files
Otkritiebot/session_bot/bot.py
Verum c286039df7
Some checks failed
CI / Lint (ruff + mypy) (push) Failing after 31s
CI / Run tests (push) Has been skipped
CI / Docker build test (push) Successful in 10s
12312421
2026-04-02 20:36:20 +07:00

501 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import asyncio
import logging
from typing import Any
from aiogram import Bot, Dispatcher, F, Router
from aiogram.enums import ParseMode
from aiogram.exceptions import TelegramBadRequest
from aiogram.filters import Command, CommandStart
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import CallbackQuery, Message
from aiogram.utils.keyboard import InlineKeyboardBuilder
from session_bot.config import load_actor_config, load_settings
from session_bot.render import build_channel_text
from session_bot.storage import JsonStateStorage
router = Router()
STATUS_CHOICES = {
"open": "Открыть",
"backstage": "Закулисье",
"delay": "Задержка",
"rest": "Антракт",
}
HELP_TEXT = (
"<b>Команды</b>\n"
"/start или /panel - открыть панель\n"
"/help - показать эту справку\n"
"/refresh - принудительно обновить сообщение канала\n"
"/cancel - сбросить текущий ввод и вернуться к панели\n\n"
"<b>Как пользоваться</b>\n"
"1. Откройте панель.\n"
"2. Выберите своего актера.\n"
"3. Выберите статус.\n"
"4. Выберите вариант фразы кнопкой: шаблон, без фразы или своя.\n"
"5. Если выбрана своя фраза, отправьте текст одним сообщением.\n\n"
"<b>Настройка</b>\n"
"ENV: BOT_TOKEN, CHANNEL_ID, CHANNEL_MESSAGE_ID, ADMIN_IDS.\n"
"Актеры и шаблоны лежат в config/actors.json.\n"
"Текущие статусы сохраняются в data/state.json."
)
class SessionForm(StatesGroup):
waiting_for_custom_phrase = State()
def actor_operator_ids(actor: dict[str, Any]) -> set[int]:
if "operator_user_ids" in actor:
return {int(user_id) for user_id in actor["operator_user_ids"]}
return {int(actor["operator_user_id"])}
def build_actor_lookup(config: dict) -> dict[int, dict[str, Any]]:
lookup: dict[int, dict[str, Any]] = {}
for actor in config["actors"]:
for user_id in actor_operator_ids(actor):
lookup[user_id] = actor
return lookup
def is_allowed(user_id: int, actor_lookup: dict[int, dict[str, Any]], admin_ids: set[int]) -> bool:
return user_id in actor_lookup or user_id in admin_ids
def build_actor_keyboard(config: dict, user_id: int, admin_ids: set[int]) -> InlineKeyboardBuilder:
keyboard = InlineKeyboardBuilder()
for actor in config["actors"]:
if user_id in admin_ids or user_id in actor_operator_ids(actor):
keyboard.button(text=actor["button_text"], callback_data=f"actor:{actor['key']}")
keyboard.adjust(2)
return keyboard
def build_status_keyboard(actor_key: str) -> InlineKeyboardBuilder:
keyboard = InlineKeyboardBuilder()
for status_key, title in STATUS_CHOICES.items():
keyboard.button(text=title, callback_data=f"status:{actor_key}:{status_key}")
keyboard.button(text="⬅️ Назад", callback_data="nav:panel")
keyboard.adjust(2)
return keyboard
def build_phrase_keyboard(actor_key: str, status_key: str, default_phrase: str) -> InlineKeyboardBuilder:
keyboard = InlineKeyboardBuilder()
if default_phrase:
keyboard.button(
text="Шаблон",
callback_data=f"apply:template:{actor_key}:{status_key}",
)
keyboard.button(
text="Без фразы",
callback_data=f"apply:empty:{actor_key}:{status_key}",
)
keyboard.button(
text="Своя фраза",
callback_data=f"custom:{actor_key}:{status_key}",
)
keyboard.button(
text="⬅️ Назад",
callback_data=f"nav:actor:{actor_key}",
)
keyboard.adjust(2)
return keyboard
def build_back_to_panel_keyboard() -> InlineKeyboardBuilder:
keyboard = InlineKeyboardBuilder()
keyboard.button(text="К панели", callback_data="nav:panel")
return keyboard
def find_actor(app_config: dict, actor_key: str) -> dict[str, Any] | None:
return next((item for item in app_config["actors"] if item["key"] == actor_key), None)
async def safe_edit_message(callback: CallbackQuery, text: str, reply_markup=None) -> None:
try:
await callback.message.edit_text(
text,
reply_markup=reply_markup,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
)
except TelegramBadRequest as exc:
if "message is not modified" not in str(exc).lower():
raise
async def show_panel(target: Message | CallbackQuery, user_id: int, app_config: dict, settings) -> None:
keyboard = build_actor_keyboard(app_config, user_id, settings.admin_ids)
text = "Выберите персонажа для смены статуса."
if isinstance(target, CallbackQuery):
await safe_edit_message(target, text, keyboard.as_markup())
await target.answer()
return
await target.answer(text, reply_markup=keyboard.as_markup())
async def show_actor_status_menu(callback: CallbackQuery, actor: dict[str, Any]) -> None:
keyboard = build_status_keyboard(actor["key"])
await safe_edit_message(
callback,
f"Выбран <b>{actor['display_name']}</b>.\nКакой статус поставить?",
keyboard.as_markup(),
)
async def update_channel_post(bot: Bot, app_config: dict, state_storage: JsonStateStorage, settings) -> None:
state = state_storage.load()
text = build_channel_text(app_config, state)
await bot.edit_message_text(
chat_id=settings.channel_id,
message_id=settings.channel_message_id,
text=text,
parse_mode=ParseMode.HTML,
disable_web_page_preview=True,
)
@router.message(CommandStart())
@router.message(Command("panel"))
async def start_handler(message: Message, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
await state.clear()
user_id = message.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await message.answer("У вас нет доступа к этой панели.")
return
await show_panel(message, user_id, app_config, settings)
@router.message(Command("help"))
async def help_handler(message: Message) -> None:
await message.answer(HELP_TEXT, parse_mode=ParseMode.HTML, disable_web_page_preview=True)
@router.message(Command("cancel"))
async def cancel_handler(message: Message, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
await state.clear()
user_id = message.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await message.answer("Состояние очищено.")
return
await show_panel(message, user_id, app_config, settings)
@router.message(Command("refresh"))
async def refresh_handler(
message: Message,
bot: Bot,
app_config: dict,
state_storage: JsonStateStorage,
actor_lookup: dict,
settings,
) -> None:
user_id = message.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await message.answer("У вас нет доступа к обновлению.")
return
await update_channel_post(bot, app_config, state_storage, settings)
await message.answer("Сообщение канала обновлено.")
@router.callback_query(F.data == "nav:panel")
async def panel_callback(callback: CallbackQuery, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
await state.clear()
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
await show_panel(callback, user_id, app_config, settings)
@router.callback_query(F.data.startswith("actor:"))
async def actor_handler(callback: CallbackQuery, state: FSMContext, settings, actor_lookup: dict, app_config: dict) -> None:
await state.clear()
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
actor_key = callback.data.split(":", maxsplit=1)[1]
actor = find_actor(app_config, actor_key)
if actor is None:
await callback.answer("Персонаж не найден.", show_alert=True)
return
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
await callback.answer("Можно менять только свой статус.", show_alert=True)
return
await show_actor_status_menu(callback, actor)
await callback.answer()
@router.callback_query(F.data.startswith("nav:actor:"))
async def actor_back_handler(callback: CallbackQuery, state: FSMContext, settings, actor_lookup: dict, app_config: dict) -> None:
await state.clear()
actor_key = callback.data.split(":", maxsplit=2)[2]
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
actor = find_actor(app_config, actor_key)
if actor is None:
await callback.answer("Персонаж не найден.", show_alert=True)
return
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
await callback.answer("Можно менять только свой статус.", show_alert=True)
return
await show_actor_status_menu(callback, actor)
await callback.answer()
@router.callback_query(F.data.startswith("status:"))
async def status_handler(
callback: CallbackQuery,
settings,
actor_lookup: dict,
app_config: dict,
) -> None:
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
_, actor_key, status_key = callback.data.split(":")
actor = find_actor(app_config, actor_key)
if actor is None:
await callback.answer("Персонаж не найден.", show_alert=True)
return
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
await callback.answer("Можно менять только свой статус.", show_alert=True)
return
default_phrase = actor.get("phrases", {}).get(status_key, "")
prompt_parts = [
f"Статус для <b>{actor['display_name']}</b>: <b>{STATUS_CHOICES[status_key]}</b>.",
"Выберите, как оформить подпись.",
]
if default_phrase:
prompt_parts.append(f"Шаблон: <code>{default_phrase}</code>")
else:
prompt_parts.append("Шаблон не задан.")
await safe_edit_message(
callback,
"\n".join(prompt_parts),
build_phrase_keyboard(actor_key, status_key, default_phrase).as_markup(),
)
await callback.answer()
async def apply_status_update(
bot: Bot,
actor_key: str,
status_key: str,
phrase: str,
user_id: int,
app_config: dict,
state_storage: JsonStateStorage,
settings,
) -> None:
payload = state_storage.load()
payload.setdefault("actors", {})
payload["actors"][actor_key] = {
"status": status_key,
"phrase": phrase,
"updated_by": user_id,
}
state_storage.save(payload)
await update_channel_post(bot, app_config, state_storage, settings)
@router.callback_query(F.data.startswith("apply:"))
async def apply_phrase_handler(
callback: CallbackQuery,
state: FSMContext,
bot: Bot,
app_config: dict,
state_storage: JsonStateStorage,
settings,
actor_lookup: dict,
) -> None:
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
_, mode, actor_key, status_key = callback.data.split(":")
actor = find_actor(app_config, actor_key)
if actor is None:
await callback.answer("Персонаж не найден.", show_alert=True)
return
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
await callback.answer("Можно менять только свой статус.", show_alert=True)
return
phrase = ""
if mode == "template":
phrase = actor.get("phrases", {}).get(status_key, "")
try:
await apply_status_update(
bot=bot,
actor_key=actor_key,
status_key=status_key,
phrase=phrase,
user_id=user_id,
app_config=app_config,
state_storage=state_storage,
settings=settings,
)
except TelegramBadRequest as exc:
logging.exception("Failed to edit channel message")
await callback.answer("Не удалось обновить сообщение канала.", show_alert=True)
await safe_edit_message(
callback,
f"Статус сохранен, но сообщение канала не обновилось:\n<code>{exc}</code>",
build_back_to_panel_keyboard().as_markup(),
)
await state.clear()
return
await state.clear()
await safe_edit_message(
callback,
f"Обновлено: <b>{actor['display_name']}</b> -> <b>{STATUS_CHOICES[status_key].lower()}</b>.",
build_back_to_panel_keyboard().as_markup(),
)
await callback.answer("Готово")
@router.callback_query(F.data.startswith("custom:"))
async def custom_phrase_handler(
callback: CallbackQuery,
state: FSMContext,
settings,
actor_lookup: dict,
app_config: dict,
) -> None:
user_id = callback.from_user.id
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
await callback.answer("Нет доступа.", show_alert=True)
return
_, actor_key, status_key = callback.data.split(":")
actor = find_actor(app_config, actor_key)
if actor is None:
await callback.answer("Персонаж не найден.", show_alert=True)
return
if user_id not in settings.admin_ids and user_id not in actor_operator_ids(actor):
await callback.answer("Можно менять только свой статус.", show_alert=True)
return
await state.set_state(SessionForm.waiting_for_custom_phrase)
await state.update_data(actor_key=actor_key, status_key=status_key)
await safe_edit_message(
callback,
(
f"Введите свою фразу для <b>{actor['display_name']}</b>.\n"
"Она будет добавлена под строкой статуса.\n"
"Если передумали, нажмите кнопку ниже."
),
build_phrase_keyboard(actor_key, status_key, actor.get("phrases", {}).get(status_key, "")).as_markup(),
)
await callback.answer()
@router.message(SessionForm.waiting_for_custom_phrase)
async def phrase_handler(
message: Message,
state: FSMContext,
bot: Bot,
app_config: dict,
state_storage: JsonStateStorage,
settings,
) -> None:
if message.text is None:
await message.answer("Нужен текст сообщения.")
return
data = await state.get_data()
actor_key = data["actor_key"]
status_key = data["status_key"]
actor = find_actor(app_config, actor_key)
if actor is None:
await state.clear()
await message.answer("Персонаж не найден.")
return
phrase = message.text.strip()
if not phrase:
await message.answer("Фраза не должна быть пустой.")
return
try:
await apply_status_update(
bot=bot,
actor_key=actor_key,
status_key=status_key,
phrase=phrase,
user_id=message.from_user.id,
app_config=app_config,
state_storage=state_storage,
settings=settings,
)
except TelegramBadRequest as exc:
logging.exception("Failed to edit channel message")
await message.answer(
f"Статус сохранен, но сообщение канала не обновилось:\n<code>{exc}</code>",
parse_mode=ParseMode.HTML,
)
await state.clear()
return
await state.clear()
await message.answer(
f"Обновлено: <b>{actor['display_name']}</b> -> <b>{STATUS_CHOICES[status_key].lower()}</b>.",
parse_mode=ParseMode.HTML,
reply_markup=build_back_to_panel_keyboard().as_markup(),
)
def build_dispatcher(app_config: dict, settings, state_storage: JsonStateStorage) -> Dispatcher:
dispatcher = Dispatcher(storage=MemoryStorage())
dispatcher["app_config"] = app_config
dispatcher["actor_lookup"] = build_actor_lookup(app_config)
dispatcher["settings"] = settings
dispatcher["state_storage"] = state_storage
dispatcher.include_router(router)
return dispatcher
async def main() -> None:
logging.basicConfig(level=logging.INFO)
settings = load_settings()
app_config = load_actor_config(settings.config_path)
state_storage = JsonStateStorage(settings.state_path)
bot = Bot(token=settings.bot_token)
dispatcher = build_dispatcher(app_config, settings, state_storage)
await dispatcher.start_polling(bot)
def run() -> None:
asyncio.run(main())