diff --git a/bot/utils/hidden_username.py b/bot/utils/hidden_username.py new file mode 100644 index 0000000..8d2472c --- /dev/null +++ b/bot/utils/hidden_username.py @@ -0,0 +1,504 @@ +""" +Утилиты для упоминаний пользователей (mentions) +""" +from typing import Optional, List, Set +from datetime import datetime, timedelta + +from aiogram import Bot +from aiogram.types import Message, ChatMemberAdministrator, ChatMemberOwner, User +from aiogram.utils.markdown import hide_link, hlink +from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError + +__all__ = ( + 'mention_admins', + 'mention_user', + 'mention_users', + 'get_admins_list', + 'AdminCache', + 'admin_cache', + 'mention_moderators', + 'mention_owner', + 'hidden_admins_message' +) + + +class AdminCache: + """ + Кэш для списков администраторов чатов. + + Уменьшает количество запросов к API Telegram. + """ + + def __init__(self, ttl: int = 300): + """ + Args: + ttl: Время жизни кэша в секундах (по умолчанию 5 минут) + """ + self.ttl = ttl + # {chat_id: (admins_list, timestamp)} + self._cache: dict[int, tuple[List[User], datetime]] = {} + # Статистика + self.hits: int = 0 + self.misses: int = 0 + + def get(self, chat_id: int) -> Optional[List[User]]: + """ + Получает список админов из кэша. + + Args: + chat_id: ID чата + + Returns: + List[User] или None если кэш устарел + """ + if chat_id in self._cache: + admins, timestamp = self._cache[chat_id] + + # Проверяем актуальность + if datetime.now() - timestamp < timedelta(seconds=self.ttl): + self.hits += 1 + return admins + else: + # Удаляем устаревшую запись + del self._cache[chat_id] + + self.misses += 1 + return None + + def set(self, chat_id: int, admins: List[User]) -> None: + """ + Сохраняет список админов в кэш. + + Args: + chat_id: ID чата + admins: Список администраторов + """ + self._cache[chat_id] = (admins, datetime.now()) + + def invalidate(self, chat_id: Optional[int] = None) -> None: + """ + Инвалидирует кэш. + + Args: + chat_id: ID чата (если None, очищает весь кэш) + """ + if chat_id is None: + self._cache.clear() + elif chat_id in self._cache: + del self._cache[chat_id] + + def cleanup(self) -> int: + """ + Удаляет устаревшие записи. + + Returns: + int: Количество удаленных записей + """ + now = datetime.now() + expired = [ + chat_id for chat_id, (_, timestamp) in self._cache.items() + if now - timestamp >= timedelta(seconds=self.ttl) + ] + + for chat_id in expired: + del self._cache[chat_id] + + return len(expired) + + def get_stats(self) -> dict: + """Возвращает статистику кэша""" + total = self.hits + self.misses + hit_rate = (self.hits / total * 100) if total > 0 else 0 + + return { + 'hits': self.hits, + 'misses': self.misses, + 'hit_rate': f"{hit_rate:.1f}%", + 'cached_chats': len(self._cache) + } + + +# Глобальный кэш +admin_cache = AdminCache(ttl=300) + + +async def get_admins_list( + bot: Bot, + chat_id: int, + exclude_bots: bool = True, + exclude_users: Optional[Set[int]] = None, + include_owner_only: bool = False, + use_cache: bool = True +) -> List[User]: + """ + Получает список администраторов чата. + + Args: + bot: Экземпляр бота + chat_id: ID чата + exclude_bots: Исключить ботов + exclude_users: Множество ID пользователей для исключения + include_owner_only: Только владелец чата + use_cache: Использовать кэш + + Returns: + List[User]: Список администраторов + + Example: + >> admins = await get_admins_list(bot, chat_id) + >> print(f"Администраторов: {len(admins)}") + """ + # Проверяем кэш + if use_cache: + cached_admins = admin_cache.get(chat_id) + if cached_admins is not None: + admins = cached_admins.copy() + else: + # Загружаем из API + try: + chat_admins = await bot.get_chat_administrators(chat_id) + admins = [admin.user for admin in chat_admins] + # Сохраняем в кэш + admin_cache.set(chat_id, admins) + except (TelegramBadRequest, TelegramForbiddenError): + return [] + else: + # Без кэша + try: + chat_admins = await bot.get_chat_administrators(chat_id) + admins = [admin.user for admin in chat_admins] + except (TelegramBadRequest, TelegramForbiddenError): + return [] + + # Фильтрация + filtered_admins = [] + + for admin_user in admins: + # Исключаем ботов + if exclude_bots and admin_user.is_bot: + continue + + # Исключаем конкретных пользователей + if exclude_users and admin_user.id in exclude_users: + continue + + filtered_admins.append(admin_user) + + # Только владелец + if include_owner_only and filtered_admins: + # Получаем информацию о владельце + try: + chat_admins = await bot.get_chat_administrators(chat_id) + owner = next( + (admin.user for admin in chat_admins if isinstance(admin, ChatMemberOwner)), + None + ) + if owner: + return [owner] + except: + pass + + return filtered_admins + + +async def mention_admins( + bot: Bot, + chat_id: int, + text: str = "", + format_type: str = "hidden", + exclude_bots: bool = True, + exclude_users: Optional[Set[int]] = None, + separator: str = " ", + use_cache: bool = True +) -> str: + """ + Формирует текст с упоминанием всех администраторов. + + Args: + bot: Экземпляр бота + chat_id: ID чата + text: Основной текст сообщения + format_type: Тип форматирования: + - 'hidden': Скрытые ссылки (невидимые) + - 'mention': HTML mentions (видимые имена) + - 'username': @username (только для пользователей с username) + - 'mixed': Mentions для пользователей с именами, hidden для остальных + exclude_bots: Исключить ботов + exclude_users: Множество ID пользователей для исключения + separator: Разделитель между mentions (для видимых форматов) + use_cache: Использовать кэш + + Returns: + str: Отформатированный текст с упоминаниями + + Example: + >> # Скрытые упоминания + >> text = await mention_admins(bot, chat_id, "Внимание, админы!") + >> await message.answer(text, parse_mode="HTML") + + >> # Видимые упоминания + >> text = await mention_admins(bot, chat_id, "Админы:", format_type="mention") + >> await message.answer(text, parse_mode="HTML") + """ + # Получаем список админов + admins = await get_admins_list( + bot=bot, + chat_id=chat_id, + exclude_bots=exclude_bots, + exclude_users=exclude_users, + use_cache=use_cache + ) + + if not admins: + return text + + # Формируем упоминания в зависимости от типа + mentions = [] + + if format_type == "hidden": + # Скрытые ссылки (невидимые) + for admin in admins: + mentions.append(hide_link(f"tg://user?id={admin.id}")) + + # Объединяем все ссылки и добавляем текст + return "".join(mentions) + text + + elif format_type == "mention": + # HTML mentions (видимые имена) + for admin in admins: + name = admin.full_name or admin.first_name or f"User {admin.id}" + mentions.append(hlink(name, f"tg://user?id={admin.id}")) + + mentions_text = separator.join(mentions) + return f"{text}\n\n{mentions_text}" if text else mentions_text + + elif format_type == "username": + # Только @username + for admin in admins: + if admin.username: + mentions.append(f"@{admin.username}") + + if not mentions: + # Fallback на hidden если нет username + return await mention_admins( + bot, chat_id, text, format_type="hidden", + exclude_bots=exclude_bots, exclude_users=exclude_users + ) + + mentions_text = separator.join(mentions) + return f"{text}\n\n{mentions_text}" if text else mentions_text + + elif format_type == "mixed": + # Mentions для пользователей с именами, hidden для остальных + hidden_links = [] + visible_mentions = [] + + for admin in admins: + if admin.username: + # Видимый mention + name = admin.full_name or admin.first_name or f"@{admin.username}" + visible_mentions.append(hlink(name, f"tg://user?id={admin.id}")) + else: + # Скрытая ссылка + hidden_links.append(hide_link(f"tg://user?id={admin.id}")) + + hidden_part = "".join(hidden_links) + visible_part = separator.join(visible_mentions) + + if text: + if visible_part: + return f"{hidden_part}{text}\n\n{visible_part}" + else: + return f"{hidden_part}{text}" + else: + return f"{hidden_part}{visible_part}" + + # По умолчанию - hidden + return text + + +async def mention_user( + user: User, + format_type: str = "mention", + show_username: bool = False +) -> str: + """ + Создает упоминание одного пользователя. + + Args: + user: Объект пользователя + format_type: Тип форматирования ('mention', 'hidden', 'username') + show_username: Показывать username вместо имени (для mention) + + Returns: + str: Отформатированное упоминание + + Example: + >> mention = await mention_user(message.from_user) + >> await message.answer(f"Привет, {mention}!", parse_mode="HTML") + """ + if format_type == "hidden": + return hide_link(f"tg://user?id={user.id}") + + elif format_type == "username": + if user.username: + return f"@{user.username}" + # Fallback на mention + return await mention_user(user, format_type="mention") + + else: # mention + if show_username and user.username: + display_name = f"@{user.username}" + else: + display_name = user.full_name or user.first_name or f"User {user.id}" + + return hlink(display_name, f"tg://user?id={user.id}") + + +async def mention_users( + users: List[User], + format_type: str = "mention", + separator: str = ", ", + max_count: Optional[int] = None +) -> str: + """ + Создает упоминания списка пользователей. + + Args: + users: Список пользователей + format_type: Тип форматирования + separator: Разделитель между упоминаниями + max_count: Максимальное количество упоминаний (остальные как "и еще N") + + Returns: + str: Отформатированные упоминания + + Example: + >> users = [msg.from_user, ...] + >> mentions = await mention_users(users, max_count=5) + >> await message.answer(f"Участники: {mentions}", parse_mode="HTML") + """ + if not users: + return "" + + # Ограничиваем количество + display_users = users[:max_count] if max_count else users + remaining = len(users) - len(display_users) if max_count else 0 + + # Создаем упоминания + mentions = [] + for user in display_users: + mention = await mention_user(user, format_type=format_type) + mentions.append(mention) + + result = separator.join(mentions) + + # Добавляем "и еще N" + if remaining > 0: + result += f" и еще {remaining}" + + return result + + +# ================= СПЕЦИАЛИЗИРОВАННЫЕ ФУНКЦИИ ================= + +async def mention_moderators( + bot: Bot, + chat_id: int, + text: str = "", + format_type: str = "hidden" +) -> str: + """ + Упоминает только модераторов (администраторов с правами на удаление/бан). + + Args: + bot: Экземпляр бота + chat_id: ID чата + text: Текст сообщения + format_type: Тип форматирования + + Returns: + str: Текст с упоминаниями модераторов + """ + try: + chat_admins = await bot.get_chat_administrators(chat_id) + + # Фильтруем только модераторов + moderators = [] + for admin in chat_admins: + if admin.user.is_bot: + continue + + # Владелец всегда модератор + if isinstance(admin, ChatMemberOwner): + moderators.append(admin.user) + continue + + # Проверяем права администратора + if isinstance(admin, ChatMemberAdministrator): + if admin.can_delete_messages and admin.can_restrict_members: + moderators.append(admin.user) + + # Формируем упоминания + if format_type == "hidden": + mentions = "".join(hide_link(f"tg://user?id={mod.id}") for mod in moderators) + return f"{mentions}{text}" + else: + mentions = [] + for mod in moderators: + name = mod.full_name or mod.first_name or f"Moderator {mod.id}" + mentions.append(hlink(name, f"tg://user?id={mod.id}")) + + mentions_text = ", ".join(mentions) + return f"{text}\n\n{mentions_text}" if text else mentions_text + + except (TelegramBadRequest, TelegramForbiddenError): + return text + + +async def mention_owner( + bot: Bot, + chat_id: int, + format_type: str = "mention" +) -> Optional[str]: + """ + Получает упоминание владельца чата. + + Args: + bot: Экземпляр бота + chat_id: ID чата + format_type: Тип форматирования + + Returns: + Optional[str]: Упоминание владельца или None + """ + try: + chat_admins = await bot.get_chat_administrators(chat_id) + owner = next( + (admin.user for admin in chat_admins if isinstance(admin, ChatMemberOwner)), + None + ) + + if owner: + return await mention_user(owner, format_type=format_type) + + except (TelegramBadRequest, TelegramForbiddenError): + pass + + return None + + +# Алиас для обратной совместимости +async def hidden_admins_message(message: Message, text: str = "") -> str: + """ + Алиас для mention_admins с format_type="hidden". + + DEPRECATED: Используйте mention_admins() вместо этого. + """ + from bot import bot + return await mention_admins( + bot=bot, + chat_id=message.chat.id, + text=text, + format_type="hidden" + )