123
This commit is contained in:
1
session_bot/__init__.py
Normal file
1
session_bot/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Session status bot package."""
|
||||
213
session_bot/bot.py
Normal file
213
session_bot/bot.py
Normal file
@@ -0,0 +1,213 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from aiogram import Bot, Dispatcher, F, Router
|
||||
from aiogram.exceptions import TelegramBadRequest
|
||||
from aiogram.filters import CommandStart
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.fsm.state import State, StatesGroup
|
||||
from aiogram.fsm.storage.memory import MemoryStorage
|
||||
from aiogram.types import CallbackQuery, Message
|
||||
from aiogram.utils.keyboard import InlineKeyboardBuilder
|
||||
|
||||
from session_bot.config import load_actor_config, load_settings
|
||||
from session_bot.render import build_channel_text
|
||||
from session_bot.storage import JsonStateStorage
|
||||
|
||||
router = Router()
|
||||
|
||||
STATUS_CHOICES = {
|
||||
"open": "Открыть",
|
||||
"backstage": "Закулисье",
|
||||
"delay": "Задержка",
|
||||
"rest": "Антракт",
|
||||
}
|
||||
|
||||
|
||||
class SessionForm(StatesGroup):
|
||||
waiting_for_phrase = State()
|
||||
|
||||
|
||||
def build_actor_lookup(config: dict) -> dict[int, dict[str, Any]]:
|
||||
lookup: dict[int, dict[str, Any]] = {}
|
||||
for actor in config["actors"]:
|
||||
lookup[int(actor["operator_user_id"])] = actor
|
||||
return lookup
|
||||
|
||||
|
||||
def is_allowed(user_id: int, actor_lookup: dict[int, dict[str, Any]], admin_ids: set[int]) -> bool:
|
||||
return user_id in actor_lookup or user_id in admin_ids
|
||||
|
||||
|
||||
def build_actor_keyboard(config: dict, user_id: int, admin_ids: set[int]) -> InlineKeyboardBuilder:
|
||||
keyboard = InlineKeyboardBuilder()
|
||||
for actor in config["actors"]:
|
||||
if user_id in admin_ids or int(actor["operator_user_id"]) == user_id:
|
||||
keyboard.button(text=actor["button_text"], callback_data=f"actor:{actor['key']}")
|
||||
keyboard.adjust(2)
|
||||
return keyboard
|
||||
|
||||
|
||||
def build_status_keyboard(actor_key: str) -> InlineKeyboardBuilder:
|
||||
keyboard = InlineKeyboardBuilder()
|
||||
for status_key, title in STATUS_CHOICES.items():
|
||||
keyboard.button(text=title, callback_data=f"status:{actor_key}:{status_key}")
|
||||
keyboard.adjust(2)
|
||||
return keyboard
|
||||
|
||||
|
||||
async def update_channel_post(bot: Bot, app_config: dict, state_storage: JsonStateStorage, settings) -> None:
|
||||
state = state_storage.load()
|
||||
text = build_channel_text(app_config, state)
|
||||
await bot.edit_message_text(
|
||||
chat_id=settings.channel_id,
|
||||
message_id=settings.channel_message_id,
|
||||
text=text,
|
||||
disable_web_page_preview=True,
|
||||
)
|
||||
|
||||
|
||||
@router.message(CommandStart())
|
||||
async def start_handler(message: Message, state: FSMContext, app_config: dict, actor_lookup: dict, settings) -> None:
|
||||
await state.clear()
|
||||
user_id = message.from_user.id
|
||||
|
||||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||||
await message.answer("У вас нет доступа к этой панели.")
|
||||
return
|
||||
|
||||
keyboard = build_actor_keyboard(app_config, user_id, settings.admin_ids)
|
||||
await message.answer("Выберите персонажа для смены статуса.", reply_markup=keyboard.as_markup())
|
||||
|
||||
|
||||
@router.callback_query(F.data.startswith("actor:"))
|
||||
async def actor_handler(callback: CallbackQuery, settings, actor_lookup: dict, app_config: dict) -> None:
|
||||
user_id = callback.from_user.id
|
||||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||||
await callback.answer("Нет доступа.", show_alert=True)
|
||||
return
|
||||
|
||||
actor_key = callback.data.split(":", maxsplit=1)[1]
|
||||
actor = next((item for item in app_config["actors"] if item["key"] == actor_key), None)
|
||||
if actor is None:
|
||||
await callback.answer("Персонаж не найден.", show_alert=True)
|
||||
return
|
||||
|
||||
if user_id not in settings.admin_ids and int(actor["operator_user_id"]) != user_id:
|
||||
await callback.answer("Можно менять только свой статус.", show_alert=True)
|
||||
return
|
||||
|
||||
keyboard = build_status_keyboard(actor_key)
|
||||
await callback.message.edit_text(
|
||||
f"Выбран {actor['display_name']}. Какой статус поставить?",
|
||||
reply_markup=keyboard.as_markup(),
|
||||
)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@router.callback_query(F.data.startswith("status:"))
|
||||
async def status_handler(
|
||||
callback: CallbackQuery,
|
||||
state: FSMContext,
|
||||
settings,
|
||||
actor_lookup: dict,
|
||||
app_config: dict,
|
||||
) -> None:
|
||||
user_id = callback.from_user.id
|
||||
if not is_allowed(user_id, actor_lookup, settings.admin_ids):
|
||||
await callback.answer("Нет доступа.", show_alert=True)
|
||||
return
|
||||
|
||||
_, actor_key, status_key = callback.data.split(":")
|
||||
actor = next((item for item in app_config["actors"] if item["key"] == actor_key), None)
|
||||
if actor is None:
|
||||
await callback.answer("Персонаж не найден.", show_alert=True)
|
||||
return
|
||||
|
||||
if user_id not in settings.admin_ids and int(actor["operator_user_id"]) != user_id:
|
||||
await callback.answer("Можно менять только свой статус.", show_alert=True)
|
||||
return
|
||||
|
||||
default_phrase = actor.get("phrases", {}).get(status_key, "")
|
||||
await state.set_state(SessionForm.waiting_for_phrase)
|
||||
await state.update_data(actor_key=actor_key, status_key=status_key)
|
||||
|
||||
prompt = (
|
||||
f"Статус для {actor['display_name']}: {STATUS_CHOICES[status_key]}.\n"
|
||||
"Отправьте фразу для публикации.\n"
|
||||
"Если хотите использовать шаблон, отправьте точку: .\n"
|
||||
f"Шаблон сейчас: {default_phrase or 'не задан'}"
|
||||
)
|
||||
await callback.message.edit_text(prompt)
|
||||
await callback.answer()
|
||||
|
||||
|
||||
@router.message(SessionForm.waiting_for_phrase)
|
||||
async def phrase_handler(
|
||||
message: Message,
|
||||
state: FSMContext,
|
||||
bot: Bot,
|
||||
app_config: dict,
|
||||
state_storage: JsonStateStorage,
|
||||
settings,
|
||||
) -> None:
|
||||
if message.text is None:
|
||||
await message.answer("Нужен текст сообщения.")
|
||||
return
|
||||
|
||||
data = await state.get_data()
|
||||
actor_key = data["actor_key"]
|
||||
status_key = data["status_key"]
|
||||
actor = next(item for item in app_config["actors"] if item["key"] == actor_key)
|
||||
|
||||
phrase = message.text.strip()
|
||||
if phrase == ".":
|
||||
phrase = actor.get("phrases", {}).get(status_key, "")
|
||||
|
||||
payload = state_storage.load()
|
||||
payload.setdefault("actors", {})
|
||||
payload["actors"][actor_key] = {
|
||||
"status": status_key,
|
||||
"phrase": phrase,
|
||||
"updated_by": message.from_user.id,
|
||||
}
|
||||
state_storage.save(payload)
|
||||
|
||||
try:
|
||||
await update_channel_post(bot, app_config, state_storage, settings)
|
||||
except TelegramBadRequest as exc:
|
||||
logging.exception("Failed to edit channel message")
|
||||
await message.answer(f"Статус сохранен, но сообщение канала не обновилось: {exc}")
|
||||
await state.clear()
|
||||
return
|
||||
|
||||
await state.clear()
|
||||
await message.answer(f"Обновлено: {actor['display_name']} -> {STATUS_CHOICES[status_key].lower()}.")
|
||||
|
||||
|
||||
def build_dispatcher(app_config: dict, settings, state_storage: JsonStateStorage) -> Dispatcher:
|
||||
dispatcher = Dispatcher(storage=MemoryStorage())
|
||||
dispatcher["app_config"] = app_config
|
||||
dispatcher["actor_lookup"] = build_actor_lookup(app_config)
|
||||
dispatcher["settings"] = settings
|
||||
dispatcher["state_storage"] = state_storage
|
||||
dispatcher.include_router(router)
|
||||
return dispatcher
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
settings = load_settings()
|
||||
app_config = load_actor_config(settings.config_path)
|
||||
state_storage = JsonStateStorage(settings.state_path)
|
||||
|
||||
bot = Bot(token=settings.bot_token)
|
||||
dispatcher = build_dispatcher(app_config, settings, state_storage)
|
||||
await dispatcher.start_polling(bot)
|
||||
|
||||
|
||||
def run() -> None:
|
||||
asyncio.run(main())
|
||||
52
session_bot/config.py
Normal file
52
session_bot/config.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class BotSettings:
|
||||
bot_token: str
|
||||
channel_id: int
|
||||
channel_message_id: int
|
||||
config_path: Path
|
||||
state_path: Path
|
||||
admin_ids: set[int]
|
||||
|
||||
|
||||
def _parse_admin_ids(value: str) -> set[int]:
|
||||
ids = set()
|
||||
for chunk in value.split(","):
|
||||
chunk = chunk.strip()
|
||||
if chunk:
|
||||
ids.add(int(chunk))
|
||||
return ids
|
||||
|
||||
|
||||
def load_settings() -> BotSettings:
|
||||
load_dotenv()
|
||||
|
||||
bot_token = os.environ["BOT_TOKEN"]
|
||||
channel_id = int(os.environ["CHANNEL_ID"])
|
||||
channel_message_id = int(os.environ["CHANNEL_MESSAGE_ID"])
|
||||
config_path = Path(os.environ.get("CONFIG_PATH", "config/actors.json"))
|
||||
state_path = Path(os.environ.get("STATE_PATH", "data/state.json"))
|
||||
admin_ids = _parse_admin_ids(os.environ.get("ADMIN_IDS", ""))
|
||||
|
||||
return BotSettings(
|
||||
bot_token=bot_token,
|
||||
channel_id=channel_id,
|
||||
channel_message_id=channel_message_id,
|
||||
config_path=config_path,
|
||||
state_path=state_path,
|
||||
admin_ids=admin_ids,
|
||||
)
|
||||
|
||||
|
||||
def load_actor_config(path: Path) -> dict:
|
||||
with path.open("r", encoding="utf-8") as file:
|
||||
return json.load(file)
|
||||
58
session_bot/render.py
Normal file
58
session_bot/render.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
DEFAULT_STATUS_LABELS = {
|
||||
"open": "исполняет роль",
|
||||
"backstage": "в закулисье",
|
||||
"delay": "задержки",
|
||||
"rest": "антракт",
|
||||
}
|
||||
|
||||
|
||||
def build_channel_text(config: dict, state: dict) -> str:
|
||||
parts: list[str] = []
|
||||
|
||||
header = config.get("header", "").strip()
|
||||
if header:
|
||||
parts.append(header)
|
||||
|
||||
intro_links = config.get("intro_links", "").strip()
|
||||
if intro_links:
|
||||
parts.append(intro_links)
|
||||
|
||||
projects = config.get("projects_block", "").strip()
|
||||
if projects:
|
||||
parts.append(projects)
|
||||
|
||||
actors_title = config.get("actors_title", "").strip()
|
||||
if actors_title:
|
||||
parts.append(actors_title)
|
||||
|
||||
actor_lines: list[str] = []
|
||||
actor_state = state.get("actors", {})
|
||||
status_labels = {**DEFAULT_STATUS_LABELS, **config.get("status_labels", {})}
|
||||
for actor in config["actors"]:
|
||||
current = actor_state.get(actor["key"], {})
|
||||
status = current.get("status", actor.get("default_status", "backstage"))
|
||||
phrase = current.get("phrase", actor.get("phrases", {}).get(status, ""))
|
||||
label = status_labels.get(status, status)
|
||||
|
||||
line = (
|
||||
f'{actor["emoji"]} {actor["display_name"]} ({actor["link"]}) '
|
||||
f'{actor["pronouns"]} {label}.'
|
||||
)
|
||||
if phrase:
|
||||
line = f"{line}\n {phrase}"
|
||||
actor_lines.append(line)
|
||||
|
||||
parts.append("\n".join(actor_lines))
|
||||
|
||||
legend = config.get("legend", "").strip()
|
||||
if legend:
|
||||
parts.append(legend)
|
||||
|
||||
footer = config.get("footer", "").strip()
|
||||
if footer:
|
||||
parts.append(footer)
|
||||
|
||||
return "\n\n".join(part for part in parts if part)
|
||||
20
session_bot/storage.py
Normal file
20
session_bot/storage.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
class JsonStateStorage:
|
||||
def __init__(self, path: Path) -> None:
|
||||
self.path = path
|
||||
|
||||
def load(self) -> dict:
|
||||
if not self.path.exists():
|
||||
return {"actors": {}}
|
||||
with self.path.open("r", encoding="utf-8") as file:
|
||||
return json.load(file)
|
||||
|
||||
def save(self, payload: dict) -> None:
|
||||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with self.path.open("w", encoding="utf-8") as file:
|
||||
json.dump(payload, file, ensure_ascii=False, indent=2)
|
||||
Reference in New Issue
Block a user