505 lines
16 KiB
Python
505 lines
16 KiB
Python
"""
|
||
Утилиты для упоминаний пользователей (mentions)
|
||
"""
|
||
from typing import Optional, List, Set
|
||
from datetime import datetime, timedelta
|
||
|
||
from aiogram import Bot
|
||
from aiogram.types import Message, ChatMemberAdministrator, ChatMemberOwner, User
|
||
from aiogram.utils.markdown import hide_link, hlink
|
||
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
|
||
|
||
__all__ = (
|
||
'mention_admins',
|
||
'mention_user',
|
||
'mention_users',
|
||
'get_admins_list',
|
||
'AdminCache',
|
||
'admin_cache',
|
||
'mention_moderators',
|
||
'mention_owner',
|
||
'hidden_admins_message'
|
||
)
|
||
|
||
|
||
class AdminCache:
|
||
"""
|
||
Кэш для списков администраторов чатов.
|
||
|
||
Уменьшает количество запросов к API Telegram.
|
||
"""
|
||
|
||
def __init__(self, ttl: int = 300):
|
||
"""
|
||
Args:
|
||
ttl: Время жизни кэша в секундах (по умолчанию 5 минут)
|
||
"""
|
||
self.ttl = ttl
|
||
# {chat_id: (admins_list, timestamp)}
|
||
self._cache: dict[int, tuple[List[User], datetime]] = {}
|
||
# Статистика
|
||
self.hits: int = 0
|
||
self.misses: int = 0
|
||
|
||
def get(self, chat_id: int) -> Optional[List[User]]:
|
||
"""
|
||
Получает список админов из кэша.
|
||
|
||
Args:
|
||
chat_id: ID чата
|
||
|
||
Returns:
|
||
List[User] или None если кэш устарел
|
||
"""
|
||
if chat_id in self._cache:
|
||
admins, timestamp = self._cache[chat_id]
|
||
|
||
# Проверяем актуальность
|
||
if datetime.now() - timestamp < timedelta(seconds=self.ttl):
|
||
self.hits += 1
|
||
return admins
|
||
else:
|
||
# Удаляем устаревшую запись
|
||
del self._cache[chat_id]
|
||
|
||
self.misses += 1
|
||
return None
|
||
|
||
def set(self, chat_id: int, admins: List[User]) -> None:
|
||
"""
|
||
Сохраняет список админов в кэш.
|
||
|
||
Args:
|
||
chat_id: ID чата
|
||
admins: Список администраторов
|
||
"""
|
||
self._cache[chat_id] = (admins, datetime.now())
|
||
|
||
def invalidate(self, chat_id: Optional[int] = None) -> None:
|
||
"""
|
||
Инвалидирует кэш.
|
||
|
||
Args:
|
||
chat_id: ID чата (если None, очищает весь кэш)
|
||
"""
|
||
if chat_id is None:
|
||
self._cache.clear()
|
||
elif chat_id in self._cache:
|
||
del self._cache[chat_id]
|
||
|
||
def cleanup(self) -> int:
|
||
"""
|
||
Удаляет устаревшие записи.
|
||
|
||
Returns:
|
||
int: Количество удаленных записей
|
||
"""
|
||
now = datetime.now()
|
||
expired = [
|
||
chat_id for chat_id, (_, timestamp) in self._cache.items()
|
||
if now - timestamp >= timedelta(seconds=self.ttl)
|
||
]
|
||
|
||
for chat_id in expired:
|
||
del self._cache[chat_id]
|
||
|
||
return len(expired)
|
||
|
||
def get_stats(self) -> dict:
|
||
"""Возвращает статистику кэша"""
|
||
total = self.hits + self.misses
|
||
hit_rate = (self.hits / total * 100) if total > 0 else 0
|
||
|
||
return {
|
||
'hits': self.hits,
|
||
'misses': self.misses,
|
||
'hit_rate': f"{hit_rate:.1f}%",
|
||
'cached_chats': len(self._cache)
|
||
}
|
||
|
||
|
||
# Глобальный кэш
|
||
admin_cache = AdminCache(ttl=300)
|
||
|
||
|
||
async def get_admins_list(
|
||
bot: Bot,
|
||
chat_id: int,
|
||
exclude_bots: bool = True,
|
||
exclude_users: Optional[Set[int]] = None,
|
||
include_owner_only: bool = False,
|
||
use_cache: bool = True
|
||
) -> List[User]:
|
||
"""
|
||
Получает список администраторов чата.
|
||
|
||
Args:
|
||
bot: Экземпляр бота
|
||
chat_id: ID чата
|
||
exclude_bots: Исключить ботов
|
||
exclude_users: Множество ID пользователей для исключения
|
||
include_owner_only: Только владелец чата
|
||
use_cache: Использовать кэш
|
||
|
||
Returns:
|
||
List[User]: Список администраторов
|
||
|
||
Example:
|
||
>> admins = await get_admins_list(bot, chat_id)
|
||
>> print(f"Администраторов: {len(admins)}")
|
||
"""
|
||
# Проверяем кэш
|
||
if use_cache:
|
||
cached_admins = admin_cache.get(chat_id)
|
||
if cached_admins is not None:
|
||
admins = cached_admins.copy()
|
||
else:
|
||
# Загружаем из API
|
||
try:
|
||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||
admins = [admin.user for admin in chat_admins]
|
||
# Сохраняем в кэш
|
||
admin_cache.set(chat_id, admins)
|
||
except (TelegramBadRequest, TelegramForbiddenError):
|
||
return []
|
||
else:
|
||
# Без кэша
|
||
try:
|
||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||
admins = [admin.user for admin in chat_admins]
|
||
except (TelegramBadRequest, TelegramForbiddenError):
|
||
return []
|
||
|
||
# Фильтрация
|
||
filtered_admins = []
|
||
|
||
for admin_user in admins:
|
||
# Исключаем ботов
|
||
if exclude_bots and admin_user.is_bot:
|
||
continue
|
||
|
||
# Исключаем конкретных пользователей
|
||
if exclude_users and admin_user.id in exclude_users:
|
||
continue
|
||
|
||
filtered_admins.append(admin_user)
|
||
|
||
# Только владелец
|
||
if include_owner_only and filtered_admins:
|
||
# Получаем информацию о владельце
|
||
try:
|
||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||
owner = next(
|
||
(admin.user for admin in chat_admins if isinstance(admin, ChatMemberOwner)),
|
||
None
|
||
)
|
||
if owner:
|
||
return [owner]
|
||
except:
|
||
pass
|
||
|
||
return filtered_admins
|
||
|
||
|
||
async def mention_admins(
|
||
bot: Bot,
|
||
chat_id: int,
|
||
text: str = "",
|
||
format_type: str = "hidden",
|
||
exclude_bots: bool = True,
|
||
exclude_users: Optional[Set[int]] = None,
|
||
separator: str = " ",
|
||
use_cache: bool = True
|
||
) -> str:
|
||
"""
|
||
Формирует текст с упоминанием всех администраторов.
|
||
|
||
Args:
|
||
bot: Экземпляр бота
|
||
chat_id: ID чата
|
||
text: Основной текст сообщения
|
||
format_type: Тип форматирования:
|
||
- 'hidden': Скрытые ссылки (невидимые)
|
||
- 'mention': HTML mentions (видимые имена)
|
||
- 'username': @username (только для пользователей с username)
|
||
- 'mixed': Mentions для пользователей с именами, hidden для остальных
|
||
exclude_bots: Исключить ботов
|
||
exclude_users: Множество ID пользователей для исключения
|
||
separator: Разделитель между mentions (для видимых форматов)
|
||
use_cache: Использовать кэш
|
||
|
||
Returns:
|
||
str: Отформатированный текст с упоминаниями
|
||
|
||
Example:
|
||
>> # Скрытые упоминания
|
||
>> text = await mention_admins(bot, chat_id, "Внимание, админы!")
|
||
>> await message.answer(text, parse_mode="HTML")
|
||
|
||
>> # Видимые упоминания
|
||
>> text = await mention_admins(bot, chat_id, "Админы:", format_type="mention")
|
||
>> await message.answer(text, parse_mode="HTML")
|
||
"""
|
||
# Получаем список админов
|
||
admins = await get_admins_list(
|
||
bot=bot,
|
||
chat_id=chat_id,
|
||
exclude_bots=exclude_bots,
|
||
exclude_users=exclude_users,
|
||
use_cache=use_cache
|
||
)
|
||
|
||
if not admins:
|
||
return text
|
||
|
||
# Формируем упоминания в зависимости от типа
|
||
mentions = []
|
||
|
||
if format_type == "hidden":
|
||
# Скрытые ссылки (невидимые)
|
||
for admin in admins:
|
||
mentions.append(hide_link(f"tg://user?id={admin.id}"))
|
||
|
||
# Объединяем все ссылки и добавляем текст
|
||
return "".join(mentions) + text
|
||
|
||
elif format_type == "mention":
|
||
# HTML mentions (видимые имена)
|
||
for admin in admins:
|
||
name = admin.full_name or admin.first_name or f"User {admin.id}"
|
||
mentions.append(hlink(name, f"tg://user?id={admin.id}"))
|
||
|
||
mentions_text = separator.join(mentions)
|
||
return f"{text}\n\n{mentions_text}" if text else mentions_text
|
||
|
||
elif format_type == "username":
|
||
# Только @username
|
||
for admin in admins:
|
||
if admin.username:
|
||
mentions.append(f"@{admin.username}")
|
||
|
||
if not mentions:
|
||
# Fallback на hidden если нет username
|
||
return await mention_admins(
|
||
bot, chat_id, text, format_type="hidden",
|
||
exclude_bots=exclude_bots, exclude_users=exclude_users
|
||
)
|
||
|
||
mentions_text = separator.join(mentions)
|
||
return f"{text}\n\n{mentions_text}" if text else mentions_text
|
||
|
||
elif format_type == "mixed":
|
||
# Mentions для пользователей с именами, hidden для остальных
|
||
hidden_links = []
|
||
visible_mentions = []
|
||
|
||
for admin in admins:
|
||
if admin.username:
|
||
# Видимый mention
|
||
name = admin.full_name or admin.first_name or f"@{admin.username}"
|
||
visible_mentions.append(hlink(name, f"tg://user?id={admin.id}"))
|
||
else:
|
||
# Скрытая ссылка
|
||
hidden_links.append(hide_link(f"tg://user?id={admin.id}"))
|
||
|
||
hidden_part = "".join(hidden_links)
|
||
visible_part = separator.join(visible_mentions)
|
||
|
||
if text:
|
||
if visible_part:
|
||
return f"{hidden_part}{text}\n\n{visible_part}"
|
||
else:
|
||
return f"{hidden_part}{text}"
|
||
else:
|
||
return f"{hidden_part}{visible_part}"
|
||
|
||
# По умолчанию - hidden
|
||
return text
|
||
|
||
|
||
async def mention_user(
|
||
user: User,
|
||
format_type: str = "mention",
|
||
show_username: bool = False
|
||
) -> str:
|
||
"""
|
||
Создает упоминание одного пользователя.
|
||
|
||
Args:
|
||
user: Объект пользователя
|
||
format_type: Тип форматирования ('mention', 'hidden', 'username')
|
||
show_username: Показывать username вместо имени (для mention)
|
||
|
||
Returns:
|
||
str: Отформатированное упоминание
|
||
|
||
Example:
|
||
>> mention = await mention_user(message.from_user)
|
||
>> await message.answer(f"Привет, {mention}!", parse_mode="HTML")
|
||
"""
|
||
if format_type == "hidden":
|
||
return hide_link(f"tg://user?id={user.id}")
|
||
|
||
elif format_type == "username":
|
||
if user.username:
|
||
return f"@{user.username}"
|
||
# Fallback на mention
|
||
return await mention_user(user, format_type="mention")
|
||
|
||
else: # mention
|
||
if show_username and user.username:
|
||
display_name = f"@{user.username}"
|
||
else:
|
||
display_name = user.full_name or user.first_name or f"User {user.id}"
|
||
|
||
return hlink(display_name, f"tg://user?id={user.id}")
|
||
|
||
|
||
async def mention_users(
|
||
users: List[User],
|
||
format_type: str = "mention",
|
||
separator: str = ", ",
|
||
max_count: Optional[int] = None
|
||
) -> str:
|
||
"""
|
||
Создает упоминания списка пользователей.
|
||
|
||
Args:
|
||
users: Список пользователей
|
||
format_type: Тип форматирования
|
||
separator: Разделитель между упоминаниями
|
||
max_count: Максимальное количество упоминаний (остальные как "и еще N")
|
||
|
||
Returns:
|
||
str: Отформатированные упоминания
|
||
|
||
Example:
|
||
>> users = [msg.from_user, ...]
|
||
>> mentions = await mention_users(users, max_count=5)
|
||
>> await message.answer(f"Участники: {mentions}", parse_mode="HTML")
|
||
"""
|
||
if not users:
|
||
return ""
|
||
|
||
# Ограничиваем количество
|
||
display_users = users[:max_count] if max_count else users
|
||
remaining = len(users) - len(display_users) if max_count else 0
|
||
|
||
# Создаем упоминания
|
||
mentions = []
|
||
for user in display_users:
|
||
mention = await mention_user(user, format_type=format_type)
|
||
mentions.append(mention)
|
||
|
||
result = separator.join(mentions)
|
||
|
||
# Добавляем "и еще N"
|
||
if remaining > 0:
|
||
result += f" и еще {remaining}"
|
||
|
||
return result
|
||
|
||
|
||
# ================= СПЕЦИАЛИЗИРОВАННЫЕ ФУНКЦИИ =================
|
||
|
||
async def mention_moderators(
|
||
bot: Bot,
|
||
chat_id: int,
|
||
text: str = "",
|
||
format_type: str = "hidden"
|
||
) -> str:
|
||
"""
|
||
Упоминает только модераторов (администраторов с правами на удаление/бан).
|
||
|
||
Args:
|
||
bot: Экземпляр бота
|
||
chat_id: ID чата
|
||
text: Текст сообщения
|
||
format_type: Тип форматирования
|
||
|
||
Returns:
|
||
str: Текст с упоминаниями модераторов
|
||
"""
|
||
try:
|
||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||
|
||
# Фильтруем только модераторов
|
||
moderators = []
|
||
for admin in chat_admins:
|
||
if admin.user.is_bot:
|
||
continue
|
||
|
||
# Владелец всегда модератор
|
||
if isinstance(admin, ChatMemberOwner):
|
||
moderators.append(admin.user)
|
||
continue
|
||
|
||
# Проверяем права администратора
|
||
if isinstance(admin, ChatMemberAdministrator):
|
||
if admin.can_delete_messages and admin.can_restrict_members:
|
||
moderators.append(admin.user)
|
||
|
||
# Формируем упоминания
|
||
if format_type == "hidden":
|
||
mentions = "".join(hide_link(f"tg://user?id={mod.id}") for mod in moderators)
|
||
return f"{mentions}{text}"
|
||
else:
|
||
mentions = []
|
||
for mod in moderators:
|
||
name = mod.full_name or mod.first_name or f"Moderator {mod.id}"
|
||
mentions.append(hlink(name, f"tg://user?id={mod.id}"))
|
||
|
||
mentions_text = ", ".join(mentions)
|
||
return f"{text}\n\n{mentions_text}" if text else mentions_text
|
||
|
||
except (TelegramBadRequest, TelegramForbiddenError):
|
||
return text
|
||
|
||
|
||
async def mention_owner(
|
||
bot: Bot,
|
||
chat_id: int,
|
||
format_type: str = "mention"
|
||
) -> Optional[str]:
|
||
"""
|
||
Получает упоминание владельца чата.
|
||
|
||
Args:
|
||
bot: Экземпляр бота
|
||
chat_id: ID чата
|
||
format_type: Тип форматирования
|
||
|
||
Returns:
|
||
Optional[str]: Упоминание владельца или None
|
||
"""
|
||
try:
|
||
chat_admins = await bot.get_chat_administrators(chat_id)
|
||
owner = next(
|
||
(admin.user for admin in chat_admins if isinstance(admin, ChatMemberOwner)),
|
||
None
|
||
)
|
||
|
||
if owner:
|
||
return await mention_user(owner, format_type=format_type)
|
||
|
||
except (TelegramBadRequest, TelegramForbiddenError):
|
||
pass
|
||
|
||
return None
|
||
|
||
|
||
# Алиас для обратной совместимости
|
||
async def hidden_admins_message(message: Message, text: str = "") -> str:
|
||
"""
|
||
Алиас для mention_admins с format_type="hidden".
|
||
|
||
DEPRECATED: Используйте mention_admins() вместо этого.
|
||
"""
|
||
from bot import bot
|
||
return await mention_admins(
|
||
bot=bot,
|
||
chat_id=message.chat.id,
|
||
text=text,
|
||
format_type="hidden"
|
||
)
|