260 lines
8.9 KiB
Python
260 lines
8.9 KiB
Python
"""
|
||
Управление вебхуком бота через класс-менеджер
|
||
"""
|
||
import secrets
|
||
from typing import Optional
|
||
|
||
from aiohttp import web
|
||
from aiogram import Bot, Dispatcher
|
||
from aiogram.types import WebhookInfo
|
||
from aiogram.webhook.aiohttp_server import SimpleRequestHandler, setup_application
|
||
|
||
from configs import settings
|
||
from middleware.loggers import logger
|
||
|
||
__all__ = ('WebhookManager',)
|
||
|
||
|
||
class WebhookManager:
|
||
"""
|
||
Менеджер для управления webhook режимом.
|
||
|
||
Инкапсулирует всю логику работы с webhook:
|
||
- Создание aiohttp приложения
|
||
- Регистрация handlers
|
||
- Установка/удаление webhook
|
||
- Запуск webhook сервера
|
||
|
||
Attributes:
|
||
bot: Экземпляр бота
|
||
dp: Диспетчер
|
||
app: aiohttp приложение
|
||
secret_token: Секретный токен для webhook
|
||
"""
|
||
|
||
def __init__(self, bot: Bot, dp: Dispatcher):
|
||
"""
|
||
Args:
|
||
bot: Экземпляр бота
|
||
dp: Диспетчер
|
||
"""
|
||
self.bot = bot
|
||
self.dp = dp
|
||
self.app = web.Application()
|
||
self._configured = False
|
||
|
||
# Генерируем или используем существующий токен
|
||
self.secret_token = self._get_or_generate_token()
|
||
|
||
def _get_or_generate_token(self) -> str:
|
||
"""
|
||
Получает токен из настроек или генерирует новый.
|
||
|
||
Returns:
|
||
str: Секретный токен
|
||
"""
|
||
if hasattr(settings, 'SECRET_TOKEN') and settings.SECRET_TOKEN:
|
||
logger.debug("Используется SECRET_TOKEN из настроек", log_type='WEBHOOK')
|
||
return settings.SECRET_TOKEN
|
||
|
||
# Генерируем случайный токен (32 символа)
|
||
token = secrets.token_urlsafe(32)
|
||
logger.info(
|
||
f"🔐 Сгенерирован новый SECRET_TOKEN: {token[:8]}...",
|
||
log_type='WEBHOOK'
|
||
)
|
||
return token
|
||
|
||
async def get_info(self) -> WebhookInfo:
|
||
"""
|
||
Получает информацию о текущем вебхуке.
|
||
|
||
Returns:
|
||
WebhookInfo: Информация о вебхуке
|
||
"""
|
||
try:
|
||
info = await self.bot.get_webhook_info()
|
||
logger.debug(
|
||
f"Webhook URL: {info.url or 'не установлен'}",
|
||
log_type='WEBHOOK'
|
||
)
|
||
return info
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Ошибка получения информации о вебхуке: {e}",
|
||
log_type='WEBHOOK'
|
||
)
|
||
raise
|
||
|
||
async def delete(self, drop_pending_updates: bool = True) -> bool:
|
||
"""
|
||
Удаляет текущий вебхук.
|
||
|
||
Args:
|
||
drop_pending_updates: Удалить накопленные обновления
|
||
|
||
Returns:
|
||
bool: True если удаление успешно
|
||
"""
|
||
try:
|
||
result = await self.bot.delete_webhook(
|
||
drop_pending_updates=drop_pending_updates
|
||
)
|
||
|
||
if result:
|
||
logger.success("✓ Вебхук успешно удален", log_type='WEBHOOK')
|
||
else:
|
||
logger.debug("Вебхук не был установлен", log_type='WEBHOOK')
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка удаления вебхука: {e}", log_type='WEBHOOK')
|
||
return False
|
||
|
||
async def setup(
|
||
self,
|
||
webhook_url: Optional[str] = None,
|
||
secret_token: Optional[str] = None,
|
||
drop_pending_updates: bool = True
|
||
) -> bool:
|
||
"""
|
||
Устанавливает вебхук для бота.
|
||
|
||
Args:
|
||
webhook_url: URL вебхука (если None, берется из settings)
|
||
secret_token: Секретный токен (если None, используется self.secret_token)
|
||
drop_pending_updates: Удалить накопленные обновления
|
||
|
||
Returns:
|
||
bool: True если установка успешна
|
||
"""
|
||
url = webhook_url or settings.WEBHOOK_URL
|
||
token = secret_token or self.secret_token
|
||
|
||
if not url:
|
||
logger.error("WEBHOOK_URL не установлен", log_type='WEBHOOK')
|
||
return False
|
||
|
||
try:
|
||
# Проверяем текущий webhook
|
||
current_info = await self.bot.get_webhook_info()
|
||
|
||
# Если уже установлен правильный URL, не трогаем
|
||
if current_info.url == url:
|
||
logger.info(
|
||
f"✓ Webhook уже установлен на {url}",
|
||
log_type='WEBHOOK'
|
||
)
|
||
return True
|
||
|
||
# Удаляем старый webhook если есть
|
||
if current_info.url:
|
||
logger.debug(f"Удаление старого webhook: {current_info.url}", log_type='WEBHOOK')
|
||
await self.delete(drop_pending_updates=drop_pending_updates)
|
||
|
||
# Небольшая задержка
|
||
import asyncio
|
||
await asyncio.sleep(0.5)
|
||
|
||
# Устанавливаем новый
|
||
result = await self.bot.set_webhook(
|
||
url=url,
|
||
secret_token=token,
|
||
drop_pending_updates=drop_pending_updates
|
||
)
|
||
|
||
if result:
|
||
logger.success(f"✓ Вебхук установлен: {url}", log_type='WEBHOOK')
|
||
else:
|
||
logger.error("❌ Не удалось установить вебхук", log_type='WEBHOOK')
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка установки вебхука: {e}", log_type='WEBHOOK')
|
||
return False
|
||
|
||
def configure(
|
||
self,
|
||
webhook_path: Optional[str] = None,
|
||
secret_token: Optional[str] = None
|
||
) -> None:
|
||
"""
|
||
Конфигурирует webhook handler для aiohttp app.
|
||
|
||
Args:
|
||
webhook_path: Путь для webhook (если None, извлекается из WEBHOOK_URL)
|
||
secret_token: Секретный токен (если None, используется self.secret_token)
|
||
"""
|
||
if self._configured:
|
||
logger.warning("Webhook уже сконфигурирован", log_type='WEBHOOK')
|
||
return
|
||
|
||
# Определяем путь из WEBHOOK_URL
|
||
if webhook_path:
|
||
path = webhook_path
|
||
elif settings.WEBHOOK_URL:
|
||
from urllib.parse import urlparse
|
||
parsed = urlparse(settings.WEBHOOK_URL)
|
||
path = parsed.path if parsed.path else "/webhook"
|
||
else:
|
||
path = "/webhook"
|
||
|
||
# Используем токен
|
||
token = secret_token or self.secret_token
|
||
|
||
# Создаём webhook handler
|
||
webhook_handler = SimpleRequestHandler(
|
||
dispatcher=self.dp,
|
||
bot=self.bot,
|
||
secret_token=token
|
||
)
|
||
|
||
# Регистрируем в aiohttp app
|
||
webhook_handler.register(self.app, path=path)
|
||
setup_application(self.app, self.dp, bot=self.bot)
|
||
|
||
self._configured = True
|
||
logger.success(
|
||
f"✓ Webhook handler настроен на путь: {path}",
|
||
log_type='WEBHOOK'
|
||
)
|
||
|
||
def run(
|
||
self,
|
||
host: Optional[str] = None,
|
||
port: Optional[int] = None,
|
||
access_log: Optional[bool] = None
|
||
) -> None:
|
||
"""
|
||
Запускает webhook сервер (блокирующий вызов).
|
||
|
||
Args:
|
||
host: Хост сервера (если None, берется из settings)
|
||
port: Порт сервера (если None, берется из settings)
|
||
access_log: Логировать запросы (если None, берется из settings)
|
||
"""
|
||
if not self._configured:
|
||
logger.error(
|
||
"Webhook не сконфигурирован! Вызовите configure() перед run()",
|
||
log_type='WEBHOOK'
|
||
)
|
||
return
|
||
|
||
host = host or settings.WEBAPP_HOST
|
||
port = port or settings.WEBAPP_PORT
|
||
access_log_enabled = access_log if access_log is not None else settings.ACCES_LOG
|
||
|
||
logger.info(
|
||
f"🌐 Запуск webhook сервера: {host}:{port}",
|
||
log_type='WEBHOOK'
|
||
)
|
||
|
||
web.run_app(
|
||
self.app,
|
||
host=host,
|
||
port=port,
|
||
access_log=logger if access_log_enabled else None
|
||
)
|