Files
balance_bot/bot/core/webhook.py
2026-01-23 04:45:55 +07:00

54 lines
1.8 KiB
Python

from typing import Any
from aiohttp import web
from aiogram.types import Update
from middleware.loggers import logger
from bot.core.bots import dp, bot
# Настройки экспорта в модули
__all__ = ("WebhookApp",)
class WebhookApp:
"""Приложение aiohttp для обработки webhook-запросов."""
def __init__(self, host: str = "0.0.0.0", port: int = 8080) -> None:
self.host = host
self.port = port
self.app: web.Application = web.Application()
self.app.router.add_post("/webhook", self.handle_update)
self.runner: web.AppRunner | None = None
self.site: web.TCPSite | None = None
@staticmethod
async def handle_update(request: web.Request) -> web.Response:
"""Обработчик входящих запросов от Telegram."""
try:
update_json: dict[str, Any] = await request.json()
update: Update = Update.model_validate(update_json)
await dp.feed_update(bot=bot, update=update)
except Exception as e:
logger.error(f"Ошибка обработки webhook-запроса: {e}")
return web.Response(status=500)
return web.Response(status=200)
async def start(self) -> None:
"""Асинхронный запуск aiohttp-приложения."""
self.runner = web.AppRunner(self.app)
await self.runner.setup()
self.site = web.TCPSite(self.runner, self.host, self.port)
await self.site.start()
logger.info(f"🌍 Webhook сервер запущен на http://{self.host}:{self.port}")
async def stop(self) -> None:
"""Остановка aiohttp-приложения."""
if self.runner:
await self.runner.cleanup()