554 lines
20 KiB
Python
554 lines
20 KiB
Python
"""
|
||
Middleware для проверки подписки пользователей на каналы
|
||
"""
|
||
from time import time
|
||
from typing import Callable, Awaitable, Any, Dict, Optional, Union
|
||
from dataclasses import dataclass
|
||
|
||
from aiogram import BaseMiddleware, Bot
|
||
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
|
||
from aiogram.types import TelegramObject, Message, CallbackQuery, InlineKeyboardButton, Chat
|
||
from aiogram.utils.keyboard import InlineKeyboardBuilder
|
||
from aiogram.enums import ChatMemberStatus
|
||
|
||
from middleware.loggers import logger
|
||
from configs import settings
|
||
|
||
__all__ = ('SubscriptionMiddleware', 'ChannelConfig')
|
||
|
||
|
||
@dataclass
|
||
class ChannelConfig:
|
||
"""
|
||
Конфигурация канала для проверки подписки.
|
||
|
||
Attributes:
|
||
id: ID или username канала
|
||
name: Название канала (для отображения)
|
||
invite_link: Пригласительная ссылка
|
||
required: Обязательная ли подписка
|
||
"""
|
||
id: Union[str, int]
|
||
name: Optional[str] = None
|
||
invite_link: Optional[str] = None
|
||
required: bool = True
|
||
|
||
|
||
class SubscriptionCache:
|
||
"""
|
||
Кэш для проверок подписки.
|
||
|
||
Уменьшает количество запросов к Telegram API.
|
||
"""
|
||
|
||
def __init__(self, ttl: float = 300.0):
|
||
"""
|
||
Args:
|
||
ttl: Время жизни кэша в секундах (по умолчанию 5 минут)
|
||
"""
|
||
self.ttl = ttl
|
||
# Структура: {(user_id, channel_id): (is_subscribed, timestamp)}
|
||
self._cache: Dict[tuple[int, Union[str, int]], tuple[bool, float]] = {}
|
||
|
||
def get(self, user_id: int, channel_id: Union[str, int]) -> Optional[bool]:
|
||
"""
|
||
Получает значение из кэша.
|
||
|
||
Args:
|
||
user_id: ID пользователя
|
||
channel_id: ID канала
|
||
|
||
Returns:
|
||
bool или None: True/False если в кэше и актуально, иначе None
|
||
"""
|
||
key = (user_id, channel_id)
|
||
|
||
if key in self._cache:
|
||
is_subscribed, timestamp = self._cache[key]
|
||
|
||
# Проверяем актуальность
|
||
if time() - timestamp < self.ttl:
|
||
return is_subscribed
|
||
else:
|
||
# Удаляем устаревшую запись
|
||
del self._cache[key]
|
||
|
||
return None
|
||
|
||
def set(self, user_id: int, channel_id: Union[str, int], is_subscribed: bool) -> None:
|
||
"""
|
||
Сохраняет значение в кэш.
|
||
|
||
Args:
|
||
user_id: ID пользователя
|
||
channel_id: ID канала
|
||
is_subscribed: Статус подписки
|
||
"""
|
||
key = (user_id, channel_id)
|
||
self._cache[key] = (is_subscribed, time())
|
||
|
||
def invalidate(self, user_id: Optional[int] = None, channel_id: Optional[Union[str, int]] = None) -> None:
|
||
"""
|
||
Инвалидирует кэш.
|
||
|
||
Args:
|
||
user_id: ID пользователя (если None, инвалидирует все)
|
||
channel_id: ID канала (если None, инвалидирует все для пользователя)
|
||
"""
|
||
if user_id is None and channel_id is None:
|
||
# Полная очистка
|
||
self._cache.clear()
|
||
elif user_id is not None and channel_id is None:
|
||
# Удаляем все записи пользователя
|
||
keys_to_delete = [key for key in self._cache if key[0] == user_id]
|
||
for key in keys_to_delete:
|
||
del self._cache[key]
|
||
elif user_id is not None and channel_id is not None:
|
||
# Удаляем конкретную запись
|
||
key = (user_id, channel_id)
|
||
if key in self._cache:
|
||
del self._cache[key]
|
||
|
||
def cleanup(self) -> int:
|
||
"""
|
||
Удаляет устаревшие записи.
|
||
|
||
Returns:
|
||
int: Количество удаленных записей
|
||
"""
|
||
current_time = time()
|
||
keys_to_delete = [
|
||
key for key, (_, timestamp) in self._cache.items()
|
||
if current_time - timestamp >= self.ttl
|
||
]
|
||
|
||
for key in keys_to_delete:
|
||
del self._cache[key]
|
||
|
||
return len(keys_to_delete)
|
||
|
||
|
||
class SubscriptionMiddleware(BaseMiddleware):
|
||
"""
|
||
Middleware для проверки подписки пользователя на каналы.
|
||
|
||
Возможности:
|
||
- Проверка подписки на один или несколько каналов
|
||
- Кэширование результатов проверки
|
||
- Whitelist для администраторов
|
||
- Автоматическое получение ссылок на каналы
|
||
- Гибкая настройка обязательных/необязательных каналов
|
||
- Красивое сообщение с кнопками подписки
|
||
|
||
Attributes:
|
||
bot: Экземпляр бота
|
||
channels: Список конфигураций каналов
|
||
cache_ttl: Время жизни кэша в секундах
|
||
whitelist_admins: Пропускать ли администраторов бота
|
||
show_buttons: Показывать ли кнопки для подписки
|
||
|
||
Example:
|
||
```python
|
||
from middleware.subscription import SubscriptionMiddleware, ChannelConfig
|
||
|
||
channels = [
|
||
ChannelConfig(
|
||
id="@my_channel",
|
||
name="Основной канал",
|
||
invite_link="https://t.me/my_channel"
|
||
),
|
||
ChannelConfig(
|
||
id=-1001234567890,
|
||
name="Закрытый канал",
|
||
required=True
|
||
)
|
||
]
|
||
|
||
dp.message.middleware(SubscriptionMiddleware(bot, channels))
|
||
dp.callback_query.middleware(SubscriptionMiddleware(bot, channels))
|
||
```
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
bot: Bot,
|
||
channels: list[Union[ChannelConfig, str, int]],
|
||
cache_ttl: float = 300.0,
|
||
whitelist_admins: bool = True,
|
||
show_buttons: bool = True,
|
||
auto_fetch_links: bool = True
|
||
):
|
||
"""
|
||
Инициализация middleware.
|
||
|
||
Args:
|
||
bot: Экземпляр бота
|
||
channels: Список каналов (ChannelConfig, ID или username)
|
||
cache_ttl: Время жизни кэша в секундах
|
||
whitelist_admins: Пропускать администраторов бота
|
||
show_buttons: Показывать кнопки подписки
|
||
auto_fetch_links: Автоматически получать ссылки на каналы
|
||
"""
|
||
super().__init__()
|
||
self.bot = bot
|
||
self.cache = SubscriptionCache(ttl=cache_ttl)
|
||
self.whitelist_admins = whitelist_admins
|
||
self.show_buttons = show_buttons
|
||
self.auto_fetch_links = auto_fetch_links
|
||
|
||
# Преобразуем channels в ChannelConfig
|
||
self.channels: list[ChannelConfig] = []
|
||
for channel in channels:
|
||
if isinstance(channel, ChannelConfig):
|
||
self.channels.append(channel)
|
||
else:
|
||
# Простой ID/username -> ChannelConfig
|
||
self.channels.append(ChannelConfig(id=channel))
|
||
|
||
# Кэш информации о каналах
|
||
self._channel_info_cache: Dict[Union[str, int], Optional[Chat]] = {}
|
||
|
||
async def _get_channel_info(self, channel_id: Union[str, int]) -> Optional[Chat]:
|
||
"""
|
||
Получает информацию о канале.
|
||
|
||
Args:
|
||
channel_id: ID или username канала
|
||
|
||
Returns:
|
||
Chat или None: Информация о канале
|
||
"""
|
||
if channel_id in self._channel_info_cache:
|
||
return self._channel_info_cache[channel_id]
|
||
|
||
try:
|
||
chat = await self.bot.get_chat(channel_id)
|
||
self._channel_info_cache[channel_id] = chat
|
||
return chat
|
||
except (TelegramBadRequest, TelegramForbiddenError) as e:
|
||
logger.error(
|
||
f"Не удалось получить информацию о канале {channel_id}: {e}",
|
||
log_type='SUBSCRIPTION'
|
||
)
|
||
self._channel_info_cache[channel_id] = None
|
||
return None
|
||
|
||
async def _check_subscription(
|
||
self,
|
||
user_id: int,
|
||
channel_config: ChannelConfig
|
||
) -> bool:
|
||
"""
|
||
Проверяет подписку пользователя на канал.
|
||
|
||
Args:
|
||
user_id: ID пользователя
|
||
channel_config: Конфигурация канала
|
||
|
||
Returns:
|
||
bool: True если подписан
|
||
"""
|
||
channel_id = channel_config.id
|
||
|
||
# Проверяем кэш
|
||
cached = self.cache.get(user_id, channel_id)
|
||
if cached is not None:
|
||
logger.debug(
|
||
f"Использован кэш для проверки подписки на {channel_id}: {cached}",
|
||
log_type='SUBSCRIPTION'
|
||
)
|
||
return cached
|
||
|
||
# Выполняем проверку
|
||
try:
|
||
member = await self.bot.get_chat_member(
|
||
chat_id=channel_id,
|
||
user_id=user_id
|
||
)
|
||
|
||
is_subscribed = member.status in (
|
||
ChatMemberStatus.MEMBER,
|
||
ChatMemberStatus.ADMINISTRATOR,
|
||
ChatMemberStatus.CREATOR
|
||
)
|
||
|
||
# Сохраняем в кэш
|
||
self.cache.set(user_id, channel_id, is_subscribed)
|
||
|
||
logger.debug(
|
||
f"Проверка подписки user={user_id} на канал={channel_id}: "
|
||
f"{member.status.value} ({'✅' if is_subscribed else '❌'})",
|
||
log_type='SUBSCRIPTION'
|
||
)
|
||
|
||
return is_subscribed
|
||
|
||
except TelegramBadRequest as e:
|
||
logger.warning(
|
||
f"Канал {channel_id} недоступен или неверный: {e}",
|
||
log_type='SUBSCRIPTION'
|
||
)
|
||
# В случае ошибки считаем что не подписан
|
||
self.cache.set(user_id, channel_id, False)
|
||
return False
|
||
|
||
except TelegramForbiddenError as e:
|
||
logger.error(
|
||
f"Бот не имеет доступа к каналу {channel_id}: {e}",
|
||
log_type='SUBSCRIPTION'
|
||
)
|
||
self.cache.set(user_id, channel_id, False)
|
||
return False
|
||
|
||
async def _build_subscription_message(
|
||
self,
|
||
not_subscribed: list[ChannelConfig]
|
||
) -> tuple[str, InlineKeyboardBuilder]:
|
||
"""
|
||
Создает сообщение и клавиатуру для подписки.
|
||
|
||
Args:
|
||
not_subscribed: Список каналов без подписки
|
||
|
||
Returns:
|
||
tuple: (текст_сообщения, клавиатура)
|
||
"""
|
||
# Текст сообщения
|
||
text = "📢 <b>Для использования бота необходимо подписаться на каналы:</b>\n\n"
|
||
|
||
# Клавиатура
|
||
keyboard = InlineKeyboardBuilder()
|
||
|
||
for i, channel_config in enumerate(not_subscribed, 1):
|
||
# Получаем информацию о канале
|
||
channel_info = await self._get_channel_info(channel_config.id)
|
||
|
||
# Определяем название канала
|
||
if channel_config.name:
|
||
channel_name = channel_config.name
|
||
elif channel_info:
|
||
channel_name = channel_info.title
|
||
else:
|
||
channel_name = f"Канал {i}"
|
||
|
||
# Добавляем в текст
|
||
text += f"{i}. {channel_name}\n"
|
||
|
||
# Определяем ссылку
|
||
invite_link = channel_config.invite_link
|
||
|
||
if not invite_link and self.auto_fetch_links and channel_info:
|
||
# Пытаемся получить ссылку
|
||
if channel_info.username:
|
||
invite_link = f"https://t.me/{channel_info.username}"
|
||
elif channel_info.invite_link:
|
||
invite_link = channel_info.invite_link
|
||
|
||
# Добавляем кнопку если есть ссылка
|
||
if invite_link and self.show_buttons:
|
||
keyboard.row(
|
||
InlineKeyboardButton(
|
||
text=f"📌 {channel_name}",
|
||
url=invite_link
|
||
)
|
||
)
|
||
|
||
text += "\n✅ После подписки нажмите кнопку ниже для проверки."
|
||
|
||
# Кнопка проверки подписки
|
||
keyboard.row(
|
||
InlineKeyboardButton(
|
||
text="✅ Я подписался",
|
||
callback_data="check_subscription"
|
||
)
|
||
)
|
||
|
||
return text, keyboard
|
||
|
||
async def __call__(
|
||
self,
|
||
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
|
||
event: TelegramObject,
|
||
data: Dict[str, Any]
|
||
) -> Optional[Any]:
|
||
"""
|
||
Проверяет подписку перед выполнением хендлера.
|
||
|
||
Args:
|
||
handler: Функция хендлера
|
||
event: Объект события
|
||
data: Дополнительные данные
|
||
|
||
Returns:
|
||
Результат хендлера или None если не подписан
|
||
"""
|
||
# Пропускаем не-сообщения и не-callback
|
||
if not isinstance(event, (Message, CallbackQuery)):
|
||
return await handler(event, data)
|
||
|
||
# Извлекаем user_id
|
||
user_id = event.from_user.id if event.from_user else None
|
||
|
||
if user_id is None:
|
||
return await handler(event, data)
|
||
|
||
user_str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}"
|
||
|
||
# Whitelist для администраторов
|
||
if self.whitelist_admins and user_id in settings.super_admin_ids:
|
||
logger.debug(
|
||
f"Администратор {user_str} пропущен без проверки подписки",
|
||
log_type='SUBSCRIPTION'
|
||
)
|
||
return await handler(event, data)
|
||
|
||
# Проверяем подписку на все каналы
|
||
not_subscribed: list[ChannelConfig] = []
|
||
|
||
for channel_config in self.channels:
|
||
# Пропускаем необязательные каналы
|
||
if not channel_config.required:
|
||
continue
|
||
|
||
is_subscribed = await self._check_subscription(user_id, channel_config)
|
||
|
||
if not is_subscribed:
|
||
not_subscribed.append(channel_config)
|
||
|
||
# Если есть каналы без подписки
|
||
if not_subscribed:
|
||
logger.info(
|
||
f"Пользователь не подписан на {len(not_subscribed)} каналов",
|
||
log_type='SUBSCRIPTION',
|
||
user=user_str
|
||
)
|
||
|
||
# Создаем сообщение
|
||
text, keyboard = await self._build_subscription_message(not_subscribed)
|
||
|
||
# Отправляем сообщение
|
||
if isinstance(event, Message):
|
||
await event.answer(
|
||
text,
|
||
reply_markup=keyboard.as_markup(),
|
||
parse_mode="HTML"
|
||
)
|
||
elif isinstance(event, CallbackQuery):
|
||
# Для callback отправляем в чат или редактируем
|
||
if event.message:
|
||
try:
|
||
await event.message.edit_text(
|
||
text,
|
||
reply_markup=keyboard.as_markup(),
|
||
parse_mode="HTML"
|
||
)
|
||
except:
|
||
await event.message.answer(
|
||
text,
|
||
reply_markup=keyboard.as_markup(),
|
||
parse_mode="HTML"
|
||
)
|
||
|
||
await event.answer(
|
||
"⚠️ Требуется подписка на каналы",
|
||
show_alert=True
|
||
)
|
||
|
||
return None
|
||
|
||
# Все подписки в порядке
|
||
logger.debug(
|
||
f"Проверка подписки пройдена",
|
||
log_type='SUBSCRIPTION',
|
||
user=user_str
|
||
)
|
||
|
||
return await handler(event, data)
|
||
|
||
def invalidate_cache(
|
||
self,
|
||
user_id: Optional[int] = None,
|
||
channel_id: Optional[Union[str, int]] = None
|
||
) -> None:
|
||
"""
|
||
Публичный метод для инвалидации кэша.
|
||
|
||
Используется при обработке callback "check_subscription".
|
||
|
||
Args:
|
||
user_id: ID пользователя
|
||
channel_id: ID канала
|
||
"""
|
||
self.cache.invalidate(user_id, channel_id)
|
||
|
||
|
||
# ================= HANDLER ДЛЯ ПРОВЕРКИ ПОДПИСКИ =================
|
||
|
||
async def handle_check_subscription(
|
||
callback: CallbackQuery,
|
||
subscription_middleware: SubscriptionMiddleware
|
||
):
|
||
"""
|
||
Обработчик callback для повторной проверки подписки.
|
||
|
||
Example:
|
||
```python
|
||
from filters.callback import CallbackStartsWith
|
||
from middleware.subscription import handle_check_subscription, subscription_middleware
|
||
|
||
@router.callback_query(CallbackStartsWith("check_subscription"))
|
||
async def check_sub(callback: CallbackQuery):
|
||
await handle_check_subscription(callback, subscription_middleware)
|
||
```
|
||
"""
|
||
user_id = callback.from_user.id
|
||
|
||
# Инвалидируем кэш для пользователя
|
||
subscription_middleware.invalidate_cache(user_id=user_id)
|
||
|
||
await callback.answer("🔄 Проверяю подписку...", show_alert=False)
|
||
|
||
# Перепроверяем подписку
|
||
not_subscribed = []
|
||
|
||
for channel_config in subscription_middleware.channels:
|
||
if not channel_config.required:
|
||
continue
|
||
|
||
is_subscribed = await subscription_middleware._check_subscription(
|
||
user_id,
|
||
channel_config
|
||
)
|
||
|
||
if not is_subscribed:
|
||
not_subscribed.append(channel_config)
|
||
|
||
if not_subscribed:
|
||
# Все еще не подписан
|
||
text, keyboard = await subscription_middleware._build_subscription_message(not_subscribed)
|
||
|
||
await callback.message.edit_text(
|
||
text,
|
||
reply_markup=keyboard.as_markup(),
|
||
parse_mode="HTML"
|
||
)
|
||
|
||
await callback.answer(
|
||
f"❌ Вы еще не подписаны на {len(not_subscribed)} каналов",
|
||
show_alert=True
|
||
)
|
||
else:
|
||
# Подписка подтверждена
|
||
await callback.message.delete()
|
||
await callback.message.answer(
|
||
"✅ <b>Подписка подтверждена!</b>\n\n"
|
||
"Теперь вы можете пользоваться ботом. Используйте /start",
|
||
parse_mode="HTML"
|
||
)
|
||
|
||
logger.info(
|
||
f"Подписка успешно подтверждена",
|
||
log_type='SUBSCRIPTION',
|
||
user=f"@{callback.from_user.username}" if callback.from_user.username else f"id{user_id}"
|
||
)
|