diff --git a/bot/middlewares/sub_mdw.py b/bot/middlewares/sub_mdw.py new file mode 100644 index 0000000..9f150ac --- /dev/null +++ b/bot/middlewares/sub_mdw.py @@ -0,0 +1,553 @@ +""" +Middleware для проверки подписки пользователей на каналы +""" +from time import time +from typing import Callable, Awaitable, Any, Dict, Optional, Union +from dataclasses import dataclass + +from aiogram import BaseMiddleware, Bot +from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError +from aiogram.types import TelegramObject, Message, CallbackQuery, InlineKeyboardButton, Chat +from aiogram.utils.keyboard import InlineKeyboardBuilder +from aiogram.enums import ChatMemberStatus + +from middleware.loggers import logger +from configs import settings + +__all__ = ('SubscriptionMiddleware', 'ChannelConfig') + + +@dataclass +class ChannelConfig: + """ + Конфигурация канала для проверки подписки. + + Attributes: + id: ID или username канала + name: Название канала (для отображения) + invite_link: Пригласительная ссылка + required: Обязательная ли подписка + """ + id: Union[str, int] + name: Optional[str] = None + invite_link: Optional[str] = None + required: bool = True + + +class SubscriptionCache: + """ + Кэш для проверок подписки. + + Уменьшает количество запросов к Telegram API. + """ + + def __init__(self, ttl: float = 300.0): + """ + Args: + ttl: Время жизни кэша в секундах (по умолчанию 5 минут) + """ + self.ttl = ttl + # Структура: {(user_id, channel_id): (is_subscribed, timestamp)} + self._cache: Dict[tuple[int, Union[str, int]], tuple[bool, float]] = {} + + def get(self, user_id: int, channel_id: Union[str, int]) -> Optional[bool]: + """ + Получает значение из кэша. + + Args: + user_id: ID пользователя + channel_id: ID канала + + Returns: + bool или None: True/False если в кэше и актуально, иначе None + """ + key = (user_id, channel_id) + + if key in self._cache: + is_subscribed, timestamp = self._cache[key] + + # Проверяем актуальность + if time() - timestamp < self.ttl: + return is_subscribed + else: + # Удаляем устаревшую запись + del self._cache[key] + + return None + + def set(self, user_id: int, channel_id: Union[str, int], is_subscribed: bool) -> None: + """ + Сохраняет значение в кэш. + + Args: + user_id: ID пользователя + channel_id: ID канала + is_subscribed: Статус подписки + """ + key = (user_id, channel_id) + self._cache[key] = (is_subscribed, time()) + + def invalidate(self, user_id: Optional[int] = None, channel_id: Optional[Union[str, int]] = None) -> None: + """ + Инвалидирует кэш. + + Args: + user_id: ID пользователя (если None, инвалидирует все) + channel_id: ID канала (если None, инвалидирует все для пользователя) + """ + if user_id is None and channel_id is None: + # Полная очистка + self._cache.clear() + elif user_id is not None and channel_id is None: + # Удаляем все записи пользователя + keys_to_delete = [key for key in self._cache if key[0] == user_id] + for key in keys_to_delete: + del self._cache[key] + elif user_id is not None and channel_id is not None: + # Удаляем конкретную запись + key = (user_id, channel_id) + if key in self._cache: + del self._cache[key] + + def cleanup(self) -> int: + """ + Удаляет устаревшие записи. + + Returns: + int: Количество удаленных записей + """ + current_time = time() + keys_to_delete = [ + key for key, (_, timestamp) in self._cache.items() + if current_time - timestamp >= self.ttl + ] + + for key in keys_to_delete: + del self._cache[key] + + return len(keys_to_delete) + + +class SubscriptionMiddleware(BaseMiddleware): + """ + Middleware для проверки подписки пользователя на каналы. + + Возможности: + - Проверка подписки на один или несколько каналов + - Кэширование результатов проверки + - Whitelist для администраторов + - Автоматическое получение ссылок на каналы + - Гибкая настройка обязательных/необязательных каналов + - Красивое сообщение с кнопками подписки + + Attributes: + bot: Экземпляр бота + channels: Список конфигураций каналов + cache_ttl: Время жизни кэша в секундах + whitelist_admins: Пропускать ли администраторов бота + show_buttons: Показывать ли кнопки для подписки + + Example: + ```python + from middleware.subscription import SubscriptionMiddleware, ChannelConfig + + channels = [ + ChannelConfig( + id="@my_channel", + name="Основной канал", + invite_link="https://t.me/my_channel" + ), + ChannelConfig( + id=-1001234567890, + name="Закрытый канал", + required=True + ) + ] + + dp.message.middleware(SubscriptionMiddleware(bot, channels)) + dp.callback_query.middleware(SubscriptionMiddleware(bot, channels)) + ``` + """ + + def __init__( + self, + bot: Bot, + channels: list[Union[ChannelConfig, str, int]], + cache_ttl: float = 300.0, + whitelist_admins: bool = True, + show_buttons: bool = True, + auto_fetch_links: bool = True + ): + """ + Инициализация middleware. + + Args: + bot: Экземпляр бота + channels: Список каналов (ChannelConfig, ID или username) + cache_ttl: Время жизни кэша в секундах + whitelist_admins: Пропускать администраторов бота + show_buttons: Показывать кнопки подписки + auto_fetch_links: Автоматически получать ссылки на каналы + """ + super().__init__() + self.bot = bot + self.cache = SubscriptionCache(ttl=cache_ttl) + self.whitelist_admins = whitelist_admins + self.show_buttons = show_buttons + self.auto_fetch_links = auto_fetch_links + + # Преобразуем channels в ChannelConfig + self.channels: list[ChannelConfig] = [] + for channel in channels: + if isinstance(channel, ChannelConfig): + self.channels.append(channel) + else: + # Простой ID/username -> ChannelConfig + self.channels.append(ChannelConfig(id=channel)) + + # Кэш информации о каналах + self._channel_info_cache: Dict[Union[str, int], Optional[Chat]] = {} + + async def _get_channel_info(self, channel_id: Union[str, int]) -> Optional[Chat]: + """ + Получает информацию о канале. + + Args: + channel_id: ID или username канала + + Returns: + Chat или None: Информация о канале + """ + if channel_id in self._channel_info_cache: + return self._channel_info_cache[channel_id] + + try: + chat = await self.bot.get_chat(channel_id) + self._channel_info_cache[channel_id] = chat + return chat + except (TelegramBadRequest, TelegramForbiddenError) as e: + logger.error( + f"Не удалось получить информацию о канале {channel_id}: {e}", + log_type='SUBSCRIPTION' + ) + self._channel_info_cache[channel_id] = None + return None + + async def _check_subscription( + self, + user_id: int, + channel_config: ChannelConfig + ) -> bool: + """ + Проверяет подписку пользователя на канал. + + Args: + user_id: ID пользователя + channel_config: Конфигурация канала + + Returns: + bool: True если подписан + """ + channel_id = channel_config.id + + # Проверяем кэш + cached = self.cache.get(user_id, channel_id) + if cached is not None: + logger.debug( + f"Использован кэш для проверки подписки на {channel_id}: {cached}", + log_type='SUBSCRIPTION' + ) + return cached + + # Выполняем проверку + try: + member = await self.bot.get_chat_member( + chat_id=channel_id, + user_id=user_id + ) + + is_subscribed = member.status in ( + ChatMemberStatus.MEMBER, + ChatMemberStatus.ADMINISTRATOR, + ChatMemberStatus.CREATOR + ) + + # Сохраняем в кэш + self.cache.set(user_id, channel_id, is_subscribed) + + logger.debug( + f"Проверка подписки user={user_id} на канал={channel_id}: " + f"{member.status.value} ({'✅' if is_subscribed else '❌'})", + log_type='SUBSCRIPTION' + ) + + return is_subscribed + + except TelegramBadRequest as e: + logger.warning( + f"Канал {channel_id} недоступен или неверный: {e}", + log_type='SUBSCRIPTION' + ) + # В случае ошибки считаем что не подписан + self.cache.set(user_id, channel_id, False) + return False + + except TelegramForbiddenError as e: + logger.error( + f"Бот не имеет доступа к каналу {channel_id}: {e}", + log_type='SUBSCRIPTION' + ) + self.cache.set(user_id, channel_id, False) + return False + + async def _build_subscription_message( + self, + not_subscribed: list[ChannelConfig] + ) -> tuple[str, InlineKeyboardBuilder]: + """ + Создает сообщение и клавиатуру для подписки. + + Args: + not_subscribed: Список каналов без подписки + + Returns: + tuple: (текст_сообщения, клавиатура) + """ + # Текст сообщения + text = "📢 Для использования бота необходимо подписаться на каналы:\n\n" + + # Клавиатура + keyboard = InlineKeyboardBuilder() + + for i, channel_config in enumerate(not_subscribed, 1): + # Получаем информацию о канале + channel_info = await self._get_channel_info(channel_config.id) + + # Определяем название канала + if channel_config.name: + channel_name = channel_config.name + elif channel_info: + channel_name = channel_info.title + else: + channel_name = f"Канал {i}" + + # Добавляем в текст + text += f"{i}. {channel_name}\n" + + # Определяем ссылку + invite_link = channel_config.invite_link + + if not invite_link and self.auto_fetch_links and channel_info: + # Пытаемся получить ссылку + if channel_info.username: + invite_link = f"https://t.me/{channel_info.username}" + elif channel_info.invite_link: + invite_link = channel_info.invite_link + + # Добавляем кнопку если есть ссылка + if invite_link and self.show_buttons: + keyboard.row( + InlineKeyboardButton( + text=f"📌 {channel_name}", + url=invite_link + ) + ) + + text += "\n✅ После подписки нажмите кнопку ниже для проверки." + + # Кнопка проверки подписки + keyboard.row( + InlineKeyboardButton( + text="✅ Я подписался", + callback_data="check_subscription" + ) + ) + + return text, keyboard + + async def __call__( + self, + handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], + event: TelegramObject, + data: Dict[str, Any] + ) -> Optional[Any]: + """ + Проверяет подписку перед выполнением хендлера. + + Args: + handler: Функция хендлера + event: Объект события + data: Дополнительные данные + + Returns: + Результат хендлера или None если не подписан + """ + # Пропускаем не-сообщения и не-callback + if not isinstance(event, (Message, CallbackQuery)): + return await handler(event, data) + + # Извлекаем user_id + user_id = event.from_user.id if event.from_user else None + + if user_id is None: + return await handler(event, data) + + user_str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}" + + # Whitelist для администраторов + if self.whitelist_admins and user_id in settings.super_admin_ids: + logger.debug( + f"Администратор {user_str} пропущен без проверки подписки", + log_type='SUBSCRIPTION' + ) + return await handler(event, data) + + # Проверяем подписку на все каналы + not_subscribed: list[ChannelConfig] = [] + + for channel_config in self.channels: + # Пропускаем необязательные каналы + if not channel_config.required: + continue + + is_subscribed = await self._check_subscription(user_id, channel_config) + + if not is_subscribed: + not_subscribed.append(channel_config) + + # Если есть каналы без подписки + if not_subscribed: + logger.info( + f"Пользователь не подписан на {len(not_subscribed)} каналов", + log_type='SUBSCRIPTION', + user=user_str + ) + + # Создаем сообщение + text, keyboard = await self._build_subscription_message(not_subscribed) + + # Отправляем сообщение + if isinstance(event, Message): + await event.answer( + text, + reply_markup=keyboard.as_markup(), + parse_mode="HTML" + ) + elif isinstance(event, CallbackQuery): + # Для callback отправляем в чат или редактируем + if event.message: + try: + await event.message.edit_text( + text, + reply_markup=keyboard.as_markup(), + parse_mode="HTML" + ) + except: + await event.message.answer( + text, + reply_markup=keyboard.as_markup(), + parse_mode="HTML" + ) + + await event.answer( + "⚠️ Требуется подписка на каналы", + show_alert=True + ) + + return None + + # Все подписки в порядке + logger.debug( + f"Проверка подписки пройдена", + log_type='SUBSCRIPTION', + user=user_str + ) + + return await handler(event, data) + + def invalidate_cache( + self, + user_id: Optional[int] = None, + channel_id: Optional[Union[str, int]] = None + ) -> None: + """ + Публичный метод для инвалидации кэша. + + Используется при обработке callback "check_subscription". + + Args: + user_id: ID пользователя + channel_id: ID канала + """ + self.cache.invalidate(user_id, channel_id) + + +# ================= HANDLER ДЛЯ ПРОВЕРКИ ПОДПИСКИ ================= + +async def handle_check_subscription( + callback: CallbackQuery, + subscription_middleware: SubscriptionMiddleware +): + """ + Обработчик callback для повторной проверки подписки. + + Example: + ```python + from filters.callback import CallbackStartsWith + from middleware.subscription import handle_check_subscription, subscription_middleware + + @router.callback_query(CallbackStartsWith("check_subscription")) + async def check_sub(callback: CallbackQuery): + await handle_check_subscription(callback, subscription_middleware) + ``` + """ + user_id = callback.from_user.id + + # Инвалидируем кэш для пользователя + subscription_middleware.invalidate_cache(user_id=user_id) + + await callback.answer("🔄 Проверяю подписку...", show_alert=False) + + # Перепроверяем подписку + not_subscribed = [] + + for channel_config in subscription_middleware.channels: + if not channel_config.required: + continue + + is_subscribed = await subscription_middleware._check_subscription( + user_id, + channel_config + ) + + if not is_subscribed: + not_subscribed.append(channel_config) + + if not_subscribed: + # Все еще не подписан + text, keyboard = await subscription_middleware._build_subscription_message(not_subscribed) + + await callback.message.edit_text( + text, + reply_markup=keyboard.as_markup(), + parse_mode="HTML" + ) + + await callback.answer( + f"❌ Вы еще не подписаны на {len(not_subscribed)} каналов", + show_alert=True + ) + else: + # Подписка подтверждена + await callback.message.delete() + await callback.message.answer( + "✅ Подписка подтверждена!\n\n" + "Теперь вы можете пользоваться ботом. Используйте /start", + parse_mode="HTML" + ) + + logger.info( + f"Подписка успешно подтверждена", + log_type='SUBSCRIPTION', + user=f"@{callback.from_user.username}" if callback.from_user.username else f"id{user_id}" + )