Утилита скрытия юзернеймов и упоминаний
This commit is contained in:
504
bot/utils/hidden_username.py
Normal file
504
bot/utils/hidden_username.py
Normal file
@@ -0,0 +1,504 @@
|
||||
"""
|
||||
Утилиты для упоминаний пользователей (mentions)
|
||||
"""
|
||||
from typing import Optional, List, Set
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from aiogram import Bot
|
||||
from aiogram.types import Message, ChatMemberAdministrator, ChatMemberOwner, User
|
||||
from aiogram.utils.markdown import hide_link, hlink
|
||||
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
|
||||
|
||||
__all__ = (
|
||||
'mention_admins',
|
||||
'mention_user',
|
||||
'mention_users',
|
||||
'get_admins_list',
|
||||
'AdminCache',
|
||||
'admin_cache',
|
||||
'mention_moderators',
|
||||
'mention_owner',
|
||||
'hidden_admins_message'
|
||||
)
|
||||
|
||||
|
||||
class AdminCache:
|
||||
"""
|
||||
Кэш для списков администраторов чатов.
|
||||
|
||||
Уменьшает количество запросов к API Telegram.
|
||||
"""
|
||||
|
||||
def __init__(self, ttl: int = 300):
|
||||
"""
|
||||
Args:
|
||||
ttl: Время жизни кэша в секундах (по умолчанию 5 минут)
|
||||
"""
|
||||
self.ttl = ttl
|
||||
# {chat_id: (admins_list, timestamp)}
|
||||
self._cache: dict[int, tuple[List[User], datetime]] = {}
|
||||
# Статистика
|
||||
self.hits: int = 0
|
||||
self.misses: int = 0
|
||||
|
||||
def get(self, chat_id: int) -> Optional[List[User]]:
|
||||
"""
|
||||
Получает список админов из кэша.
|
||||
|
||||
Args:
|
||||
chat_id: ID чата
|
||||
|
||||
Returns:
|
||||
List[User] или None если кэш устарел
|
||||
"""
|
||||
if chat_id in self._cache:
|
||||
admins, timestamp = self._cache[chat_id]
|
||||
|
||||
# Проверяем актуальность
|
||||
if datetime.now() - timestamp < timedelta(seconds=self.ttl):
|
||||
self.hits += 1
|
||||
return admins
|
||||
else:
|
||||
# Удаляем устаревшую запись
|
||||
del self._cache[chat_id]
|
||||
|
||||
self.misses += 1
|
||||
return None
|
||||
|
||||
def set(self, chat_id: int, admins: List[User]) -> None:
|
||||
"""
|
||||
Сохраняет список админов в кэш.
|
||||
|
||||
Args:
|
||||
chat_id: ID чата
|
||||
admins: Список администраторов
|
||||
"""
|
||||
self._cache[chat_id] = (admins, datetime.now())
|
||||
|
||||
def invalidate(self, chat_id: Optional[int] = None) -> None:
|
||||
"""
|
||||
Инвалидирует кэш.
|
||||
|
||||
Args:
|
||||
chat_id: ID чата (если None, очищает весь кэш)
|
||||
"""
|
||||
if chat_id is None:
|
||||
self._cache.clear()
|
||||
elif chat_id in self._cache:
|
||||
del self._cache[chat_id]
|
||||
|
||||
def cleanup(self) -> int:
|
||||
"""
|
||||
Удаляет устаревшие записи.
|
||||
|
||||
Returns:
|
||||
int: Количество удаленных записей
|
||||
"""
|
||||
now = datetime.now()
|
||||
expired = [
|
||||
chat_id for chat_id, (_, timestamp) in self._cache.items()
|
||||
if now - timestamp >= timedelta(seconds=self.ttl)
|
||||
]
|
||||
|
||||
for chat_id in expired:
|
||||
del self._cache[chat_id]
|
||||
|
||||
return len(expired)
|
||||
|
||||
def get_stats(self) -> dict:
|
||||
"""Возвращает статистику кэша"""
|
||||
total = self.hits + self.misses
|
||||
hit_rate = (self.hits / total * 100) if total > 0 else 0
|
||||
|
||||
return {
|
||||
'hits': self.hits,
|
||||
'misses': self.misses,
|
||||
'hit_rate': f"{hit_rate:.1f}%",
|
||||
'cached_chats': len(self._cache)
|
||||
}
|
||||
|
||||
|
||||
# Глобальный кэш
|
||||
admin_cache = AdminCache(ttl=300)
|
||||
|
||||
|
||||
async def get_admins_list(
|
||||
bot: Bot,
|
||||
chat_id: int,
|
||||
exclude_bots: bool = True,
|
||||
exclude_users: Optional[Set[int]] = None,
|
||||
include_owner_only: bool = False,
|
||||
use_cache: bool = True
|
||||
) -> List[User]:
|
||||
"""
|
||||
Получает список администраторов чата.
|
||||
|
||||
Args:
|
||||
bot: Экземпляр бота
|
||||
chat_id: ID чата
|
||||
exclude_bots: Исключить ботов
|
||||
exclude_users: Множество ID пользователей для исключения
|
||||
include_owner_only: Только владелец чата
|
||||
use_cache: Использовать кэш
|
||||
|
||||
Returns:
|
||||
List[User]: Список администраторов
|
||||
|
||||
Example:
|
||||
>> admins = await get_admins_list(bot, chat_id)
|
||||
>> print(f"Администраторов: {len(admins)}")
|
||||
"""
|
||||
# Проверяем кэш
|
||||
if use_cache:
|
||||
cached_admins = admin_cache.get(chat_id)
|
||||
if cached_admins is not None:
|
||||
admins = cached_admins.copy()
|
||||
else:
|
||||
# Загружаем из API
|
||||
try:
|
||||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||||
admins = [admin.user for admin in chat_admins]
|
||||
# Сохраняем в кэш
|
||||
admin_cache.set(chat_id, admins)
|
||||
except (TelegramBadRequest, TelegramForbiddenError):
|
||||
return []
|
||||
else:
|
||||
# Без кэша
|
||||
try:
|
||||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||||
admins = [admin.user for admin in chat_admins]
|
||||
except (TelegramBadRequest, TelegramForbiddenError):
|
||||
return []
|
||||
|
||||
# Фильтрация
|
||||
filtered_admins = []
|
||||
|
||||
for admin_user in admins:
|
||||
# Исключаем ботов
|
||||
if exclude_bots and admin_user.is_bot:
|
||||
continue
|
||||
|
||||
# Исключаем конкретных пользователей
|
||||
if exclude_users and admin_user.id in exclude_users:
|
||||
continue
|
||||
|
||||
filtered_admins.append(admin_user)
|
||||
|
||||
# Только владелец
|
||||
if include_owner_only and filtered_admins:
|
||||
# Получаем информацию о владельце
|
||||
try:
|
||||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||||
owner = next(
|
||||
(admin.user for admin in chat_admins if isinstance(admin, ChatMemberOwner)),
|
||||
None
|
||||
)
|
||||
if owner:
|
||||
return [owner]
|
||||
except:
|
||||
pass
|
||||
|
||||
return filtered_admins
|
||||
|
||||
|
||||
async def mention_admins(
|
||||
bot: Bot,
|
||||
chat_id: int,
|
||||
text: str = "",
|
||||
format_type: str = "hidden",
|
||||
exclude_bots: bool = True,
|
||||
exclude_users: Optional[Set[int]] = None,
|
||||
separator: str = " ",
|
||||
use_cache: bool = True
|
||||
) -> str:
|
||||
"""
|
||||
Формирует текст с упоминанием всех администраторов.
|
||||
|
||||
Args:
|
||||
bot: Экземпляр бота
|
||||
chat_id: ID чата
|
||||
text: Основной текст сообщения
|
||||
format_type: Тип форматирования:
|
||||
- 'hidden': Скрытые ссылки (невидимые)
|
||||
- 'mention': HTML mentions (видимые имена)
|
||||
- 'username': @username (только для пользователей с username)
|
||||
- 'mixed': Mentions для пользователей с именами, hidden для остальных
|
||||
exclude_bots: Исключить ботов
|
||||
exclude_users: Множество ID пользователей для исключения
|
||||
separator: Разделитель между mentions (для видимых форматов)
|
||||
use_cache: Использовать кэш
|
||||
|
||||
Returns:
|
||||
str: Отформатированный текст с упоминаниями
|
||||
|
||||
Example:
|
||||
>> # Скрытые упоминания
|
||||
>> text = await mention_admins(bot, chat_id, "Внимание, админы!")
|
||||
>> await message.answer(text, parse_mode="HTML")
|
||||
|
||||
>> # Видимые упоминания
|
||||
>> text = await mention_admins(bot, chat_id, "Админы:", format_type="mention")
|
||||
>> await message.answer(text, parse_mode="HTML")
|
||||
"""
|
||||
# Получаем список админов
|
||||
admins = await get_admins_list(
|
||||
bot=bot,
|
||||
chat_id=chat_id,
|
||||
exclude_bots=exclude_bots,
|
||||
exclude_users=exclude_users,
|
||||
use_cache=use_cache
|
||||
)
|
||||
|
||||
if not admins:
|
||||
return text
|
||||
|
||||
# Формируем упоминания в зависимости от типа
|
||||
mentions = []
|
||||
|
||||
if format_type == "hidden":
|
||||
# Скрытые ссылки (невидимые)
|
||||
for admin in admins:
|
||||
mentions.append(hide_link(f"tg://user?id={admin.id}"))
|
||||
|
||||
# Объединяем все ссылки и добавляем текст
|
||||
return "".join(mentions) + text
|
||||
|
||||
elif format_type == "mention":
|
||||
# HTML mentions (видимые имена)
|
||||
for admin in admins:
|
||||
name = admin.full_name or admin.first_name or f"User {admin.id}"
|
||||
mentions.append(hlink(name, f"tg://user?id={admin.id}"))
|
||||
|
||||
mentions_text = separator.join(mentions)
|
||||
return f"{text}\n\n{mentions_text}" if text else mentions_text
|
||||
|
||||
elif format_type == "username":
|
||||
# Только @username
|
||||
for admin in admins:
|
||||
if admin.username:
|
||||
mentions.append(f"@{admin.username}")
|
||||
|
||||
if not mentions:
|
||||
# Fallback на hidden если нет username
|
||||
return await mention_admins(
|
||||
bot, chat_id, text, format_type="hidden",
|
||||
exclude_bots=exclude_bots, exclude_users=exclude_users
|
||||
)
|
||||
|
||||
mentions_text = separator.join(mentions)
|
||||
return f"{text}\n\n{mentions_text}" if text else mentions_text
|
||||
|
||||
elif format_type == "mixed":
|
||||
# Mentions для пользователей с именами, hidden для остальных
|
||||
hidden_links = []
|
||||
visible_mentions = []
|
||||
|
||||
for admin in admins:
|
||||
if admin.username:
|
||||
# Видимый mention
|
||||
name = admin.full_name or admin.first_name or f"@{admin.username}"
|
||||
visible_mentions.append(hlink(name, f"tg://user?id={admin.id}"))
|
||||
else:
|
||||
# Скрытая ссылка
|
||||
hidden_links.append(hide_link(f"tg://user?id={admin.id}"))
|
||||
|
||||
hidden_part = "".join(hidden_links)
|
||||
visible_part = separator.join(visible_mentions)
|
||||
|
||||
if text:
|
||||
if visible_part:
|
||||
return f"{hidden_part}{text}\n\n{visible_part}"
|
||||
else:
|
||||
return f"{hidden_part}{text}"
|
||||
else:
|
||||
return f"{hidden_part}{visible_part}"
|
||||
|
||||
# По умолчанию - hidden
|
||||
return text
|
||||
|
||||
|
||||
async def mention_user(
|
||||
user: User,
|
||||
format_type: str = "mention",
|
||||
show_username: bool = False
|
||||
) -> str:
|
||||
"""
|
||||
Создает упоминание одного пользователя.
|
||||
|
||||
Args:
|
||||
user: Объект пользователя
|
||||
format_type: Тип форматирования ('mention', 'hidden', 'username')
|
||||
show_username: Показывать username вместо имени (для mention)
|
||||
|
||||
Returns:
|
||||
str: Отформатированное упоминание
|
||||
|
||||
Example:
|
||||
>> mention = await mention_user(message.from_user)
|
||||
>> await message.answer(f"Привет, {mention}!", parse_mode="HTML")
|
||||
"""
|
||||
if format_type == "hidden":
|
||||
return hide_link(f"tg://user?id={user.id}")
|
||||
|
||||
elif format_type == "username":
|
||||
if user.username:
|
||||
return f"@{user.username}"
|
||||
# Fallback на mention
|
||||
return await mention_user(user, format_type="mention")
|
||||
|
||||
else: # mention
|
||||
if show_username and user.username:
|
||||
display_name = f"@{user.username}"
|
||||
else:
|
||||
display_name = user.full_name or user.first_name or f"User {user.id}"
|
||||
|
||||
return hlink(display_name, f"tg://user?id={user.id}")
|
||||
|
||||
|
||||
async def mention_users(
|
||||
users: List[User],
|
||||
format_type: str = "mention",
|
||||
separator: str = ", ",
|
||||
max_count: Optional[int] = None
|
||||
) -> str:
|
||||
"""
|
||||
Создает упоминания списка пользователей.
|
||||
|
||||
Args:
|
||||
users: Список пользователей
|
||||
format_type: Тип форматирования
|
||||
separator: Разделитель между упоминаниями
|
||||
max_count: Максимальное количество упоминаний (остальные как "и еще N")
|
||||
|
||||
Returns:
|
||||
str: Отформатированные упоминания
|
||||
|
||||
Example:
|
||||
>> users = [msg.from_user, ...]
|
||||
>> mentions = await mention_users(users, max_count=5)
|
||||
>> await message.answer(f"Участники: {mentions}", parse_mode="HTML")
|
||||
"""
|
||||
if not users:
|
||||
return ""
|
||||
|
||||
# Ограничиваем количество
|
||||
display_users = users[:max_count] if max_count else users
|
||||
remaining = len(users) - len(display_users) if max_count else 0
|
||||
|
||||
# Создаем упоминания
|
||||
mentions = []
|
||||
for user in display_users:
|
||||
mention = await mention_user(user, format_type=format_type)
|
||||
mentions.append(mention)
|
||||
|
||||
result = separator.join(mentions)
|
||||
|
||||
# Добавляем "и еще N"
|
||||
if remaining > 0:
|
||||
result += f" и еще {remaining}"
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ================= СПЕЦИАЛИЗИРОВАННЫЕ ФУНКЦИИ =================
|
||||
|
||||
async def mention_moderators(
|
||||
bot: Bot,
|
||||
chat_id: int,
|
||||
text: str = "",
|
||||
format_type: str = "hidden"
|
||||
) -> str:
|
||||
"""
|
||||
Упоминает только модераторов (администраторов с правами на удаление/бан).
|
||||
|
||||
Args:
|
||||
bot: Экземпляр бота
|
||||
chat_id: ID чата
|
||||
text: Текст сообщения
|
||||
format_type: Тип форматирования
|
||||
|
||||
Returns:
|
||||
str: Текст с упоминаниями модераторов
|
||||
"""
|
||||
try:
|
||||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||||
|
||||
# Фильтруем только модераторов
|
||||
moderators = []
|
||||
for admin in chat_admins:
|
||||
if admin.user.is_bot:
|
||||
continue
|
||||
|
||||
# Владелец всегда модератор
|
||||
if isinstance(admin, ChatMemberOwner):
|
||||
moderators.append(admin.user)
|
||||
continue
|
||||
|
||||
# Проверяем права администратора
|
||||
if isinstance(admin, ChatMemberAdministrator):
|
||||
if admin.can_delete_messages and admin.can_restrict_members:
|
||||
moderators.append(admin.user)
|
||||
|
||||
# Формируем упоминания
|
||||
if format_type == "hidden":
|
||||
mentions = "".join(hide_link(f"tg://user?id={mod.id}") for mod in moderators)
|
||||
return f"{mentions}{text}"
|
||||
else:
|
||||
mentions = []
|
||||
for mod in moderators:
|
||||
name = mod.full_name or mod.first_name or f"Moderator {mod.id}"
|
||||
mentions.append(hlink(name, f"tg://user?id={mod.id}"))
|
||||
|
||||
mentions_text = ", ".join(mentions)
|
||||
return f"{text}\n\n{mentions_text}" if text else mentions_text
|
||||
|
||||
except (TelegramBadRequest, TelegramForbiddenError):
|
||||
return text
|
||||
|
||||
|
||||
async def mention_owner(
|
||||
bot: Bot,
|
||||
chat_id: int,
|
||||
format_type: str = "mention"
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Получает упоминание владельца чата.
|
||||
|
||||
Args:
|
||||
bot: Экземпляр бота
|
||||
chat_id: ID чата
|
||||
format_type: Тип форматирования
|
||||
|
||||
Returns:
|
||||
Optional[str]: Упоминание владельца или None
|
||||
"""
|
||||
try:
|
||||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||||
owner = next(
|
||||
(admin.user for admin in chat_admins if isinstance(admin, ChatMemberOwner)),
|
||||
None
|
||||
)
|
||||
|
||||
if owner:
|
||||
return await mention_user(owner, format_type=format_type)
|
||||
|
||||
except (TelegramBadRequest, TelegramForbiddenError):
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# Алиас для обратной совместимости
|
||||
async def hidden_admins_message(message: Message, text: str = "") -> str:
|
||||
"""
|
||||
Алиас для mention_admins с format_type="hidden".
|
||||
|
||||
DEPRECATED: Используйте mention_admins() вместо этого.
|
||||
"""
|
||||
from bot import bot
|
||||
return await mention_admins(
|
||||
bot=bot,
|
||||
chat_id=message.chat.id,
|
||||
text=text,
|
||||
format_type="hidden"
|
||||
)
|
||||
Reference in New Issue
Block a user