545 lines
18 KiB
Python
545 lines
18 KiB
Python
"""
|
||
Middleware для обработки реферальных ссылок и deep links
|
||
"""
|
||
from typing import Callable, Awaitable, Any, Dict, Optional
|
||
from dataclasses import dataclass, field
|
||
from datetime import datetime
|
||
from collections import defaultdict
|
||
import re
|
||
|
||
from aiogram import BaseMiddleware
|
||
from aiogram.filters.command import CommandObject
|
||
from aiogram.types import TelegramObject, Message, User
|
||
|
||
from middleware.loggers import logger
|
||
|
||
__all__ = (
|
||
'ReferralMiddleware',
|
||
'DeepLinkData',
|
||
'referral_stats',
|
||
'ReferralType'
|
||
)
|
||
|
||
|
||
class ReferralType:
|
||
"""Типы реферальных ссылок"""
|
||
REFERRAL = 'ref' # Обычная реферальная ссылка
|
||
PROMO = 'promo' # Промокод
|
||
UTM = 'utm' # UTM метки
|
||
INVITE = 'invite' # Инвайт-ссылка
|
||
DEEPLINK = 'deeplink' # Произвольный deep link
|
||
CUSTOM = 'custom' # Кастомный тип
|
||
|
||
|
||
@dataclass
|
||
class DeepLinkData:
|
||
"""
|
||
Данные deep link.
|
||
|
||
Attributes:
|
||
raw: Исходная строка (все после /start)
|
||
type: Тип ссылки (ref, promo, utm, и т.д.)
|
||
params: Распарсенные параметры
|
||
user_id: ID пользователя, перешедшего по ссылке
|
||
username: Username пользователя
|
||
timestamp: Время перехода
|
||
is_valid: Валидна ли ссылка
|
||
"""
|
||
raw: str
|
||
type: str = ReferralType.DEEPLINK
|
||
params: Dict[str, Any] = field(default_factory=dict)
|
||
user_id: Optional[int] = None
|
||
username: Optional[str] = None
|
||
timestamp: datetime = field(default_factory=datetime.now)
|
||
is_valid: bool = True
|
||
|
||
def get(self, key: str, default: Any = None) -> Any:
|
||
"""Получает параметр по ключу"""
|
||
return self.params.get(key, default)
|
||
|
||
def __getitem__(self, key: str) -> Any:
|
||
"""Позволяет использовать data['key']"""
|
||
return self.params[key]
|
||
|
||
def __contains__(self, key: str) -> bool:
|
||
"""Позволяет использовать 'key' in data"""
|
||
return key in self.params
|
||
|
||
|
||
class ReferralStatistics:
|
||
"""
|
||
Статистика реферальных переходов.
|
||
"""
|
||
|
||
def __init__(self):
|
||
# Счетчики переходов по типам: {type: count}
|
||
self.clicks_by_type: Dict[str, int] = defaultdict(int)
|
||
|
||
# Переходы по кодам: {ref_code: count}
|
||
self.clicks_by_code: Dict[str, int] = defaultdict(int)
|
||
|
||
# История переходов: [(timestamp, user_id, ref_code, type), ...]
|
||
self.history: list[tuple[datetime, int, str, str]] = []
|
||
|
||
# Уникальные пользователи: {ref_code: set(user_ids)}
|
||
self.unique_users: Dict[str, set[int]] = defaultdict(set)
|
||
|
||
def record(self, deep_link: DeepLinkData) -> None:
|
||
"""Записывает переход"""
|
||
# Счетчик по типу
|
||
self.clicks_by_type[deep_link.type] += 1
|
||
|
||
# Счетчик по коду (если есть реферальный код)
|
||
ref_code = deep_link.get('ref_code') or deep_link.get('code') or deep_link.raw
|
||
if ref_code:
|
||
self.clicks_by_code[ref_code] += 1
|
||
|
||
# Уникальные пользователи
|
||
if deep_link.user_id:
|
||
self.unique_users[ref_code].add(deep_link.user_id)
|
||
|
||
# История
|
||
if deep_link.user_id:
|
||
self.history.append((
|
||
deep_link.timestamp,
|
||
deep_link.user_id,
|
||
ref_code,
|
||
deep_link.type
|
||
))
|
||
|
||
def get_stats(self, ref_code: Optional[str] = None) -> Dict[str, Any]:
|
||
"""
|
||
Возвращает статистику.
|
||
|
||
Args:
|
||
ref_code: Код для фильтрации (если None, возвращает общую статистику)
|
||
"""
|
||
if ref_code:
|
||
return {
|
||
'ref_code': ref_code,
|
||
'total_clicks': self.clicks_by_code.get(ref_code, 0),
|
||
'unique_users': len(self.unique_users.get(ref_code, set()))
|
||
}
|
||
|
||
return {
|
||
'total_clicks': sum(self.clicks_by_type.values()),
|
||
'clicks_by_type': dict(self.clicks_by_type),
|
||
'top_codes': self.get_top_codes(10),
|
||
'total_unique_users': sum(len(users) for users in self.unique_users.values())
|
||
}
|
||
|
||
def get_top_codes(self, limit: int = 10) -> list[tuple[str, int]]:
|
||
"""Возвращает топ реферальных кодов"""
|
||
sorted_codes = sorted(
|
||
self.clicks_by_code.items(),
|
||
key=lambda x: x[1],
|
||
reverse=True
|
||
)
|
||
return sorted_codes[:limit]
|
||
|
||
|
||
# Глобальная статистика
|
||
referral_stats = ReferralStatistics()
|
||
|
||
|
||
class ReferralMiddleware(BaseMiddleware):
|
||
"""
|
||
Middleware для обработки реферальных ссылок и deep links.
|
||
|
||
Возможности:
|
||
- Парсинг различных форматов deep links
|
||
- Автоматическое определение типа ссылки
|
||
- Валидация параметров
|
||
- Сбор статистики
|
||
- Интеграция с базой данных через callback
|
||
- Поддержка сложных параметров (ref_123_promo_abc)
|
||
|
||
Поддерживаемые форматы:
|
||
- /start ref123 → {'ref_code': 'ref123'}
|
||
- /start promo_SUMMER2024 → {'type': 'promo', 'code': 'SUMMER2024'}
|
||
- /start ref_123_bonus_50 → {'ref_code': '123', 'bonus': '50'}
|
||
- /start utm_source_telegram → {'utm_source': 'telegram'}
|
||
|
||
Attributes:
|
||
on_referral: Callback функция для сохранения в БД
|
||
validator: Функция валидации кодов
|
||
parse_complex: Парсить ли сложные параметры
|
||
collect_stats: Собирать ли статистику
|
||
|
||
Example:
|
||
```python
|
||
from middleware.referral import ReferralMiddleware, DeepLinkData
|
||
|
||
async def save_referral(deep_link: DeepLinkData):
|
||
# Сохранение в БД
|
||
await db.save_referral(
|
||
user_id=deep_link.user_id,
|
||
ref_code=deep_link.get('ref_code'),
|
||
timestamp=deep_link.timestamp
|
||
)
|
||
|
||
# Регистрация middleware
|
||
referral_mdw = ReferralMiddleware(
|
||
on_referral=save_referral,
|
||
parse_complex=True,
|
||
collect_stats=True
|
||
)
|
||
|
||
dp.message.middleware(referral_mdw)
|
||
|
||
# В хендлере
|
||
@router.message(CommandStart())
|
||
async def start(message: Message, deep_link: Optional[DeepLinkData] = None):
|
||
if deep_link:
|
||
ref_code = deep_link.get('ref_code')
|
||
await message.answer(f"Привет! Вы пришли по ссылке: {ref_code}")
|
||
else:
|
||
await message.answer("Привет!")
|
||
```
|
||
"""
|
||
|
||
# Паттерны для парсинга
|
||
PATTERNS = {
|
||
# ref_123 или ref123
|
||
ReferralType.REFERRAL: re.compile(r'^ref[_-]?(\w+)$', re.IGNORECASE),
|
||
|
||
# promo_SUMMER2024
|
||
ReferralType.PROMO: re.compile(r'^promo[_-]?(\w+)$', re.IGNORECASE),
|
||
|
||
# invite_abc123
|
||
ReferralType.INVITE: re.compile(r'^invite[_-]?(\w+)$', re.IGNORECASE),
|
||
|
||
# utm_source_telegram_campaign_ads
|
||
ReferralType.UTM: re.compile(r'^utm[_-]', re.IGNORECASE),
|
||
}
|
||
|
||
def __init__(
|
||
self,
|
||
on_referral: Optional[Callable[[DeepLinkData], Awaitable[None]]] = None,
|
||
validator: Optional[Callable[[str], bool]] = None,
|
||
parse_complex: bool = True,
|
||
collect_stats: bool = True,
|
||
max_length: int = 64
|
||
):
|
||
"""
|
||
Инициализация middleware.
|
||
|
||
Args:
|
||
on_referral: Callback для обработки реферала (сохранение в БД)
|
||
validator: Функция валидации кода (должна вернуть True если валиден)
|
||
parse_complex: Парсить ли сложные параметры (ref_123_bonus_50)
|
||
collect_stats: Собирать ли статистику
|
||
max_length: Максимальная длина deep link
|
||
"""
|
||
super().__init__()
|
||
self.on_referral = on_referral
|
||
self.validator = validator
|
||
self.parse_complex = parse_complex
|
||
self.collect_stats = collect_stats
|
||
self.max_length = max_length
|
||
|
||
def _parse_simple(self, args: str) -> tuple[str, Dict[str, Any]]:
|
||
"""
|
||
Парсит простые форматы deep links.
|
||
|
||
Args:
|
||
args: Аргументы команды /start
|
||
|
||
Returns:
|
||
tuple: (тип, параметры)
|
||
"""
|
||
# Проверка по паттернам
|
||
for link_type, pattern in self.PATTERNS.items():
|
||
match = pattern.match(args)
|
||
if match:
|
||
if link_type == ReferralType.REFERRAL:
|
||
return link_type, {'ref_code': match.group(1)}
|
||
elif link_type == ReferralType.PROMO:
|
||
return link_type, {'code': match.group(1), 'promo_code': match.group(1)}
|
||
elif link_type == ReferralType.INVITE:
|
||
return link_type, {'invite_code': match.group(1)}
|
||
elif link_type == ReferralType.UTM:
|
||
# Парсим UTM параметры
|
||
return link_type, self._parse_utm(args)
|
||
|
||
# Если не совпало ни с одним паттерном - просто код
|
||
return ReferralType.DEEPLINK, {'code': args}
|
||
|
||
def _parse_utm(self, args: str) -> Dict[str, Any]:
|
||
"""
|
||
Парсит UTM параметры: utm_source_telegram_campaign_ads
|
||
|
||
Args:
|
||
args: Строка с UTM параметрами
|
||
|
||
Returns:
|
||
Dict с UTM параметрами
|
||
"""
|
||
params = {}
|
||
|
||
# Удаляем префикс utm_
|
||
if args.lower().startswith('utm_'):
|
||
args = args[4:]
|
||
|
||
# Разбиваем по _ и парсим пары ключ-значение
|
||
parts = args.split('_')
|
||
|
||
i = 0
|
||
while i < len(parts) - 1:
|
||
key = f"utm_{parts[i]}"
|
||
value = parts[i + 1]
|
||
params[key] = value
|
||
i += 2
|
||
|
||
return params
|
||
|
||
def _parse_complex(self, args: str) -> tuple[str, Dict[str, Any]]:
|
||
"""
|
||
Парсит сложные форматы: ref_123_bonus_50_promo_SUMMER
|
||
|
||
Args:
|
||
args: Аргументы команды
|
||
|
||
Returns:
|
||
tuple: (тип, параметры)
|
||
"""
|
||
params = {}
|
||
parts = args.split('_')
|
||
|
||
# Определяем тип по первому элементу
|
||
link_type = ReferralType.DEEPLINK
|
||
|
||
if parts[0].lower() in ['ref', 'referral']:
|
||
link_type = ReferralType.REFERRAL
|
||
if len(parts) > 1:
|
||
params['ref_code'] = parts[1]
|
||
parts = parts[2:] # Пропускаем первые 2 элемента
|
||
elif parts[0].lower() == 'promo':
|
||
link_type = ReferralType.PROMO
|
||
if len(parts) > 1:
|
||
params['promo_code'] = parts[1]
|
||
parts = parts[2:]
|
||
elif parts[0].lower() == 'invite':
|
||
link_type = ReferralType.INVITE
|
||
if len(parts) > 1:
|
||
params['invite_code'] = parts[1]
|
||
parts = parts[2:]
|
||
|
||
# Парсим остальные параметры как пары ключ-значение
|
||
i = 0
|
||
while i < len(parts) - 1:
|
||
key = parts[i]
|
||
value = parts[i + 1]
|
||
|
||
# Пытаемся преобразовать в число
|
||
try:
|
||
value = int(value)
|
||
except ValueError:
|
||
try:
|
||
value = float(value)
|
||
except ValueError:
|
||
pass # Оставляем строкой
|
||
|
||
params[key] = value
|
||
i += 2
|
||
|
||
return link_type, params
|
||
|
||
def _validate_deep_link(self, args: str) -> bool:
|
||
"""
|
||
Валидирует deep link.
|
||
|
||
Args:
|
||
args: Строка для валидации
|
||
|
||
Returns:
|
||
bool: True если валиден
|
||
"""
|
||
# Проверка длины
|
||
if len(args) > self.max_length:
|
||
logger.warning(
|
||
f"Deep link слишком длинный: {len(args)} > {self.max_length}",
|
||
log_type='REFERRAL'
|
||
)
|
||
return False
|
||
|
||
# Проверка на запрещенные символы (только буквы, цифры, _ и -)
|
||
if not re.match(r'^[a-zA-Z0-9_-]+$', args):
|
||
logger.warning(
|
||
f"Deep link содержит недопустимые символы: {args}",
|
||
log_type='REFERRAL'
|
||
)
|
||
return False
|
||
|
||
# Кастомная валидация
|
||
if self.validator:
|
||
return self.validator(args)
|
||
|
||
return True
|
||
|
||
def _parse_deep_link(self, args: str, user: User) -> DeepLinkData:
|
||
"""
|
||
Парсит deep link и создает объект DeepLinkData.
|
||
|
||
Args:
|
||
args: Аргументы команды /start
|
||
user: Пользователь, перешедший по ссылке
|
||
|
||
Returns:
|
||
DeepLinkData: Распарсенные данные
|
||
"""
|
||
# Валидация
|
||
is_valid = self._validate_deep_link(args)
|
||
|
||
# Парсинг
|
||
if self.parse_complex and '_' in args:
|
||
link_type, params = self._parse_complex(args)
|
||
else:
|
||
link_type, params = self._parse_simple(args)
|
||
|
||
# Создаем объект
|
||
deep_link = DeepLinkData(
|
||
raw=args,
|
||
type=link_type,
|
||
params=params,
|
||
user_id=user.id,
|
||
username=user.username,
|
||
is_valid=is_valid
|
||
)
|
||
|
||
return deep_link
|
||
|
||
async def __call__(
|
||
self,
|
||
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
|
||
event: TelegramObject,
|
||
data: Dict[str, Any]
|
||
) -> Any:
|
||
"""
|
||
Перехватывает команды /start с аргументами.
|
||
|
||
Args:
|
||
handler: Функция хендлера
|
||
event: Объект события
|
||
data: Дополнительные данные
|
||
|
||
Returns:
|
||
Результат хендлера
|
||
"""
|
||
# Обрабатываем только сообщения
|
||
if not isinstance(event, Message):
|
||
return await handler(event, data)
|
||
|
||
# Извлекаем команду
|
||
command: Optional[CommandObject] = data.get('command')
|
||
|
||
# Проверяем, что это /start с аргументами
|
||
if not command or command.command.lower() != 'start' or not command.args:
|
||
return await handler(event, data)
|
||
|
||
user = event.from_user
|
||
args = command.args
|
||
|
||
# Парсим deep link
|
||
deep_link = self._parse_deep_link(args, user)
|
||
|
||
# Логирование
|
||
if deep_link.is_valid:
|
||
logger.info(
|
||
f"Deep link: type={deep_link.type}, params={deep_link.params}",
|
||
log_type='REFERRAL',
|
||
user=f"@{user.username}" if user.username else f"id{user.id}"
|
||
)
|
||
else:
|
||
logger.warning(
|
||
f"Невалидный deep link: {args}",
|
||
log_type='REFERRAL',
|
||
user=f"@{user.username}" if user.username else f"id{user.id}"
|
||
)
|
||
|
||
# Собираем статистику
|
||
if self.collect_stats and deep_link.is_valid:
|
||
referral_stats.record(deep_link)
|
||
|
||
# Вызываем callback для сохранения в БД
|
||
if self.on_referral and deep_link.is_valid:
|
||
try:
|
||
await self.on_referral(deep_link)
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Ошибка в on_referral callback: {e}",
|
||
log_type='REFERRAL'
|
||
)
|
||
|
||
# Добавляем deep_link в data для хендлера
|
||
data['deep_link'] = deep_link
|
||
data['ref_code'] = deep_link.get('ref_code') # Для обратной совместимости
|
||
|
||
# Выполняем хендлер
|
||
return await handler(event, data)
|
||
|
||
|
||
# ================= ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ =================
|
||
|
||
def create_deep_link(bot_username: str, **params) -> str:
|
||
"""
|
||
Создает deep link для бота.
|
||
|
||
Args:
|
||
bot_username: Username бота (без @)
|
||
**params: Параметры для ссылки
|
||
|
||
Returns:
|
||
str: Готовая ссылка
|
||
|
||
Example:
|
||
>>> create_deep_link('mybot', ref_code='123', bonus='50')
|
||
'https://t.me/mybot?start=ref_123_bonus_50'
|
||
"""
|
||
# Формируем строку параметров
|
||
parts = []
|
||
|
||
for key, value in params.items():
|
||
parts.append(str(key))
|
||
parts.append(str(value))
|
||
|
||
param_string = '_'.join(parts)
|
||
|
||
return f"https://t.me/{bot_username}?start={param_string}"
|
||
|
||
|
||
def create_referral_link(bot_username: str, ref_code: str) -> str:
|
||
"""
|
||
Создает простую реферальную ссылку.
|
||
|
||
Args:
|
||
bot_username: Username бота
|
||
ref_code: Реферальный код
|
||
|
||
Returns:
|
||
str: Реферальная ссылка
|
||
|
||
Example:
|
||
>>> create_referral_link('mybot', '123')
|
||
'https://t.me/mybot?start=ref_123'
|
||
"""
|
||
return f"https://t.me/{bot_username}?start=ref_{ref_code}"
|
||
|
||
|
||
def create_promo_link(bot_username: str, promo_code: str) -> str:
|
||
"""
|
||
Создает ссылку с промокодом.
|
||
|
||
Args:
|
||
bot_username: Username бота
|
||
promo_code: Промокод
|
||
|
||
Returns:
|
||
str: Ссылка с промокодом
|
||
|
||
Example:
|
||
>>> create_promo_link('mybot', 'SUMMER2024')
|
||
'https://t.me/mybot?start=promo_SUMMER2024'
|
||
"""
|
||
return f"https://t.me/{bot_username}?start=promo_{promo_code}"
|