1622 lines
53 KiB
Python
1622 lines
53 KiB
Python
import logging
|
||
from collections import OrderedDict
|
||
from collections.abc import Awaitable, Callable
|
||
from dataclasses import dataclass
|
||
from html import escape
|
||
from uuid import uuid4
|
||
|
||
from aiogram import F, Router
|
||
from aiogram.exceptions import TelegramBadRequest, TelegramRetryAfter
|
||
from aiogram.filters import Command
|
||
from aiogram.types import CallbackQuery, Message
|
||
|
||
from glitchup_bot.bot.keyboards import (
|
||
admin_admins_keyboard,
|
||
admin_home_keyboard,
|
||
admin_recipient_group_keyboard,
|
||
admin_recipients_keyboard,
|
||
admin_result_keyboard,
|
||
admin_routing_keyboard,
|
||
admin_settings_keyboard,
|
||
admin_sync_keyboard,
|
||
help_home_keyboard,
|
||
help_result_keyboard,
|
||
)
|
||
from glitchup_bot.services.admins import (
|
||
add_admin,
|
||
is_admin,
|
||
list_effective_admins,
|
||
remove_admin,
|
||
)
|
||
from glitchup_bot.services.digest_builder import (
|
||
build_digest,
|
||
build_project_summary,
|
||
build_stale_issues,
|
||
build_sync_status,
|
||
build_today_summary,
|
||
build_top_issues,
|
||
run_manual_sync,
|
||
)
|
||
from glitchup_bot.services.mute_rules import add_rule, list_rules, remove_rule
|
||
from glitchup_bot.services.routing import (
|
||
add_subscriber,
|
||
clear_project_group,
|
||
clear_topic_override,
|
||
list_project_overrides,
|
||
list_subscriber_overrides,
|
||
list_topic_overrides,
|
||
remove_subscriber,
|
||
resolve_subscribers,
|
||
resolve_topic_id,
|
||
set_project_group,
|
||
set_topic_override,
|
||
)
|
||
from glitchup_bot.services.runtime_settings import get_runtime_settings, set_runtime_setting
|
||
from glitchup_bot.services.sync_service import get_last_sync_state
|
||
|
||
router = Router()
|
||
logger = logging.getLogger(__name__)
|
||
MAX_PAGE_CHARS = 3000
|
||
MAX_PAGE_LINES = 18
|
||
MAX_PAGINATION_SESSIONS = 200
|
||
PENDING_ADMIN_ACTIONS: dict[int, str] = {}
|
||
PENDING_RECIPIENT_ACTIONS: dict[int, tuple[str, str]] = {}
|
||
PENDING_SETTING_ACTIONS: dict[int, tuple[str, str]] = {}
|
||
|
||
|
||
@dataclass(slots=True)
|
||
class PaginationSession:
|
||
pages: list[str]
|
||
back_callback: str
|
||
is_admin: bool
|
||
admin_mode: bool
|
||
|
||
|
||
PAGINATION_SESSIONS: OrderedDict[str, PaginationSession] = OrderedDict()
|
||
|
||
|
||
def _sender_id(message: Message) -> int | None:
|
||
return message.from_user.id if message.from_user else None
|
||
|
||
|
||
def _callback_sender_id(callback: CallbackQuery) -> int | None:
|
||
return callback.from_user.id if callback.from_user else None
|
||
|
||
|
||
async def _is_admin_user(user_id: int | None) -> bool:
|
||
return await is_admin(user_id)
|
||
|
||
|
||
def _paginate_text(text: str) -> list[str]:
|
||
if len(text) <= MAX_PAGE_CHARS and text.count("\n") + 1 <= MAX_PAGE_LINES:
|
||
return [text]
|
||
|
||
pages: list[str] = []
|
||
current_lines: list[str] = []
|
||
current_len = 0
|
||
|
||
for line in text.splitlines():
|
||
projected_len = current_len + len(line) + (1 if current_lines else 0)
|
||
if current_lines and (
|
||
projected_len > MAX_PAGE_CHARS or len(current_lines) >= MAX_PAGE_LINES
|
||
):
|
||
pages.append("\n".join(current_lines))
|
||
current_lines = [line]
|
||
current_len = len(line)
|
||
continue
|
||
|
||
current_lines.append(line)
|
||
current_len = projected_len
|
||
|
||
if current_lines:
|
||
pages.append("\n".join(current_lines))
|
||
|
||
return pages or [text]
|
||
|
||
|
||
def _store_pagination_session(session: PaginationSession) -> str:
|
||
token = uuid4().hex[:12]
|
||
PAGINATION_SESSIONS[token] = session
|
||
PAGINATION_SESSIONS.move_to_end(token)
|
||
|
||
while len(PAGINATION_SESSIONS) > MAX_PAGINATION_SESSIONS:
|
||
PAGINATION_SESSIONS.popitem(last=False)
|
||
|
||
return token
|
||
|
||
|
||
def _result_keyboard(
|
||
*,
|
||
back_callback: str,
|
||
is_admin: bool,
|
||
admin_mode: bool,
|
||
page_token: str | None = None,
|
||
page_index: int = 0,
|
||
total_pages: int = 1,
|
||
):
|
||
if admin_mode:
|
||
return admin_result_keyboard(
|
||
back_callback,
|
||
page_token=page_token,
|
||
page_index=page_index,
|
||
total_pages=total_pages,
|
||
)
|
||
|
||
return help_result_keyboard(
|
||
back_callback,
|
||
is_admin,
|
||
page_token=page_token,
|
||
page_index=page_index,
|
||
total_pages=total_pages,
|
||
)
|
||
|
||
|
||
def _help_text(is_admin: bool) -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>GlitchUp Bot</b>",
|
||
"",
|
||
"Выберите нужную сводку кнопками ниже.",
|
||
"Пользователю доступны только базовые экраны.",
|
||
"",
|
||
"• Сегодня",
|
||
"• Неделя",
|
||
"• Самые шумные",
|
||
"• Давно висят",
|
||
*(["", "• Для управления откройте админ-панель"] if is_admin else []),
|
||
]
|
||
)
|
||
|
||
|
||
async def _start_text(is_admin: bool) -> str:
|
||
runtime = await get_runtime_settings()
|
||
state = await get_last_sync_state("api_sync")
|
||
sync_value = (
|
||
state.last_successful_at.astimezone().strftime("%Y-%m-%d %H:%M")
|
||
if state and state.last_successful_at
|
||
else "ещё не было"
|
||
)
|
||
lines = [
|
||
f"<b>{escape(runtime.bot_title)}</b>",
|
||
f"Синхронизация: {escape(sync_value)}",
|
||
"",
|
||
escape(runtime.bot_purpose),
|
||
]
|
||
if is_admin:
|
||
lines.extend(["", escape(runtime.bot_admin_hint)])
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _admin_routing_text() -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>Topic и routing</b>",
|
||
"",
|
||
"Здесь можно смотреть текущую схему и менять topic override без env.",
|
||
]
|
||
)
|
||
|
||
|
||
def _admin_text() -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>Админ-панель GlitchUp Bot</b>",
|
||
"",
|
||
"Здесь только практичные разделы:",
|
||
"• синхронизация",
|
||
"• получатели по backend/frontend",
|
||
"• topic и routing",
|
||
"• runtime-настройки бота",
|
||
"• администраторы",
|
||
"• mute rules",
|
||
]
|
||
)
|
||
|
||
|
||
def _admin_sync_text() -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>Синхронизация</b>",
|
||
"",
|
||
"Отсюда удобно запускать ручной sync и проверять, "
|
||
"когда данные обновлялись в последний раз.",
|
||
]
|
||
)
|
||
|
||
|
||
def _admin_recipients_text() -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>Получатели уведомлений</b>",
|
||
"",
|
||
"Выберите группу и назначайте Telegram ID через кнопки.",
|
||
"Самоподписка пользователей отключена.",
|
||
]
|
||
)
|
||
|
||
|
||
async def _runtime_settings_text() -> str:
|
||
runtime = await get_runtime_settings()
|
||
lines = [
|
||
"<b>Настройки бота</b>",
|
||
"",
|
||
f"• автосинк: {'включён' if runtime.sync_enabled else 'выключен'}",
|
||
f"• интервал sync: {runtime.sync_interval_minutes} мин",
|
||
(
|
||
"• отчёт: "
|
||
f"{runtime.digest_cron_day} "
|
||
f"{runtime.digest_cron_hour:02d}:{runtime.digest_cron_minute:02d} "
|
||
f"{runtime.digest_timezone}"
|
||
),
|
||
f"• название: {escape(runtime.bot_title)}",
|
||
f"• описание: {escape(runtime.bot_purpose)}",
|
||
f"• подсказка админу: {escape(runtime.bot_admin_hint)}",
|
||
]
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _recipient_group_text(group_name: str) -> str:
|
||
return "\n".join(
|
||
[
|
||
f"<b>{escape(group_name.capitalize())} получатели</b>",
|
||
"",
|
||
"Можно посмотреть список, добавить новый Telegram ID или удалить существующий.",
|
||
]
|
||
)
|
||
|
||
|
||
def _admin_guide_text() -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>Подсказка по админке</b>",
|
||
"",
|
||
"Почти всё вынесено в кнопки и короткие сценарии.",
|
||
"",
|
||
"• <code>/admin_add 123456</code> — добавить администратора",
|
||
"• <code>/admin_add</code> ответом на сообщение — добавить автора сообщения",
|
||
"• <code>/admin_del 123456</code> — удалить runtime-администратора",
|
||
"",
|
||
"• <code>/owner slug backend</code> — переназначить проект в группу",
|
||
"• <code>/topic backend 123</code> — сменить topic override",
|
||
"• <code>/mute_add payment.*timeout</code> — добавить mute rule",
|
||
]
|
||
)
|
||
|
||
|
||
def _admin_hint_text(section: str, target: str | None = None) -> str:
|
||
hints = {
|
||
"sync": "\n".join(
|
||
[
|
||
"<b>Что такое синхронизация</b>",
|
||
"",
|
||
"Синхронизация подтягивает актуальные issues из GlitchTip в локальный кэш бота.",
|
||
"Ручной sync нужен, когда вы хотите обновить данные "
|
||
"прямо сейчас, не дожидаясь расписания.",
|
||
]
|
||
),
|
||
"recipients": "\n".join(
|
||
[
|
||
"<b>Что такое получатели</b>",
|
||
"",
|
||
"Получатели — это Telegram ID пользователей, которым бот отправляет уведомления.",
|
||
"Backend и frontend настраиваются отдельно.",
|
||
]
|
||
),
|
||
"routing": "\n".join(
|
||
[
|
||
"<b>Что такое Topic и routing</b>",
|
||
"",
|
||
"Routing определяет, в какую группу попадает проект.",
|
||
"Topic override определяет, в какую тему Telegram будут "
|
||
"приходить сообщения для backend, frontend и digest.",
|
||
]
|
||
),
|
||
"settings": "\n".join(
|
||
[
|
||
"<b>Что такое настройки бота</b>",
|
||
"",
|
||
"Здесь собраны runtime-параметры, которые можно менять "
|
||
"без правки `.env` и без пересборки контейнера.",
|
||
"Сюда входят автосинк, расписание digest и тексты интерфейса.",
|
||
]
|
||
),
|
||
"mute": "\n".join(
|
||
[
|
||
"<b>Что такое mute rules</b>",
|
||
"",
|
||
"Mute rule — это шаблон, по которому бот скрывает шумные или неважные события.",
|
||
"Если событие совпало с правилом, оно не будет мешать в уведомлениях.",
|
||
]
|
||
),
|
||
"admins": "\n".join(
|
||
[
|
||
"<b>Что такое администраторы</b>",
|
||
"",
|
||
"Администраторы могут менять настройки бота, получателей и расписание.",
|
||
"Часть админов может приходить из `.env`, а часть хранится в runtime-базе.",
|
||
]
|
||
),
|
||
"recipient_group": "\n".join(
|
||
[
|
||
f"<b>{escape((target or '').capitalize())} получатели</b>",
|
||
"",
|
||
"Добавляйте сюда Telegram ID тех, кто должен получать "
|
||
"уведомления по выбранной группе.",
|
||
"Удаление отсюда прекращает доставку уведомлений этому пользователю.",
|
||
]
|
||
),
|
||
}
|
||
return hints[section]
|
||
|
||
|
||
def _sync_summary_text(summary) -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>Sync завершён</b>",
|
||
"",
|
||
f"• проектов: {summary.project_count}",
|
||
f"• issues: {summary.issue_count}",
|
||
f"• помечено resolved: {summary.resolved_count}",
|
||
f"• время: {escape(summary.synced_at.isoformat())}",
|
||
]
|
||
)
|
||
|
||
|
||
async def _admins_text() -> str:
|
||
admins = await list_effective_admins()
|
||
if not admins:
|
||
return "Список администраторов пуст."
|
||
|
||
lines = [
|
||
"<b>Администраторы</b>",
|
||
"",
|
||
"Добавлять можно командой <code>/admin_add <user_id></code> "
|
||
"или ответом на сообщение пользователя.",
|
||
"",
|
||
]
|
||
for user_id, source in admins:
|
||
source_label = source.replace("env", "из .env")
|
||
lines.append(f"• <code>{user_id}</code> — {source_label}")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _extract_target_user_id(message: Message) -> int | None:
|
||
args = message.text.split(maxsplit=1) if message.text else []
|
||
if len(args) >= 2:
|
||
raw_value = args[1].strip()
|
||
return int(raw_value) if raw_value.lstrip("-").isdigit() else None
|
||
|
||
reply = message.reply_to_message
|
||
if reply and reply.from_user:
|
||
return reply.from_user.id
|
||
|
||
return None
|
||
|
||
|
||
async def _recipients_text(group_name: str) -> str:
|
||
subscribers = await resolve_subscribers(group_name)
|
||
title = "Backend" if group_name == "backend" else "Frontend"
|
||
if not subscribers:
|
||
return f"<b>{title} получатели</b>\n\nСписок пуст."
|
||
|
||
lines = [f"<b>{title} получатели</b>", ""]
|
||
for user_id in subscribers:
|
||
lines.append(f"• <code>{user_id}</code>")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _setting_prompt_text(setting_key: str, target: str) -> str:
|
||
prompts = {
|
||
"topic": f"Отправьте новым сообщением числовой topic ID для <b>{escape(target)}</b>.",
|
||
"sync_interval_minutes": "Отправьте интервал синхронизации в минутах.",
|
||
"digest_cron_day": "Отправьте день отчёта, например <code>mon</code>.",
|
||
"digest_time": "Отправьте время отчёта в формате <code>HH:MM</code>.",
|
||
"digest_timezone": "Отправьте timezone, например <code>Asia/Krasnoyarsk</code>.",
|
||
"bot_title": "Отправьте новое название бота.",
|
||
"bot_purpose": "Отправьте короткое описание, для чего нужен бот.",
|
||
"bot_admin_hint": "Отправьте текст подсказки для администратора на /start.",
|
||
}
|
||
return prompts[setting_key]
|
||
|
||
|
||
async def _require_admin(message: Message) -> bool:
|
||
if await _is_admin_user(_sender_id(message)):
|
||
return True
|
||
|
||
await message.answer("Команда доступна только администраторам.")
|
||
return False
|
||
|
||
|
||
async def _require_admin_callback(callback: CallbackQuery) -> bool:
|
||
if await _is_admin_user(_callback_sender_id(callback)):
|
||
return True
|
||
|
||
await callback.answer("Только для администраторов", show_alert=True)
|
||
return False
|
||
|
||
|
||
async def _answer_text(
|
||
target: Message | CallbackQuery,
|
||
text: str,
|
||
*,
|
||
reply_markup=None,
|
||
disable_web_page_preview: bool = True,
|
||
) -> None:
|
||
if isinstance(target, CallbackQuery):
|
||
await target.message.answer(
|
||
text,
|
||
reply_markup=reply_markup,
|
||
disable_web_page_preview=disable_web_page_preview,
|
||
)
|
||
else:
|
||
await target.answer(
|
||
text,
|
||
reply_markup=reply_markup,
|
||
disable_web_page_preview=disable_web_page_preview,
|
||
)
|
||
|
||
|
||
async def _show_callback_screen(
|
||
callback: CallbackQuery,
|
||
text: str,
|
||
*,
|
||
reply_markup=None,
|
||
disable_web_page_preview: bool = True,
|
||
) -> None:
|
||
if callback.message is None:
|
||
return
|
||
|
||
try:
|
||
await callback.message.edit_text(
|
||
text,
|
||
reply_markup=reply_markup,
|
||
disable_web_page_preview=disable_web_page_preview,
|
||
)
|
||
except TelegramRetryAfter as exc:
|
||
logger.warning(
|
||
"Telegram edit flood control for chat %s; "
|
||
"falling back to sending a new message after %s seconds",
|
||
callback.message.chat.id,
|
||
exc.retry_after,
|
||
)
|
||
await callback.message.answer(
|
||
text,
|
||
reply_markup=reply_markup,
|
||
disable_web_page_preview=disable_web_page_preview,
|
||
)
|
||
except TelegramBadRequest as exc:
|
||
if "message is not modified" in str(exc).lower():
|
||
return
|
||
await callback.message.answer(
|
||
text,
|
||
reply_markup=reply_markup,
|
||
disable_web_page_preview=disable_web_page_preview,
|
||
)
|
||
|
||
|
||
async def _deliver_text(
|
||
target: Message | CallbackQuery,
|
||
text: str,
|
||
*,
|
||
reply_markup=None,
|
||
disable_web_page_preview: bool = True,
|
||
) -> None:
|
||
if isinstance(target, CallbackQuery):
|
||
await _show_callback_screen(
|
||
target,
|
||
text,
|
||
reply_markup=reply_markup,
|
||
disable_web_page_preview=disable_web_page_preview,
|
||
)
|
||
return
|
||
|
||
await _answer_text(
|
||
target,
|
||
text,
|
||
reply_markup=reply_markup,
|
||
disable_web_page_preview=disable_web_page_preview,
|
||
)
|
||
|
||
|
||
async def _deliver_result(
|
||
target: Message | CallbackQuery,
|
||
text: str,
|
||
*,
|
||
back_callback: str,
|
||
is_admin: bool,
|
||
admin_mode: bool = False,
|
||
) -> None:
|
||
pages = _paginate_text(text)
|
||
if len(pages) == 1:
|
||
await _deliver_text(
|
||
target,
|
||
pages[0],
|
||
reply_markup=_result_keyboard(
|
||
back_callback=back_callback,
|
||
is_admin=is_admin,
|
||
admin_mode=admin_mode,
|
||
),
|
||
)
|
||
return
|
||
|
||
token = _store_pagination_session(
|
||
PaginationSession(
|
||
pages=pages,
|
||
back_callback=back_callback,
|
||
is_admin=is_admin,
|
||
admin_mode=admin_mode,
|
||
)
|
||
)
|
||
await _deliver_text(
|
||
target,
|
||
pages[0],
|
||
reply_markup=_result_keyboard(
|
||
back_callback=back_callback,
|
||
is_admin=is_admin,
|
||
admin_mode=admin_mode,
|
||
page_token=token,
|
||
page_index=0,
|
||
total_pages=len(pages),
|
||
),
|
||
)
|
||
|
||
|
||
async def _handle_subscription_action(
|
||
target: Message | CallbackQuery,
|
||
group_name: str,
|
||
action: str,
|
||
user_id: int | None,
|
||
*,
|
||
reply_markup=None,
|
||
) -> None:
|
||
if user_id is None:
|
||
await _deliver_text(
|
||
target,
|
||
"Не удалось определить пользователя.",
|
||
reply_markup=reply_markup,
|
||
)
|
||
return
|
||
|
||
if action == "subscribe":
|
||
await add_subscriber(group_name, user_id)
|
||
await _deliver_text(
|
||
target,
|
||
f"Подписка на <b>{escape(group_name)}</b> включена.",
|
||
reply_markup=reply_markup,
|
||
)
|
||
return
|
||
|
||
removed = await remove_subscriber(group_name, user_id)
|
||
if not removed:
|
||
await _deliver_text(target, "Runtime-подписка не найдена.", reply_markup=reply_markup)
|
||
return
|
||
|
||
await _deliver_text(
|
||
target,
|
||
f"Подписка на <b>{escape(group_name)}</b> отключена.",
|
||
reply_markup=reply_markup,
|
||
)
|
||
|
||
|
||
async def _run_summary_action(
|
||
target: Message | CallbackQuery,
|
||
loader: Callable[[], Awaitable[str]],
|
||
*,
|
||
back_callback: str,
|
||
is_admin: bool,
|
||
admin_mode: bool = False,
|
||
) -> None:
|
||
await _deliver_result(
|
||
target,
|
||
await loader(),
|
||
back_callback=back_callback,
|
||
is_admin=is_admin,
|
||
admin_mode=admin_mode,
|
||
)
|
||
|
||
|
||
async def _ownership_text() -> str:
|
||
project_overrides = await list_project_overrides()
|
||
topic_overrides = await list_topic_overrides()
|
||
subscriber_overrides = await list_subscriber_overrides()
|
||
backend_subscribers = await resolve_subscribers("backend")
|
||
frontend_subscribers = await resolve_subscribers("frontend")
|
||
|
||
lines = [
|
||
"<b>Ownership runtime state</b>",
|
||
"",
|
||
"<b>Topics:</b>",
|
||
f"• backend: {await resolve_topic_id('backend')}",
|
||
f"• frontend: {await resolve_topic_id('frontend')}",
|
||
f"• digest: {await resolve_topic_id('digest')}",
|
||
"",
|
||
"<b>Subscribers:</b>",
|
||
f"• backend: {', '.join(map(str, backend_subscribers)) or 'none'}",
|
||
f"• frontend: {', '.join(map(str, frontend_subscribers)) or 'none'}",
|
||
"",
|
||
"<b>Project overrides:</b>",
|
||
]
|
||
|
||
if project_overrides:
|
||
lines.extend(
|
||
f"• {escape(record.project_slug)} → {escape(record.group_name)}"
|
||
for record in project_overrides
|
||
)
|
||
else:
|
||
lines.append("• none")
|
||
|
||
lines.extend(["", "<b>Topic overrides:</b>"])
|
||
if topic_overrides:
|
||
lines.extend(
|
||
f"• {escape(record.group_name)} → {record.topic_id}" for record in topic_overrides
|
||
)
|
||
else:
|
||
lines.append("• none")
|
||
|
||
lines.extend(["", "<b>Subscriber overrides:</b>"])
|
||
if subscriber_overrides:
|
||
lines.extend(
|
||
f"• {escape(record.group_name)} → {record.user_id}" for record in subscriber_overrides
|
||
)
|
||
else:
|
||
lines.append("• none")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
async def _mute_rules_text() -> str:
|
||
rules = await list_rules()
|
||
if not rules:
|
||
return "Mute rules не настроены."
|
||
|
||
lines = ["<b>Mute rules</b>", ""]
|
||
for rule in rules:
|
||
suffix = f" — {escape(rule.description)}" if rule.description else ""
|
||
lines.append(f"• #{rule.id} <code>{escape(rule.pattern)}</code>{suffix}")
|
||
|
||
return "\n".join(lines)
|
||
|
||
|
||
@router.message(Command("start"))
|
||
async def cmd_start(message: Message) -> None:
|
||
is_admin = await _is_admin_user(_sender_id(message))
|
||
await message.answer(
|
||
await _start_text(is_admin),
|
||
reply_markup=help_home_keyboard(is_admin),
|
||
disable_web_page_preview=True,
|
||
)
|
||
|
||
|
||
@router.message(Command("help"))
|
||
async def cmd_help(message: Message) -> None:
|
||
is_admin = await _is_admin_user(_sender_id(message))
|
||
await message.answer(
|
||
_help_text(is_admin),
|
||
reply_markup=help_home_keyboard(is_admin),
|
||
disable_web_page_preview=True,
|
||
)
|
||
|
||
|
||
@router.message(Command("admin"))
|
||
async def cmd_admin(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
await message.answer(
|
||
_admin_text(),
|
||
reply_markup=admin_home_keyboard(),
|
||
disable_web_page_preview=True,
|
||
)
|
||
|
||
|
||
@router.callback_query(F.data.startswith("help:"))
|
||
async def cb_help_actions(callback: CallbackQuery) -> None:
|
||
data = callback.data or ""
|
||
action = data.removeprefix("help:")
|
||
is_admin = await _is_admin_user(_callback_sender_id(callback))
|
||
await callback.answer()
|
||
|
||
if action == "open":
|
||
await _show_callback_screen(
|
||
callback,
|
||
_help_text(is_admin),
|
||
reply_markup=help_home_keyboard(is_admin),
|
||
)
|
||
return
|
||
if action == "week":
|
||
await _run_summary_action(
|
||
callback,
|
||
lambda: build_digest(refresh=False),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
return
|
||
if action == "today":
|
||
await _run_summary_action(
|
||
callback,
|
||
lambda: build_today_summary(refresh=False),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
return
|
||
if action == "top":
|
||
await _run_summary_action(
|
||
callback,
|
||
lambda: build_top_issues(refresh=False),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
return
|
||
if action == "stale":
|
||
await _run_summary_action(
|
||
callback,
|
||
lambda: build_stale_issues(refresh=False),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
return
|
||
|
||
|
||
@router.callback_query(F.data == "admin:open")
|
||
async def cb_admin_open(callback: CallbackQuery) -> None:
|
||
if not await _require_admin_callback(callback):
|
||
return
|
||
|
||
await callback.answer()
|
||
await _show_callback_screen(
|
||
callback,
|
||
_admin_text(),
|
||
reply_markup=admin_home_keyboard(),
|
||
)
|
||
|
||
|
||
@router.callback_query(F.data.startswith("admin:"))
|
||
async def cb_admin_actions(callback: CallbackQuery) -> None:
|
||
if not await _require_admin_callback(callback):
|
||
return
|
||
|
||
action = (callback.data or "").removeprefix("admin:")
|
||
await callback.answer()
|
||
|
||
if action == "menu:sync":
|
||
await _show_callback_screen(
|
||
callback,
|
||
_admin_sync_text(),
|
||
reply_markup=admin_sync_keyboard(),
|
||
)
|
||
return
|
||
if action == "menu:recipients":
|
||
await _show_callback_screen(
|
||
callback,
|
||
_admin_recipients_text(),
|
||
reply_markup=admin_recipients_keyboard(),
|
||
)
|
||
return
|
||
if action == "menu:routing":
|
||
await _show_callback_screen(
|
||
callback,
|
||
_admin_routing_text(),
|
||
reply_markup=admin_routing_keyboard(),
|
||
)
|
||
return
|
||
if action == "menu:admins":
|
||
await _show_callback_screen(
|
||
callback,
|
||
await _admins_text(),
|
||
reply_markup=admin_admins_keyboard(),
|
||
)
|
||
return
|
||
if action == "menu:settings":
|
||
await _show_callback_screen(
|
||
callback,
|
||
await _runtime_settings_text(),
|
||
reply_markup=admin_settings_keyboard(),
|
||
)
|
||
return
|
||
if action in {"recipients:backend", "recipients:frontend"}:
|
||
group_name = action.split(":", 1)[1]
|
||
await _show_callback_screen(
|
||
callback,
|
||
_recipient_group_text(group_name),
|
||
reply_markup=admin_recipient_group_keyboard(group_name),
|
||
)
|
||
return
|
||
if action.startswith("recipients:list:"):
|
||
group_name = action.rsplit(":", 1)[1]
|
||
await _deliver_result(
|
||
callback,
|
||
await _recipients_text(group_name),
|
||
back_callback=f"admin:recipients:{group_name}",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
return
|
||
if action.startswith("recipients:add:"):
|
||
group_name = action.rsplit(":", 1)[1]
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_RECIPIENT_ACTIONS[admin_id] = ("add", group_name)
|
||
await _show_callback_screen(
|
||
callback,
|
||
"\n".join(
|
||
[
|
||
f"<b>{escape(group_name.capitalize())}</b>",
|
||
"",
|
||
"Отправьте следующим сообщением Telegram ID, который нужно добавить.",
|
||
]
|
||
),
|
||
reply_markup=admin_result_keyboard(f"admin:recipients:{group_name}"),
|
||
)
|
||
return
|
||
if action.startswith("recipients:del:"):
|
||
group_name = action.rsplit(":", 1)[1]
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_RECIPIENT_ACTIONS[admin_id] = ("del", group_name)
|
||
await _show_callback_screen(
|
||
callback,
|
||
"\n".join(
|
||
[
|
||
f"<b>{escape(group_name.capitalize())}</b>",
|
||
"",
|
||
"Отправьте следующим сообщением Telegram ID, который нужно удалить.",
|
||
]
|
||
),
|
||
reply_markup=admin_result_keyboard(f"admin:recipients:{group_name}"),
|
||
)
|
||
return
|
||
if action == "admins":
|
||
await _deliver_result(
|
||
callback,
|
||
await _admins_text(),
|
||
back_callback="admin:menu:admins",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
return
|
||
if action == "admins:add":
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_ADMIN_ACTIONS[admin_id] = "add"
|
||
await _show_callback_screen(
|
||
callback,
|
||
(
|
||
"<b>Добавление администратора</b>\n\n"
|
||
"Отправьте следующим сообщением Telegram ID, который нужно добавить."
|
||
),
|
||
reply_markup=admin_result_keyboard("admin:menu:admins"),
|
||
)
|
||
return
|
||
if action == "admins:del":
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_ADMIN_ACTIONS[admin_id] = "del"
|
||
await _show_callback_screen(
|
||
callback,
|
||
(
|
||
"<b>Удаление администратора</b>\n\n"
|
||
"Отправьте следующим сообщением Telegram ID, который нужно удалить."
|
||
),
|
||
reply_markup=admin_result_keyboard("admin:menu:admins"),
|
||
)
|
||
return
|
||
if action == "settings:sync_enabled":
|
||
runtime = await get_runtime_settings()
|
||
await set_runtime_setting("sync_enabled", "false" if runtime.sync_enabled else "true")
|
||
from glitchup_bot.tasks.scheduler import reload_scheduler
|
||
|
||
await reload_scheduler()
|
||
await _show_callback_screen(
|
||
callback,
|
||
_admin_sync_text(),
|
||
reply_markup=admin_sync_keyboard(),
|
||
)
|
||
return
|
||
if action == "settings:sync_interval":
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_SETTING_ACTIONS[admin_id] = ("sync_interval_minutes", "")
|
||
await _show_callback_screen(
|
||
callback,
|
||
_setting_prompt_text("sync_interval_minutes", ""),
|
||
reply_markup=admin_result_keyboard("admin:menu:sync"),
|
||
)
|
||
return
|
||
if action == "settings:digest_day":
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_SETTING_ACTIONS[admin_id] = ("digest_cron_day", "")
|
||
await _show_callback_screen(
|
||
callback,
|
||
_setting_prompt_text("digest_cron_day", ""),
|
||
reply_markup=admin_result_keyboard("admin:menu:sync"),
|
||
)
|
||
return
|
||
if action == "settings:digest_time":
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_SETTING_ACTIONS[admin_id] = ("digest_time", "")
|
||
await _show_callback_screen(
|
||
callback,
|
||
_setting_prompt_text("digest_time", ""),
|
||
reply_markup=admin_result_keyboard("admin:menu:sync"),
|
||
)
|
||
return
|
||
if action == "settings:digest_timezone":
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_SETTING_ACTIONS[admin_id] = ("digest_timezone", "")
|
||
await _show_callback_screen(
|
||
callback,
|
||
_setting_prompt_text("digest_timezone", ""),
|
||
reply_markup=admin_result_keyboard("admin:menu:sync"),
|
||
)
|
||
return
|
||
if action == "settings:bot_title":
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_SETTING_ACTIONS[admin_id] = ("bot_title", "")
|
||
await _show_callback_screen(
|
||
callback,
|
||
_setting_prompt_text("bot_title", ""),
|
||
reply_markup=admin_result_keyboard("admin:menu:settings"),
|
||
)
|
||
return
|
||
if action == "settings:bot_purpose":
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_SETTING_ACTIONS[admin_id] = ("bot_purpose", "")
|
||
await _show_callback_screen(
|
||
callback,
|
||
_setting_prompt_text("bot_purpose", ""),
|
||
reply_markup=admin_result_keyboard("admin:menu:settings"),
|
||
)
|
||
return
|
||
if action == "settings:bot_admin_hint":
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_SETTING_ACTIONS[admin_id] = ("bot_admin_hint", "")
|
||
await _show_callback_screen(
|
||
callback,
|
||
_setting_prompt_text("bot_admin_hint", ""),
|
||
reply_markup=admin_result_keyboard("admin:menu:settings"),
|
||
)
|
||
return
|
||
if action.startswith("settings:topic:"):
|
||
group_name = action.rsplit(":", 1)[1]
|
||
admin_id = _callback_sender_id(callback)
|
||
if admin_id is not None:
|
||
PENDING_SETTING_ACTIONS[admin_id] = ("topic", group_name)
|
||
await _show_callback_screen(
|
||
callback,
|
||
_setting_prompt_text("topic", group_name),
|
||
reply_markup=admin_result_keyboard("admin:menu:routing"),
|
||
)
|
||
return
|
||
if action == "sync":
|
||
summary = await run_manual_sync()
|
||
await _deliver_result(
|
||
callback,
|
||
_sync_summary_text(summary),
|
||
back_callback="admin:menu:sync",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
return
|
||
if action == "sync_status":
|
||
await _run_summary_action(
|
||
callback,
|
||
build_sync_status,
|
||
back_callback="admin:menu:sync",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
return
|
||
if action == "ownership":
|
||
await _deliver_result(
|
||
callback,
|
||
await _ownership_text(),
|
||
back_callback="admin:open",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
return
|
||
if action == "mute_list":
|
||
await _deliver_result(
|
||
callback,
|
||
await _mute_rules_text(),
|
||
back_callback="admin:open",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
return
|
||
if action == "guide":
|
||
await _deliver_result(
|
||
callback,
|
||
_admin_guide_text(),
|
||
back_callback="admin:open",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
return
|
||
|
||
|
||
@router.callback_query(F.data.startswith("page:"))
|
||
async def cb_paginated_results(callback: CallbackQuery) -> None:
|
||
payload = (callback.data or "").split(":")
|
||
if len(payload) != 3:
|
||
await callback.answer()
|
||
return
|
||
|
||
_, token, raw_index = payload
|
||
session = PAGINATION_SESSIONS.get(token)
|
||
if session is None:
|
||
await callback.answer("Эта навигация устарела. Открой раздел заново.", show_alert=True)
|
||
return
|
||
|
||
if session.admin_mode and not await _is_admin_user(_callback_sender_id(callback)):
|
||
await callback.answer("Только для администраторов", show_alert=True)
|
||
return
|
||
|
||
if not raw_index.isdigit():
|
||
await callback.answer()
|
||
return
|
||
|
||
page_index = int(raw_index)
|
||
if page_index < 0 or page_index >= len(session.pages):
|
||
await callback.answer()
|
||
return
|
||
|
||
await callback.answer()
|
||
PAGINATION_SESSIONS.move_to_end(token)
|
||
await _show_callback_screen(
|
||
callback,
|
||
session.pages[page_index],
|
||
reply_markup=_result_keyboard(
|
||
back_callback=session.back_callback,
|
||
is_admin=session.is_admin,
|
||
admin_mode=session.admin_mode,
|
||
page_token=token,
|
||
page_index=page_index,
|
||
total_pages=len(session.pages),
|
||
),
|
||
)
|
||
|
||
|
||
@router.message(Command("week"))
|
||
async def cmd_week(message: Message) -> None:
|
||
is_admin = await _is_admin_user(_sender_id(message))
|
||
await _deliver_result(
|
||
message,
|
||
await build_digest(refresh=False),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
|
||
|
||
@router.message(Command("today"))
|
||
async def cmd_today(message: Message) -> None:
|
||
is_admin = await _is_admin_user(_sender_id(message))
|
||
await _deliver_result(
|
||
message,
|
||
await build_today_summary(refresh=False),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
|
||
|
||
@router.message(Command("project"))
|
||
async def cmd_project(message: Message) -> None:
|
||
args = message.text.split(maxsplit=1) if message.text else []
|
||
if len(args) < 2:
|
||
await message.answer("Использование: /project <slug>")
|
||
return
|
||
|
||
is_admin = await _is_admin_user(_sender_id(message))
|
||
await _deliver_result(
|
||
message,
|
||
await build_project_summary(args[1].strip(), refresh=False),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
|
||
|
||
@router.message(Command("top"))
|
||
async def cmd_top(message: Message) -> None:
|
||
is_admin = await _is_admin_user(_sender_id(message))
|
||
await _deliver_result(
|
||
message,
|
||
await build_top_issues(refresh=False),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
|
||
|
||
@router.message(Command("stale"))
|
||
async def cmd_stale(message: Message) -> None:
|
||
is_admin = await _is_admin_user(_sender_id(message))
|
||
await _deliver_result(
|
||
message,
|
||
await build_stale_issues(refresh=False),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
|
||
|
||
@router.message(Command("releases"))
|
||
async def cmd_releases(message: Message) -> None:
|
||
await message.answer("Раздел релизов скрыт из упрощённого интерфейса.")
|
||
|
||
|
||
@router.message(Command("release"))
|
||
async def cmd_release(message: Message) -> None:
|
||
await message.answer("Раздел релизов скрыт из упрощённого интерфейса.")
|
||
|
||
|
||
@router.message(Command("sync_status"))
|
||
async def cmd_sync_status(message: Message) -> None:
|
||
is_admin = await _is_admin_user(_sender_id(message))
|
||
await _deliver_result(
|
||
message,
|
||
await build_sync_status(),
|
||
back_callback="help:open",
|
||
is_admin=is_admin,
|
||
)
|
||
|
||
|
||
@router.message(Command("sync"))
|
||
async def cmd_sync(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
summary = await run_manual_sync()
|
||
await _deliver_result(
|
||
message,
|
||
_sync_summary_text(summary),
|
||
back_callback="admin:menu:sync",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
|
||
|
||
@router.message(Command("admins"))
|
||
async def cmd_admins(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
await _deliver_result(
|
||
message,
|
||
await _admins_text(),
|
||
back_callback="admin:menu:admins",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
|
||
|
||
@router.message(Command("admin_add"))
|
||
async def cmd_admin_add(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
user_id = _extract_target_user_id(message)
|
||
if user_id is None:
|
||
await message.answer(
|
||
"Использование: /admin_add <user_id> или ответом на сообщение пользователя."
|
||
)
|
||
return
|
||
|
||
added = await add_admin(user_id)
|
||
text = (
|
||
f"Администратор <code>{user_id}</code> добавлен."
|
||
if added
|
||
else f"Пользователь <code>{user_id}</code> уже есть среди runtime-администраторов."
|
||
)
|
||
await _deliver_result(
|
||
message,
|
||
text,
|
||
back_callback="admin:menu:admins",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
|
||
|
||
@router.message(Command("admin_del"))
|
||
async def cmd_admin_del(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
user_id = _extract_target_user_id(message)
|
||
if user_id is None:
|
||
await message.answer("Использование: /admin_del <user_id>")
|
||
return
|
||
|
||
removed = await remove_admin(user_id)
|
||
text = (
|
||
f"Runtime-администратор <code>{user_id}</code> удалён."
|
||
if removed
|
||
else f"Runtime-администратор <code>{user_id}</code> не найден."
|
||
)
|
||
await _deliver_result(
|
||
message,
|
||
text,
|
||
back_callback="admin:menu:admins",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
|
||
|
||
@router.message(F.text)
|
||
async def cmd_pending_recipient_input(message: Message) -> None:
|
||
user_id = _sender_id(message)
|
||
if user_id is None:
|
||
return
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
raw_value = (message.text or "").strip()
|
||
if raw_value.startswith("/"):
|
||
return
|
||
|
||
if user_id in PENDING_SETTING_ACTIONS:
|
||
setting_key, target = PENDING_SETTING_ACTIONS.pop(user_id)
|
||
|
||
if setting_key == "topic":
|
||
if not raw_value.lstrip("-").isdigit():
|
||
await message.answer("Нужен числовой topic ID.")
|
||
return
|
||
await set_topic_override(target, int(raw_value))
|
||
text = f"Topic для <b>{escape(target)}</b> обновлён: <code>{escape(raw_value)}</code>."
|
||
back_callback = "admin:menu:routing"
|
||
elif setting_key == "sync_interval_minutes":
|
||
if not raw_value.isdigit():
|
||
await message.answer("Нужно указать интервал в минутах числом.")
|
||
return
|
||
await set_runtime_setting("sync_interval_minutes", raw_value)
|
||
from glitchup_bot.tasks.scheduler import reload_scheduler
|
||
|
||
await reload_scheduler()
|
||
text = f"Интервал sync обновлён: <code>{escape(raw_value)}</code> мин."
|
||
back_callback = "admin:menu:sync"
|
||
elif setting_key == "digest_cron_day":
|
||
await set_runtime_setting("digest_cron_day", raw_value)
|
||
from glitchup_bot.tasks.scheduler import reload_scheduler
|
||
|
||
await reload_scheduler()
|
||
text = f"День weekly digest обновлён: <code>{escape(raw_value)}</code>."
|
||
back_callback = "admin:menu:sync"
|
||
elif setting_key == "digest_time":
|
||
if ":" not in raw_value:
|
||
await message.answer("Нужно указать время в формате HH:MM.")
|
||
return
|
||
hour, minute = raw_value.split(":", 1)
|
||
if not (hour.isdigit() and minute.isdigit()):
|
||
await message.answer("Нужно указать время в формате HH:MM.")
|
||
return
|
||
await set_runtime_setting("digest_cron_hour", hour)
|
||
await set_runtime_setting("digest_cron_minute", minute)
|
||
from glitchup_bot.tasks.scheduler import reload_scheduler
|
||
|
||
await reload_scheduler()
|
||
text = f"Время weekly digest обновлено: <code>{escape(raw_value)}</code>."
|
||
back_callback = "admin:menu:sync"
|
||
elif setting_key == "digest_timezone":
|
||
await set_runtime_setting("digest_timezone", raw_value)
|
||
from glitchup_bot.tasks.scheduler import reload_scheduler
|
||
|
||
await reload_scheduler()
|
||
text = f"Timezone обновлён: <code>{escape(raw_value)}</code>."
|
||
back_callback = "admin:menu:sync"
|
||
else:
|
||
await set_runtime_setting(setting_key, raw_value)
|
||
text = f"Настройка <code>{escape(setting_key)}</code> обновлена."
|
||
back_callback = "admin:menu:settings"
|
||
|
||
await _deliver_result(
|
||
message,
|
||
text,
|
||
back_callback=back_callback,
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
return
|
||
|
||
if user_id in PENDING_ADMIN_ACTIONS:
|
||
if not raw_value.lstrip("-").isdigit():
|
||
await message.answer("Нужен числовой Telegram ID.")
|
||
return
|
||
|
||
action = PENDING_ADMIN_ACTIONS.pop(user_id)
|
||
target_id = int(raw_value)
|
||
if action == "add":
|
||
added = await add_admin(target_id)
|
||
text = (
|
||
f"Администратор <code>{target_id}</code> добавлен."
|
||
if added
|
||
else (
|
||
f"Пользователь <code>{target_id}</code> уже есть среди "
|
||
"runtime-администраторов."
|
||
)
|
||
)
|
||
else:
|
||
removed = await remove_admin(target_id)
|
||
text = (
|
||
f"Runtime-администратор <code>{target_id}</code> удалён."
|
||
if removed
|
||
else f"Runtime-администратор <code>{target_id}</code> не найден."
|
||
)
|
||
|
||
await _deliver_result(
|
||
message,
|
||
text,
|
||
back_callback="admin:menu:admins",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
return
|
||
|
||
if user_id not in PENDING_RECIPIENT_ACTIONS:
|
||
return
|
||
|
||
if not raw_value.lstrip("-").isdigit():
|
||
await message.answer("Нужен числовой Telegram ID.")
|
||
return
|
||
|
||
action, group_name = PENDING_RECIPIENT_ACTIONS.pop(user_id)
|
||
target_id = int(raw_value)
|
||
|
||
if action == "add":
|
||
await add_subscriber(group_name, target_id)
|
||
text = (
|
||
f"Telegram ID <code>{target_id}</code> добавлен в группу "
|
||
f"<b>{escape(group_name)}</b>."
|
||
)
|
||
else:
|
||
removed = await remove_subscriber(group_name, target_id)
|
||
text = (
|
||
f"Telegram ID <code>{target_id}</code> удалён из группы "
|
||
f"<b>{escape(group_name)}</b>."
|
||
if removed
|
||
else f"Telegram ID <code>{target_id}</code> не найден в группе "
|
||
f"<b>{escape(group_name)}</b>."
|
||
)
|
||
|
||
await _deliver_result(
|
||
message,
|
||
text,
|
||
back_callback=f"admin:recipients:{group_name}",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
|
||
|
||
def _admin_routing_text() -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>Topic и routing</b>",
|
||
"",
|
||
"Здесь можно смотреть текущую схему доставки и менять topic override без env.",
|
||
(
|
||
"Routing определяет, к какой группе относится проект, а topic override "
|
||
"определяет тему Telegram для backend, frontend и digest."
|
||
),
|
||
]
|
||
)
|
||
|
||
|
||
def _admin_sync_text() -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>Синхронизация</b>",
|
||
"",
|
||
"Синхронизация подтягивает актуальные issues из GlitchTip в локальный кэш бота.",
|
||
(
|
||
"Здесь удобно запускать ручной sync, смотреть статус и менять "
|
||
"расписание, не трогая .env."
|
||
),
|
||
]
|
||
)
|
||
|
||
|
||
def _admin_recipients_text() -> str:
|
||
return "\n".join(
|
||
[
|
||
"<b>Получатели уведомлений</b>",
|
||
"",
|
||
"Выберите группу и назначайте Telegram ID через кнопки.",
|
||
(
|
||
"Получатели — это пользователи, которым бот отправляет уведомления "
|
||
"по backend или frontend."
|
||
),
|
||
]
|
||
)
|
||
|
||
|
||
async def _runtime_settings_text() -> str:
|
||
runtime = await get_runtime_settings()
|
||
return "\n".join(
|
||
[
|
||
"<b>Настройки бота</b>",
|
||
"",
|
||
"Здесь собраны тексты интерфейса, которые можно менять без правки .env.",
|
||
f"• название: {escape(runtime.bot_title)}",
|
||
f"• описание: {escape(runtime.bot_purpose)}",
|
||
f"• подсказка админу: {escape(runtime.bot_admin_hint)}",
|
||
]
|
||
)
|
||
|
||
|
||
def _recipient_group_text(group_name: str) -> str:
|
||
return "\n".join(
|
||
[
|
||
f"<b>{escape(group_name.capitalize())} получатели</b>",
|
||
"",
|
||
"Здесь можно посмотреть список, добавить новый Telegram ID или удалить существующий.",
|
||
(
|
||
"Удаление отсюда прекращает доставку уведомлений этому пользователю "
|
||
"по выбранной группе."
|
||
),
|
||
]
|
||
)
|
||
|
||
|
||
async def _admins_text() -> str:
|
||
admins = await list_effective_admins()
|
||
lines = [
|
||
"<b>Администраторы</b>",
|
||
"",
|
||
"Администраторы могут менять настройки бота, получателей и расписание.",
|
||
"Добавление и удаление можно делать прямо кнопками ниже по Telegram ID.",
|
||
]
|
||
if not admins:
|
||
lines.extend(["", "Список пока пуст."])
|
||
return "\n".join(lines)
|
||
|
||
lines.append("")
|
||
for user_id, source in admins:
|
||
source_label = source.replace("env", "из .env")
|
||
lines.append(f"• <code>{user_id}</code> — {source_label}")
|
||
return "\n".join(lines)
|
||
|
||
|
||
@router.message(Command("subscribe"))
|
||
async def cmd_subscribe(message: Message) -> None:
|
||
await message.answer("Самоподписка отключена. Получателей настраивает администратор.")
|
||
|
||
|
||
@router.message(Command("unsubscribe"))
|
||
async def cmd_unsubscribe(message: Message) -> None:
|
||
await message.answer("Самоподписка отключена. Получателей настраивает администратор.")
|
||
|
||
|
||
@router.message(Command("ownership"))
|
||
async def cmd_ownership(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
await _deliver_result(
|
||
message,
|
||
await _ownership_text(),
|
||
back_callback="admin:open",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
|
||
|
||
@router.message(Command("owner"))
|
||
async def cmd_owner(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
args = message.text.split(maxsplit=2) if message.text else []
|
||
if len(args) < 3:
|
||
await message.answer("Использование: /owner <slug> <backend|frontend>")
|
||
return
|
||
|
||
project_slug = args[1].strip()
|
||
group_name = args[2].strip().lower()
|
||
await set_project_group(project_slug, group_name)
|
||
await message.answer(
|
||
f"Проект <b>{escape(project_slug)}</b> привязан к группе {escape(group_name)}."
|
||
)
|
||
|
||
|
||
@router.message(Command("owner_reset"))
|
||
async def cmd_owner_reset(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
args = message.text.split(maxsplit=1) if message.text else []
|
||
if len(args) < 2:
|
||
await message.answer("Использование: /owner_reset <slug>")
|
||
return
|
||
|
||
removed = await clear_project_group(args[1].strip())
|
||
if not removed:
|
||
await message.answer("Override для проекта не найден.")
|
||
return
|
||
|
||
await message.answer("Override для проекта удалён.")
|
||
|
||
|
||
@router.message(Command("topic"))
|
||
async def cmd_topic(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
args = message.text.split(maxsplit=2) if message.text else []
|
||
if len(args) < 3:
|
||
await message.answer(
|
||
"Использование: /topic <backend|frontend|digest> <topic_id>"
|
||
)
|
||
return
|
||
|
||
group_name = args[1].strip().lower()
|
||
topic_id = int(args[2].strip())
|
||
await set_topic_override(group_name, topic_id)
|
||
await message.answer(f"Topic override для <b>{escape(group_name)}</b> сохранён: {topic_id}.")
|
||
|
||
|
||
@router.message(Command("topic_reset"))
|
||
async def cmd_topic_reset(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
args = message.text.split(maxsplit=1) if message.text else []
|
||
if len(args) < 2:
|
||
await message.answer("Использование: /topic_reset <backend|frontend|digest>")
|
||
return
|
||
|
||
removed = await clear_topic_override(args[1].strip().lower())
|
||
if not removed:
|
||
await message.answer("Topic override не найден.")
|
||
return
|
||
|
||
await message.answer("Topic override удалён.")
|
||
|
||
|
||
@router.message(Command("mute_add"))
|
||
async def cmd_mute_add(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
args = message.text.split(maxsplit=1) if message.text else []
|
||
if len(args) < 2:
|
||
await message.answer("Использование: /mute_add <regex>")
|
||
return
|
||
|
||
rule = await add_rule(args[1].strip())
|
||
await message.answer(f"Добавлено mute rule #{rule.id}: <code>{escape(rule.pattern)}</code>")
|
||
|
||
|
||
@router.message(Command("mute_list"))
|
||
async def cmd_mute_list(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
await _deliver_result(
|
||
message,
|
||
await _mute_rules_text(),
|
||
back_callback="admin:open",
|
||
is_admin=True,
|
||
admin_mode=True,
|
||
)
|
||
|
||
|
||
@router.message(Command("mute_del"))
|
||
async def cmd_mute_del(message: Message) -> None:
|
||
if not await _require_admin(message):
|
||
return
|
||
|
||
args = message.text.split(maxsplit=1) if message.text else []
|
||
if len(args) < 2:
|
||
await message.answer("Использование: /mute_del <id>")
|
||
return
|
||
|
||
removed = await remove_rule(int(args[1].strip()))
|
||
if not removed:
|
||
await message.answer("Mute rule не найдено.")
|
||
return
|
||
|
||
await message.answer("Mute rule удалено.")
|