Мидлвеер на проверку подписки на каналы

This commit is contained in:
2026-02-23 14:25:51 +07:00
parent 7567b84fa0
commit b8a0a08132

553
bot/middlewares/sub_mdw.py Normal file
View File

@@ -0,0 +1,553 @@
"""
Middleware для проверки подписки пользователей на каналы
"""
from time import time
from typing import Callable, Awaitable, Any, Dict, Optional, Union
from dataclasses import dataclass
from aiogram import BaseMiddleware, Bot
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
from aiogram.types import TelegramObject, Message, CallbackQuery, InlineKeyboardButton, Chat
from aiogram.utils.keyboard import InlineKeyboardBuilder
from aiogram.enums import ChatMemberStatus
from middleware.loggers import logger
from configs import settings
__all__ = ('SubscriptionMiddleware', 'ChannelConfig')
@dataclass
class ChannelConfig:
"""
Конфигурация канала для проверки подписки.
Attributes:
id: ID или username канала
name: Название канала (для отображения)
invite_link: Пригласительная ссылка
required: Обязательная ли подписка
"""
id: Union[str, int]
name: Optional[str] = None
invite_link: Optional[str] = None
required: bool = True
class SubscriptionCache:
"""
Кэш для проверок подписки.
Уменьшает количество запросов к Telegram API.
"""
def __init__(self, ttl: float = 300.0):
"""
Args:
ttl: Время жизни кэша в секундах (по умолчанию 5 минут)
"""
self.ttl = ttl
# Структура: {(user_id, channel_id): (is_subscribed, timestamp)}
self._cache: Dict[tuple[int, Union[str, int]], tuple[bool, float]] = {}
def get(self, user_id: int, channel_id: Union[str, int]) -> Optional[bool]:
"""
Получает значение из кэша.
Args:
user_id: ID пользователя
channel_id: ID канала
Returns:
bool или None: True/False если в кэше и актуально, иначе None
"""
key = (user_id, channel_id)
if key in self._cache:
is_subscribed, timestamp = self._cache[key]
# Проверяем актуальность
if time() - timestamp < self.ttl:
return is_subscribed
else:
# Удаляем устаревшую запись
del self._cache[key]
return None
def set(self, user_id: int, channel_id: Union[str, int], is_subscribed: bool) -> None:
"""
Сохраняет значение в кэш.
Args:
user_id: ID пользователя
channel_id: ID канала
is_subscribed: Статус подписки
"""
key = (user_id, channel_id)
self._cache[key] = (is_subscribed, time())
def invalidate(self, user_id: Optional[int] = None, channel_id: Optional[Union[str, int]] = None) -> None:
"""
Инвалидирует кэш.
Args:
user_id: ID пользователя (если None, инвалидирует все)
channel_id: ID канала (если None, инвалидирует все для пользователя)
"""
if user_id is None and channel_id is None:
# Полная очистка
self._cache.clear()
elif user_id is not None and channel_id is None:
# Удаляем все записи пользователя
keys_to_delete = [key for key in self._cache if key[0] == user_id]
for key in keys_to_delete:
del self._cache[key]
elif user_id is not None and channel_id is not None:
# Удаляем конкретную запись
key = (user_id, channel_id)
if key in self._cache:
del self._cache[key]
def cleanup(self) -> int:
"""
Удаляет устаревшие записи.
Returns:
int: Количество удаленных записей
"""
current_time = time()
keys_to_delete = [
key for key, (_, timestamp) in self._cache.items()
if current_time - timestamp >= self.ttl
]
for key in keys_to_delete:
del self._cache[key]
return len(keys_to_delete)
class SubscriptionMiddleware(BaseMiddleware):
"""
Middleware для проверки подписки пользователя на каналы.
Возможности:
- Проверка подписки на один или несколько каналов
- Кэширование результатов проверки
- Whitelist для администраторов
- Автоматическое получение ссылок на каналы
- Гибкая настройка обязательных/необязательных каналов
- Красивое сообщение с кнопками подписки
Attributes:
bot: Экземпляр бота
channels: Список конфигураций каналов
cache_ttl: Время жизни кэша в секундах
whitelist_admins: Пропускать ли администраторов бота
show_buttons: Показывать ли кнопки для подписки
Example:
```python
from middleware.subscription import SubscriptionMiddleware, ChannelConfig
channels = [
ChannelConfig(
id="@my_channel",
name="Основной канал",
invite_link="https://t.me/my_channel"
),
ChannelConfig(
id=-1001234567890,
name="Закрытый канал",
required=True
)
]
dp.message.middleware(SubscriptionMiddleware(bot, channels))
dp.callback_query.middleware(SubscriptionMiddleware(bot, channels))
```
"""
def __init__(
self,
bot: Bot,
channels: list[Union[ChannelConfig, str, int]],
cache_ttl: float = 300.0,
whitelist_admins: bool = True,
show_buttons: bool = True,
auto_fetch_links: bool = True
):
"""
Инициализация middleware.
Args:
bot: Экземпляр бота
channels: Список каналов (ChannelConfig, ID или username)
cache_ttl: Время жизни кэша в секундах
whitelist_admins: Пропускать администраторов бота
show_buttons: Показывать кнопки подписки
auto_fetch_links: Автоматически получать ссылки на каналы
"""
super().__init__()
self.bot = bot
self.cache = SubscriptionCache(ttl=cache_ttl)
self.whitelist_admins = whitelist_admins
self.show_buttons = show_buttons
self.auto_fetch_links = auto_fetch_links
# Преобразуем channels в ChannelConfig
self.channels: list[ChannelConfig] = []
for channel in channels:
if isinstance(channel, ChannelConfig):
self.channels.append(channel)
else:
# Простой ID/username -> ChannelConfig
self.channels.append(ChannelConfig(id=channel))
# Кэш информации о каналах
self._channel_info_cache: Dict[Union[str, int], Optional[Chat]] = {}
async def _get_channel_info(self, channel_id: Union[str, int]) -> Optional[Chat]:
"""
Получает информацию о канале.
Args:
channel_id: ID или username канала
Returns:
Chat или None: Информация о канале
"""
if channel_id in self._channel_info_cache:
return self._channel_info_cache[channel_id]
try:
chat = await self.bot.get_chat(channel_id)
self._channel_info_cache[channel_id] = chat
return chat
except (TelegramBadRequest, TelegramForbiddenError) as e:
logger.error(
f"Не удалось получить информацию о канале {channel_id}: {e}",
log_type='SUBSCRIPTION'
)
self._channel_info_cache[channel_id] = None
return None
async def _check_subscription(
self,
user_id: int,
channel_config: ChannelConfig
) -> bool:
"""
Проверяет подписку пользователя на канал.
Args:
user_id: ID пользователя
channel_config: Конфигурация канала
Returns:
bool: True если подписан
"""
channel_id = channel_config.id
# Проверяем кэш
cached = self.cache.get(user_id, channel_id)
if cached is not None:
logger.debug(
f"Использован кэш для проверки подписки на {channel_id}: {cached}",
log_type='SUBSCRIPTION'
)
return cached
# Выполняем проверку
try:
member = await self.bot.get_chat_member(
chat_id=channel_id,
user_id=user_id
)
is_subscribed = member.status in (
ChatMemberStatus.MEMBER,
ChatMemberStatus.ADMINISTRATOR,
ChatMemberStatus.CREATOR
)
# Сохраняем в кэш
self.cache.set(user_id, channel_id, is_subscribed)
logger.debug(
f"Проверка подписки user={user_id} на канал={channel_id}: "
f"{member.status.value} ({'' if is_subscribed else ''})",
log_type='SUBSCRIPTION'
)
return is_subscribed
except TelegramBadRequest as e:
logger.warning(
f"Канал {channel_id} недоступен или неверный: {e}",
log_type='SUBSCRIPTION'
)
# В случае ошибки считаем что не подписан
self.cache.set(user_id, channel_id, False)
return False
except TelegramForbiddenError as e:
logger.error(
f"Бот не имеет доступа к каналу {channel_id}: {e}",
log_type='SUBSCRIPTION'
)
self.cache.set(user_id, channel_id, False)
return False
async def _build_subscription_message(
self,
not_subscribed: list[ChannelConfig]
) -> tuple[str, InlineKeyboardBuilder]:
"""
Создает сообщение и клавиатуру для подписки.
Args:
not_subscribed: Список каналов без подписки
Returns:
tuple: (текст_сообщения, клавиатура)
"""
# Текст сообщения
text = "📢 <b>Для использования бота необходимо подписаться на каналы:</b>\n\n"
# Клавиатура
keyboard = InlineKeyboardBuilder()
for i, channel_config in enumerate(not_subscribed, 1):
# Получаем информацию о канале
channel_info = await self._get_channel_info(channel_config.id)
# Определяем название канала
if channel_config.name:
channel_name = channel_config.name
elif channel_info:
channel_name = channel_info.title
else:
channel_name = f"Канал {i}"
# Добавляем в текст
text += f"{i}. {channel_name}\n"
# Определяем ссылку
invite_link = channel_config.invite_link
if not invite_link and self.auto_fetch_links and channel_info:
# Пытаемся получить ссылку
if channel_info.username:
invite_link = f"https://t.me/{channel_info.username}"
elif channel_info.invite_link:
invite_link = channel_info.invite_link
# Добавляем кнопку если есть ссылка
if invite_link and self.show_buttons:
keyboard.row(
InlineKeyboardButton(
text=f"📌 {channel_name}",
url=invite_link
)
)
text += "\n✅ После подписки нажмите кнопку ниже для проверки."
# Кнопка проверки подписки
keyboard.row(
InlineKeyboardButton(
text="✅ Я подписался",
callback_data="check_subscription"
)
)
return text, keyboard
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Optional[Any]:
"""
Проверяет подписку перед выполнением хендлера.
Args:
handler: Функция хендлера
event: Объект события
data: Дополнительные данные
Returns:
Результат хендлера или None если не подписан
"""
# Пропускаем не-сообщения и не-callback
if not isinstance(event, (Message, CallbackQuery)):
return await handler(event, data)
# Извлекаем user_id
user_id = event.from_user.id if event.from_user else None
if user_id is None:
return await handler(event, data)
user_str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}"
# Whitelist для администраторов
if self.whitelist_admins and user_id in settings.super_admin_ids:
logger.debug(
f"Администратор {user_str} пропущен без проверки подписки",
log_type='SUBSCRIPTION'
)
return await handler(event, data)
# Проверяем подписку на все каналы
not_subscribed: list[ChannelConfig] = []
for channel_config in self.channels:
# Пропускаем необязательные каналы
if not channel_config.required:
continue
is_subscribed = await self._check_subscription(user_id, channel_config)
if not is_subscribed:
not_subscribed.append(channel_config)
# Если есть каналы без подписки
if not_subscribed:
logger.info(
f"Пользователь не подписан на {len(not_subscribed)} каналов",
log_type='SUBSCRIPTION',
user=user_str
)
# Создаем сообщение
text, keyboard = await self._build_subscription_message(not_subscribed)
# Отправляем сообщение
if isinstance(event, Message):
await event.answer(
text,
reply_markup=keyboard.as_markup(),
parse_mode="HTML"
)
elif isinstance(event, CallbackQuery):
# Для callback отправляем в чат или редактируем
if event.message:
try:
await event.message.edit_text(
text,
reply_markup=keyboard.as_markup(),
parse_mode="HTML"
)
except:
await event.message.answer(
text,
reply_markup=keyboard.as_markup(),
parse_mode="HTML"
)
await event.answer(
"⚠️ Требуется подписка на каналы",
show_alert=True
)
return None
# Все подписки в порядке
logger.debug(
f"Проверка подписки пройдена",
log_type='SUBSCRIPTION',
user=user_str
)
return await handler(event, data)
def invalidate_cache(
self,
user_id: Optional[int] = None,
channel_id: Optional[Union[str, int]] = None
) -> None:
"""
Публичный метод для инвалидации кэша.
Используется при обработке callback "check_subscription".
Args:
user_id: ID пользователя
channel_id: ID канала
"""
self.cache.invalidate(user_id, channel_id)
# ================= HANDLER ДЛЯ ПРОВЕРКИ ПОДПИСКИ =================
async def handle_check_subscription(
callback: CallbackQuery,
subscription_middleware: SubscriptionMiddleware
):
"""
Обработчик callback для повторной проверки подписки.
Example:
```python
from filters.callback import CallbackStartsWith
from middleware.subscription import handle_check_subscription, subscription_middleware
@router.callback_query(CallbackStartsWith("check_subscription"))
async def check_sub(callback: CallbackQuery):
await handle_check_subscription(callback, subscription_middleware)
```
"""
user_id = callback.from_user.id
# Инвалидируем кэш для пользователя
subscription_middleware.invalidate_cache(user_id=user_id)
await callback.answer("🔄 Проверяю подписку...", show_alert=False)
# Перепроверяем подписку
not_subscribed = []
for channel_config in subscription_middleware.channels:
if not channel_config.required:
continue
is_subscribed = await subscription_middleware._check_subscription(
user_id,
channel_config
)
if not is_subscribed:
not_subscribed.append(channel_config)
if not_subscribed:
# Все еще не подписан
text, keyboard = await subscription_middleware._build_subscription_message(not_subscribed)
await callback.message.edit_text(
text,
reply_markup=keyboard.as_markup(),
parse_mode="HTML"
)
await callback.answer(
f"❌ Вы еще не подписаны на {len(not_subscribed)} каналов",
show_alert=True
)
else:
# Подписка подтверждена
await callback.message.delete()
await callback.message.answer(
"✅ <b>Подписка подтверждена!</b>\n\n"
"Теперь вы можете пользоваться ботом. Используйте /start",
parse_mode="HTML"
)
logger.info(
f"Подписка успешно подтверждена",
log_type='SUBSCRIPTION',
user=f"@{callback.from_user.username}" if callback.from_user.username else f"id{user_id}"
)