Первый коммит

This commit is contained in:
2026-02-17 11:24:55 +07:00
commit a06448ca4b
109 changed files with 21165 additions and 0 deletions

38
bot/utils/__init__.py Normal file
View File

@@ -0,0 +1,38 @@
"""
Утилиты бота PrimoGuardBot
Модули:
- usernames: Работа с пользователями (username, mentions, display names)
- type_message: Типы контента и чатов
- hidden_username: Упоминания администраторов
- format_time: Форматирование времени и дат
- argument: Парсинг команд и аргументов
- state_utils: Работа с FSM состояниями
- auto_delete: Автоматическое удаление сообщений
- decorators: Декораторы для хендлеров
"""
# ================= USER INFO =================
from .usernames import *
# ================= CONTENT TYPES =================
from .type_message import *
# ================= MENTIONS =================
from .hidden_username import *
# ================= TIME FORMATTING =================
from .format_time import *
# ================= COMMANDS =================
from .argument import *
# ================= STATE UTILS =================
from .state_utils import *
# ================= AUTO DELETE =================
from .auto_delete import *
# ================= DECORATORS =================
from .decorators import *

688
bot/utils/argument.py Normal file
View File

@@ -0,0 +1,688 @@
"""
Утилиты для работы с командами бота
"""
from typing import Optional, Union, Dict, List, Tuple, Set
from dataclasses import dataclass, field
import re
from aiogram.types import Message
from configs import settings
__all__ = (
'is_command',
'find_argument',
'get_command',
'parse_arguments',
'parse_flags',
'CommandParser',
'ParsedCommand',
'parse_command',
'validate_command',
'get_command_usage',
'extract_mentions',
'extract_user_ids',
'extract_hashtags'
)
@dataclass
class ParsedCommand:
"""
Распарсенная команда.
Attributes:
command: Название команды
prefix: Префикс команды
args: Список аргументов
raw_args: Исходная строка аргументов
flags: Словарь флагов (--flag value)
bot_username: Username бота (если было упоминание)
is_group_command: Команда в группе с упоминанием бота
"""
command: str
prefix: str
args: List[str] = field(default_factory=list)
raw_args: Optional[str] = None
flags: Dict[str, Union[str, bool]] = field(default_factory=dict)
bot_username: Optional[str] = None
is_group_command: bool = False
@property
def has_args(self) -> bool:
"""Есть ли аргументы"""
return len(self.args) > 0
@property
def has_flags(self) -> bool:
"""Есть ли флаги"""
return len(self.flags) > 0
def get_arg(self, index: int, default: Optional[str] = None) -> Optional[str]:
"""Получает аргумент по индексу"""
return self.args[index] if index < len(self.args) else default
def get_flag(self, name: str, default: Optional[Union[str, bool]] = None) -> Union[str, bool, None]:
"""Получает значение флага"""
return self.flags.get(name, default)
def __repr__(self) -> str:
return (
f"ParsedCommand(command='{self.command}', "
f"args={self.args}, flags={self.flags})"
)
class CommandParser:
"""
Парсер команд бота.
Возможности:
- Поддержка нескольких префиксов
- Парсинг аргументов
- Парсинг флагов (--flag value, -f value)
- Поддержка упоминаний бота (@botname)
- Парсинг quoted аргументов ("arg with spaces")
- Валидация команд
Example:
```python
parser = CommandParser()
# Парсинг команды
parsed = parser.parse("/ban @user 7d --reason спам")
print(parsed.command) # "ban"
print(parsed.args) # ["@user", "7d"]
print(parsed.flags) # {"reason": "спам"}
```
"""
def __init__(
self,
prefixes: Optional[List[str]] = None,
bot_username: Optional[str] = None
):
"""
Args:
prefixes: Список префиксов (по умолчанию из settings)
bot_username: Username бота для проверки упоминаний
"""
self.prefixes = prefixes or settings.PREFIX
self.bot_username = bot_username
def is_command(self, text: Optional[str]) -> bool:
"""
Проверяет, является ли текст командой.
Args:
text: Текст для проверки
Returns:
bool: True если это команда
Example:
>> parser.is_command("/start")
True
>> parser.is_command("hello")
False
"""
if not text:
return False
text = text.strip()
# Проверяем все префиксы
return any(text.startswith(prefix) for prefix in self.prefixes)
def get_command(
self,
text: Optional[str],
strip_mention: bool = True
) -> Optional[str]:
"""
Извлекает название команды из текста.
Args:
text: Текст сообщения
strip_mention: Убирать упоминание бота (@botname)
Returns:
Optional[str]: Название команды или None
Example:
>> parser.get_command("/start@mybot arg")
'start'
>> parser.get_command("!help")
'help'
"""
if not self.is_command(text):
return None
text = text.strip()
# Находим префикс
prefix = next(p for p in self.prefixes if text.startswith(p))
# Убираем префикс
without_prefix = text[len(prefix):]
# Берем первое слово
command = without_prefix.split()[0] if without_prefix else ""
# Убираем упоминание бота если есть
if strip_mention and '@' in command:
command = command.split('@')[0]
return command.lower() if command else None
def find_argument(self, text: Optional[str]) -> Optional[str]:
"""
Извлекает аргументы команды (все после команды).
Args:
text: Текст сообщения
Returns:
Optional[str]: Аргументы или None
Example:
>> parser.find_argument("/start referrer")
'referrer'
>> parser.find_argument("/ban @user reason text")
'@user reason text'
"""
if not self.is_command(text):
return None
parts = text.strip().split(maxsplit=1)
return parts[1] if len(parts) > 1 else None
@staticmethod
def parse_arguments(
args_text: Optional[str],
preserve_quotes: bool = False
) -> List[str]:
"""
Парсит аргументы, поддерживает кавычки.
Args:
args_text: Строка аргументов
preserve_quotes: Сохранять кавычки в результате
Returns:
List[str]: Список аргументов
Example:
>> parser.parse_arguments('user 7d "ban reason here"')
['user', '7d', 'ban reason here']
"""
if not args_text:
return []
# Regex для парсинга с кавычками
# Поддерживает: "arg with spaces" 'arg' arg
pattern = r'''(?:[^\s"']+|"[^"]*"|'[^']*')+'''
matches = re.findall(pattern, args_text)
if preserve_quotes:
return matches
# Убираем кавычки
return [m.strip('"').strip("'") for m in matches]
@staticmethod
def parse_flags(
args: List[str]
) -> Tuple[List[str], Dict[str, Union[str, bool]]]:
"""
Парсит флаги из аргументов.
Поддерживает:
- --flag value
- --flag (boolean, True)
- -f value (короткая форма)
Args:
args: Список аргументов
Returns:
Tuple: (аргументы_без_флагов, словарь_флагов)
Example:
>> args = ['user', '--reason', 'spam', '--silent']
>> clean_args, flags = parser.parse_flags(args)
>> print(clean_args) # ['user']
>> print(flags) # {'reason': 'spam', 'silent': True}
"""
clean_args = []
flags = {}
i = 0
while i < len(args):
arg = args[i]
# Длинный флаг --flag
if arg.startswith('--'):
flag_name = arg[2:]
# Проверяем, есть ли значение
if i + 1 < len(args) and not args[i + 1].startswith('-'):
flags[flag_name] = args[i + 1]
i += 2
else:
# Boolean флаг
flags[flag_name] = True
i += 1
# Короткий флаг -f
elif arg.startswith('-') and len(arg) == 2:
flag_name = arg[1]
# Проверяем значение
if i + 1 < len(args) and not args[i + 1].startswith('-'):
flags[flag_name] = args[i + 1]
i += 2
else:
flags[flag_name] = True
i += 1
# Обычный аргумент
else:
clean_args.append(arg)
i += 1
return clean_args, flags
def parse(
self,
text: str,
parse_flags: bool = True
) -> Optional[ParsedCommand]:
"""
Полный парсинг команды.
Args:
text: Текст команды
parse_flags: Парсить флаги
Returns:
Optional[ParsedCommand]: Распарсенная команда или None
Example:
>> parsed = parser.parse('/ban @user 7d --reason "spam bot"')
>> print(parsed.command) # 'ban'
>> print(parsed.args) # ['@user', '7d']
>> print(parsed.flags) # {'reason': 'spam bot'}
"""
if not self.is_command(text):
return None
text = text.strip()
# Находим префикс
prefix = next(p for p in self.prefixes if text.startswith(p))
# Убираем префикс
without_prefix = text[len(prefix):]
# Разделяем на команду и аргументы
parts = without_prefix.split(maxsplit=1)
if not parts:
return None
command_part = parts[0]
raw_args = parts[1] if len(parts) > 1 else None
# Проверяем упоминание бота
bot_username = None
is_group_command = False
if '@' in command_part:
cmd_parts = command_part.split('@')
command_name = cmd_parts[0]
bot_username = cmd_parts[1] if len(cmd_parts) > 1 else None
is_group_command = True
else:
command_name = command_part
# Парсим аргументы
args = self.parse_arguments(raw_args) if raw_args else []
# Парсим флаги
flags = {}
if parse_flags and args:
args, flags = self.parse_flags(args)
return ParsedCommand(
command=command_name.lower(),
prefix=prefix,
args=args,
raw_args=raw_args,
flags=flags,
bot_username=bot_username,
is_group_command=is_group_command
)
def parse_from_message(
self,
message: Message,
parse_flags: bool = True
) -> Optional[ParsedCommand]:
"""
Парсит команду из объекта Message.
Args:
message: Объект сообщения
parse_flags: Парсить флаги
Returns:
Optional[ParsedCommand]: Распарсенная команда
Example:
>> parsed = parser.parse_from_message(message)
>> if parsed:
... print(f"Команда: {parsed.command}")
"""
if not message.text:
return None
return self.parse(message.text, parse_flags=parse_flags)
# Глобальный парсер
_default_parser: Optional[CommandParser] = None
def get_parser() -> CommandParser:
"""Получает глобальный парсер команд"""
global _default_parser
if _default_parser is None:
_default_parser = CommandParser()
return _default_parser
# ================= УДОБНЫЕ ФУНКЦИИ =================
def is_command(text: Optional[str]) -> bool:
"""
Проверяет, является ли текст командой.
Args:
text: Текст для проверки
Returns:
bool: True если это команда
Example:
>> is_command("/start")
True
>> is_command("hello")
False
"""
return get_parser().is_command(text)
def find_argument(text: Optional[str]) -> Optional[str]:
"""
Извлекает аргументы команды.
Args:
text: Текст команды
Returns:
Optional[str]: Аргументы или None
Example:
>> find_argument("/start referrer")
'referrer'
>> find_argument("/ban @user spam")
'@user spam'
"""
return get_parser().find_argument(text)
def get_command(text: Optional[str]) -> Optional[str]:
"""
Извлекает название команды.
Args:
text: Текст сообщения
Returns:
Optional[str]: Название команды или None
Example:
>> get_command("/start@mybot")
'start'
>> get_command("!help")
'help'
"""
return get_parser().get_command(text)
def parse_arguments(args_text: Optional[str]) -> List[str]:
"""
Парсит аргументы команды.
Args:
args_text: Строка аргументов
Returns:
List[str]: Список аргументов
Example:
>> parse_arguments('user 7d "ban reason"')
['user', '7d', 'ban reason']
"""
return get_parser().parse_arguments(args_text)
def parse_flags(args: List[str]) -> Tuple[List[str], Dict[str, Union[str, bool]]]:
"""
Парсит флаги из аргументов.
Args:
args: Список аргументов
Returns:
Tuple: (аргументы, флаги)
Example:
>> args = ['user', '--reason', 'spam', '--silent']
>> clean_args, flags = parse_flags(args)
>> print(flags) # {'reason': 'spam', 'silent': True}
"""
return get_parser().parse_flags(args)
def parse_command(text: str) -> Optional[ParsedCommand]:
"""
Полный парсинг команды.
Args:
text: Текст команды
Returns:
Optional[ParsedCommand]: Распарсенная команда
Example:
>> parsed = parse_command('/ban @user --reason spam')
>> print(parsed.command) # 'ban'
>> print(parsed.args) # ['@user']
>> print(parsed.flags) # {'reason': 'spam'}
"""
return get_parser().parse(text)
# ================= ВАЛИДАЦИЯ КОМАНД =================
def validate_command(
text: str,
expected_command: str,
min_args: int = 0,
max_args: Optional[int] = None,
required_flags: Optional[Set[str]] = None
) -> Tuple[bool, Optional[str]]:
"""
Валидирует команду.
Args:
text: Текст команды
expected_command: Ожидаемая команда
min_args: Минимальное количество аргументов
max_args: Максимальное количество аргументов
required_flags: Обязательные флаги
Returns:
Tuple[bool, Optional[str]]: (валидна, сообщение_об_ошибке)
Example:
>> valid, error = validate_command(
... '/ban user',
... 'ban',
... min_args=1,
... max_args=2
... )
>> if not valid:
... print(error)
"""
parsed = parse_command(text)
if not parsed:
return False, "Невалидная команда"
# Проверка команды
if parsed.command != expected_command:
return False, f"Ожидалась команда '{expected_command}'"
# Проверка количества аргументов
arg_count = len(parsed.args)
if arg_count < min_args:
return False, f"Недостаточно аргументов (минимум {min_args})"
if max_args is not None and arg_count > max_args:
return False, f"Слишком много аргументов (максимум {max_args})"
# Проверка обязательных флагов
if required_flags:
missing_flags = required_flags - set(parsed.flags.keys())
if missing_flags:
return False, f"Отсутствуют обязательные флаги: {', '.join(missing_flags)}"
return True, None
def get_command_usage(
command: str,
args: List[str],
flags: Optional[Dict[str, str]] = None,
description: Optional[str] = None
) -> str:
"""
Формирует строку использования команды.
Args:
command: Название команды
args: Список аргументов
flags: Словарь флагов с описанием
description: Описание команды
Returns:
str: Форматированная строка использования
Example:
>> usage = get_command_usage(
... 'ban',
... ['<user>', '[duration]'],
... {'reason': 'Причина бана', 'silent': 'Тихий бан'},
... 'Банит пользователя'
... )
>> print(usage)
"""
lines = []
# Описание
if description:
lines.append(f"📝 {description}\n")
# Использование
args_str = ' '.join(args)
lines.append(f"<b>Использование:</b>")
lines.append(f"<code>/{command} {args_str}</code>\n")
# Аргументы
if args:
lines.append("<b>Аргументы:</b>")
for arg in args:
# Определяем обязательность
if arg.startswith('<') and arg.endswith('>'):
lines.append(f"{arg} - обязательный")
elif arg.startswith('[') and arg.endswith(']'):
lines.append(f"{arg} - необязательный")
lines.append("")
# Флаги
if flags:
lines.append("<b>Флаги:</b>")
for flag, desc in flags.items():
lines.append(f"• --{flag} - {desc}")
return '\n'.join(lines)
# ================= ИЗВЛЕЧЕНИЕ УПОМИНАНИЙ =================
def extract_mentions(text: str) -> List[str]:
"""
Извлекает все упоминания (@username) из текста.
Args:
text: Текст для анализа
Returns:
List[str]: Список username (без @)
Example:
>> extract_mentions("Бан @user1 и @user2")
['user1', 'user2']
"""
pattern = r'@(\w+)'
return re.findall(pattern, text)
def extract_user_ids(text: str) -> List[int]:
"""
Извлекает все ID пользователей из текста.
Args:
text: Текст для анализа
Returns:
List[int]: Список ID
Example:
>> extract_user_ids("Бан id123456789 и id987654321")
[123456789, 987654321]
"""
pattern = r'id(\d+)'
matches = re.findall(pattern, text)
return [int(m) for m in matches]
def extract_hashtags(text: str) -> List[str]:
"""
Извлекает все хештеги из текста.
Args:
text: Текст для анализа
Returns:
List[str]: Список хештегов (без #)
Example:
>> extract_hashtags("Пост #важное #новости")
['важное', 'новости']
"""
pattern = r'#(\w+)'
return re.findall(pattern, text)

636
bot/utils/auto_delete.py Normal file
View File

@@ -0,0 +1,636 @@
"""
Утилиты для автоматического удаления сообщений
"""
from typing import Optional, Callable, Awaitable, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from asyncio import sleep, create_task, Task, CancelledError
from aiogram import Bot
from aiogram.types import Message
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
from middleware.loggers import logger
from .format_time import format_duration
__all__ = (
'auto_delete_message',
'schedule_delete',
'cancel_delete',
'delete_after',
'auto_delete_manager',
'AutoDeleteManager',
'DeleteTask',
'delete_both_after',
'delete_messages_after',
)
@dataclass
class DeleteTask:
"""
Задача на удаление сообщения.
Attributes:
chat_id: ID чата
message_id: ID сообщения
delete_at: Время удаления
task: Asyncio task
created_at: Время создания задачи
reason: Причина удаления
callback: Callback функция после удаления
"""
chat_id: int
message_id: int
delete_at: datetime
task: Optional[Task] = None
created_at: datetime = field(default_factory=datetime.now)
reason: Optional[str] = None
callback: Optional[Callable[[], Awaitable[None]]] = None
@property
def delay(self) -> int:
"""Задержка до удаления в секундах"""
delta = self.delete_at - datetime.now()
return max(0, int(delta.total_seconds()))
@property
def is_expired(self) -> bool:
"""Истекло ли время удаления"""
return datetime.now() >= self.delete_at
def __repr__(self) -> str:
return (
f"DeleteTask(chat={self.chat_id}, msg={self.message_id}, "
f"delay={self.delay}s, reason={self.reason})"
)
class AutoDeleteManager:
"""
Менеджер автоматического удаления сообщений.
Возможности:
- Планирование удаления с задержкой
- Отмена запланированного удаления
- Массовое удаление
- Callback функции
- История задач
- Автоматическая очистка завершенных задач
Example:
```python
from utils.auto_delete import auto_delete_manager
# Планирование удаления
await auto_delete_manager.schedule(
bot=bot,
chat_id=message.chat.id,
message_id=message.message_id,
delay=60,
reason="Временное сообщение"
)
# Отмена удаления
auto_delete_manager.cancel(message.chat.id, message.message_id)
# Получение статистики
stats = auto_delete_manager.get_stats()
```
"""
def __init__(self):
# Активные задачи: {(chat_id, message_id): DeleteTask}
self.tasks: Dict[tuple[int, int], DeleteTask] = {}
# Завершенные задачи (последние 100)
self.completed: list[DeleteTask] = []
self.max_completed = 100
# Статистика
self.total_scheduled: int = 0
self.total_deleted: int = 0
self.total_failed: int = 0
self.total_cancelled: int = 0
async def schedule(
self,
bot: Bot,
chat_id: int,
message_id: int,
delay: int,
reason: Optional[str] = None,
callback: Optional[Callable[[], Awaitable[None]]] = None,
log: bool = True
) -> DeleteTask:
"""
Планирует удаление сообщения.
Args:
bot: Экземпляр бота
chat_id: ID чата
message_id: ID сообщения
delay: Задержка в секундах
reason: Причина удаления
callback: Callback функция после удаления
log: Логировать планирование
Returns:
DeleteTask: Созданная задача
Example:
>> task = await auto_delete_manager.schedule(
... bot=bot,
... chat_id=message.chat.id,
... message_id=message.message_id,
... delay=60,
... reason="Спам"
... )
"""
# Отменяем предыдущую задачу если есть
key = (chat_id, message_id)
if key in self.tasks:
self.cancel(chat_id, message_id)
# Создаем задачу
delete_at = datetime.now() + timedelta(seconds=delay)
task_data = DeleteTask(
chat_id=chat_id,
message_id=message_id,
delete_at=delete_at,
reason=reason,
callback=callback
)
# Создаем asyncio task
task = create_task(self._delete_task(bot, task_data, log))
task_data.task = task
# Сохраняем
self.tasks[key] = task_data
self.total_scheduled += 1
if log:
delay_str = format_duration(delay)
logger.info(
f"Запланировано удаление сообщения через {delay_str}",
log_type='AUTO_DELETE'
)
return task_data
async def _delete_task(
self,
bot: Bot,
task_data: DeleteTask,
log: bool
) -> None:
"""
Внутренняя функция для выполнения задачи удаления.
Args:
bot: Экземпляр бота
task_data: Данные задачи
log: Логировать выполнение
"""
key = (task_data.chat_id, task_data.message_id)
try:
# Ждем
await sleep(task_data.delay)
# Удаляем сообщение
await bot.delete_message(
chat_id=task_data.chat_id,
message_id=task_data.message_id
)
self.total_deleted += 1
if log:
reason_str = f" (причина: {task_data.reason})" if task_data.reason else ""
logger.info(
f"Сообщение удалено автоматически{reason_str}",
log_type='AUTO_DELETE'
)
# Вызываем callback если есть
if task_data.callback:
try:
await task_data.callback()
except Exception as e:
logger.error(
f"Ошибка в callback автоудаления: {e}",
log_type='AUTO_DELETE'
)
except CancelledError:
# Задача отменена
self.total_cancelled += 1
if log:
logger.debug(
f"Удаление сообщения отменено",
log_type='AUTO_DELETE'
)
raise
except (TelegramBadRequest, TelegramForbiddenError) as e:
# Ошибка удаления
self.total_failed += 1
if log:
logger.warning(
f"Не удалось автоматически удалить сообщение: {e}",
log_type='AUTO_DELETE'
)
finally:
# Удаляем из активных задач
if key in self.tasks:
completed_task = self.tasks.pop(key)
# Сохраняем в завершенные
self.completed.append(completed_task)
if len(self.completed) > self.max_completed:
self.completed.pop(0)
def cancel(
self,
chat_id: int,
message_id: int,
log: bool = True
) -> bool:
"""
Отменяет запланированное удаление.
Args:
chat_id: ID чата
message_id: ID сообщения
log: Логировать отмену
Returns:
bool: True если задача была отменена
Example:
>> cancelled = auto_delete_manager.cancel(
... chat_id=message.chat.id,
... message_id=message.message_id
... )
"""
key = (chat_id, message_id)
if key in self.tasks:
task_data = self.tasks[key]
# Отменяем asyncio task
if task_data.task and not task_data.task.done():
task_data.task.cancel()
# Удаляем из активных
self.tasks.pop(key)
if log:
logger.debug(
f"Автоудаление отменено для сообщения {message_id}",
log_type='AUTO_DELETE'
)
return True
return False
def cancel_all(self, chat_id: Optional[int] = None) -> int:
"""
Отменяет все запланированные удаления.
Args:
chat_id: ID чата (если None, отменяет для всех чатов)
Returns:
int: Количество отмененных задач
Example:
>> # Отменить для всех чатов
>> count = auto_delete_manager.cancel_all()
>> # Отменить для конкретного чата
>> count = auto_delete_manager.cancel_all(chat_id=message.chat.id)
"""
cancelled_count = 0
# Собираем ключи для отмены
keys_to_cancel = []
for key, task_data in self.tasks.items():
if chat_id is None or task_data.chat_id == chat_id:
keys_to_cancel.append(key)
# Отменяем
for key in keys_to_cancel:
if self.cancel(key[0], key[1], log=False):
cancelled_count += 1
if cancelled_count > 0:
logger.info(
f"Отменено {cancelled_count} задач автоудаления",
log_type='AUTO_DELETE'
)
return cancelled_count
def get_task(
self,
chat_id: int,
message_id: int
) -> Optional[DeleteTask]:
"""
Получает задачу по ID чата и сообщения.
Args:
chat_id: ID чата
message_id: ID сообщения
Returns:
Optional[DeleteTask]: Задача или None
"""
key = (chat_id, message_id)
return self.tasks.get(key)
def get_chat_tasks(self, chat_id: int) -> list[DeleteTask]:
"""
Получает все задачи для чата.
Args:
chat_id: ID чата
Returns:
list[DeleteTask]: Список задач
"""
return [
task for task in self.tasks.values()
if task.chat_id == chat_id
]
def get_stats(self) -> Dict[str, Any]:
"""
Возвращает статистику менеджера.
Returns:
Dict: Словарь со статистикой
Example:
>> stats = auto_delete_manager.get_stats()
>> print(f"Активных задач: {stats['active_tasks']}")
"""
return {
'active_tasks': len(self.tasks),
'completed_tasks': len(self.completed),
'total_scheduled': self.total_scheduled,
'total_deleted': self.total_deleted,
'total_failed': self.total_failed,
'total_cancelled': self.total_cancelled,
'success_rate': (
f"{(self.total_deleted / self.total_scheduled * 100):.1f}%"
if self.total_scheduled > 0 else "0%"
)
}
def cleanup_expired(self) -> int:
"""
Удаляет истекшие задачи (которые должны были выполниться, но не выполнились).
Returns:
int: Количество удаленных задач
"""
expired_keys = [
key for key, task in self.tasks.items()
if task.is_expired and (not task.task or task.task.done())
]
for key in expired_keys:
self.tasks.pop(key)
return len(expired_keys)
# Глобальный менеджер
auto_delete_manager = AutoDeleteManager()
# ================= УДОБНЫЕ ФУНКЦИИ =================
async def auto_delete_message(
bot: Bot,
chat_id: int,
message_id: int,
delay: int = 604800,
reason: Optional[str] = None
) -> DeleteTask:
"""
Автоматически удаляет сообщение через указанное время.
Args:
bot: Экземпляр бота
chat_id: ID чата
message_id: ID сообщения
delay: Задержка в секундах (по умолчанию 7 дней)
reason: Причина удаления
Returns:
DeleteTask: Созданная задача
Example:
>> # Удалить через 1 минуту
>> await auto_delete_message(bot, chat_id, message_id, delay=60)
>> # Удалить через 7 дней (по умолчанию)
>> await auto_delete_message(bot, chat_id, message_id)
"""
return await auto_delete_manager.schedule(
bot=bot,
chat_id=chat_id,
message_id=message_id,
delay=delay,
reason=reason
)
async def schedule_delete(
message: Message,
delay: int,
reason: Optional[str] = None
) -> DeleteTask:
"""
Планирует удаление сообщения (упрощенная версия).
Args:
message: Объект сообщения
delay: Задержка в секундах
reason: Причина удаления
Returns:
DeleteTask: Созданная задача
Example:
>> # Планируем удаление через 30 секунд
>> await schedule_delete(message, delay=30, reason="Временное")
"""
return await auto_delete_manager.schedule(
bot=message.bot,
chat_id=message.chat.id,
message_id=message.message_id,
delay=delay,
reason=reason
)
def cancel_delete(message: Message) -> bool:
"""
Отменяет запланированное удаление сообщения.
Args:
message: Объект сообщения
Returns:
bool: True если удаление было отменено
Example:
>> if cancel_delete(message):
... await message.answer("Удаление отменено")
"""
return auto_delete_manager.cancel(
chat_id=message.chat.id,
message_id=message.message_id
)
async def delete_after(
message: Message,
text: str,
delay: int = 10,
**kwargs
) -> Message:
"""
Отправляет сообщение и автоматически удаляет его через указанное время.
Args:
message: Исходное сообщение
text: Текст нового сообщения
delay: Задержка до удаления в секундах
**kwargs: Дополнительные параметры для message.answer()
Returns:
Message: Отправленное сообщение
Example:
>> # Отправить и удалить через 10 секунд
>> await delete_after(message, "Это временное сообщение")
>> # Отправить и удалить через 5 секунд
>> await delete_after(
... message,
... "⚠️ Ошибка!",
... delay=5,
... parse_mode="HTML"
... )
"""
sent_message = await message.answer(text, **kwargs)
await auto_delete_manager.schedule(
bot=message.bot,
chat_id=sent_message.chat.id,
message_id=sent_message.message_id,
delay=delay,
reason="delete_after"
)
return sent_message
async def delete_both_after(
original: Message,
reply_text: str,
delay: int = 10,
**kwargs
) -> Message:
"""
Отправляет ответ и удаляет оба сообщения через указанное время.
Args:
original: Исходное сообщение
reply_text: Текст ответа
delay: Задержка до удаления
**kwargs: Дополнительные параметры
Returns:
Message: Отправленное сообщение
Example:
>> # Удалить и команду, и ответ через 5 секунд
>> await delete_both_after(
... message,
... "✅ Команда выполнена",
... delay=5
... )
"""
# Отправляем ответ
sent = await delete_after(original, reply_text, delay, **kwargs)
# Планируем удаление оригинала
await auto_delete_manager.schedule(
bot=original.bot,
chat_id=original.chat.id,
message_id=original.message_id,
delay=delay,
reason="delete_both"
)
return sent
async def delete_messages_after(
bot: Bot,
chat_id: int,
message_ids: list[int],
delay: int
) -> int:
"""
Планирует удаление нескольких сообщений.
Args:
bot: Экземпляр бота
chat_id: ID чата
message_ids: Список ID сообщений
delay: Задержка до удаления
Returns:
int: Количество запланированных удалений
Example:
>> # Удалить все сообщения через 1 час
>> count = await delete_messages_after(
... bot,
... chat_id,
... [123, 124, 125, 126],
... delay=3600
... )
"""
count = 0
for message_id in message_ids:
await auto_delete_manager.schedule(
bot=bot,
chat_id=chat_id,
message_id=message_id,
delay=delay,
reason="mass_delete",
log=False
)
count += 1
logger.info(
f"Запланировано удаление {count} сообщений через {format_duration(delay)}",
log_type='AUTO_DELETE'
)
return count

812
bot/utils/decorators.py Normal file
View File

@@ -0,0 +1,812 @@
"""
Декораторы для обработчиков бота
"""
import asyncio
from typing import Callable, Optional, Union
from functools import wraps
from datetime import datetime
from collections import defaultdict
from aiogram.types import Message, CallbackQuery
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
from aiogram.enums import ChatType, ChatMemberStatus
from middleware.loggers import logger
from .format_time import format_duration
__all__ = (
'admin_only',
'owner_only',
'private_only',
'group_only',
'rate_limit',
'cooldown',
'log_action',
'catch_errors',
'typing_action',
'delete_command',
'answer_on_error',
'permission_required',
'throttle',
'admin_action'
)
# ================= ХРАНИЛИЩА ДЛЯ RATE LIMIT =================
class RateLimitStorage:
"""Хранилище для rate limiting"""
def __init__(self):
# {user_id: {action: datetime}}
self._storage: dict[int, dict[str, datetime]] = defaultdict(dict)
# {user_id: {action: count}}
self._counters: dict[int, dict[str, int]] = defaultdict(lambda: defaultdict(int))
def check(
self,
user_id: int,
action: str,
limit: int,
period: int
) -> tuple[bool, Optional[int]]:
"""
Проверяет лимит.
Returns:
tuple[bool, Optional[int]]: (можно ли выполнить, секунд до сброса)
"""
now = datetime.now()
if action not in self._storage[user_id]:
# Первое использование
self._storage[user_id][action] = now
self._counters[user_id][action] = 1
return True, None
last_use = self._storage[user_id][action]
time_passed = (now - last_use).total_seconds()
# Если прошел период - сбрасываем
if time_passed >= period:
self._storage[user_id][action] = now
self._counters[user_id][action] = 1
return True, None
# Проверяем счетчик
count = self._counters[user_id][action]
if count >= limit:
# Превышен лимит
retry_after = int(period - time_passed)
return False, retry_after
# Увеличиваем счетчик
self._counters[user_id][action] += 1
return True, None
def reset(self, user_id: int, action: Optional[str] = None):
"""Сбрасывает лимит для пользователя"""
if action:
if user_id in self._storage:
self._storage[user_id].pop(action, None)
self._counters[user_id].pop(action, None)
else:
self._storage.pop(user_id, None)
self._counters.pop(user_id, None)
def cleanup(self, max_age: int = 3600):
"""Очищает старые записи"""
now = datetime.now()
expired_users = []
for user_id, actions in self._storage.items():
expired_actions = [
action for action, dt in actions.items()
if (now - dt).total_seconds() > max_age
]
for action in expired_actions:
actions.pop(action, None)
self._counters[user_id].pop(action, None)
if not actions:
expired_users.append(user_id)
for user_id in expired_users:
self._storage.pop(user_id, None)
self._counters.pop(user_id, None)
# Глобальное хранилище
_rate_limit_storage = RateLimitStorage()
_cooldown_storage = RateLimitStorage()
# ================= ПРОВЕРКА ПРАВ =================
async def _check_admin_rights(
message: Message,
user_id: Optional[int] = None
) -> bool:
"""
Проверяет, является ли пользователь администратором.
Args:
message: Объект сообщения
user_id: ID пользователя (если None, проверяется отправитель)
Returns:
bool: True если администратор
"""
# В личных сообщениях все пользователи "администраторы"
if message.chat.type == ChatType.PRIVATE:
return True
check_user_id = user_id or message.from_user.id
try:
member = await message.bot.get_chat_member(
chat_id=message.chat.id,
user_id=check_user_id
)
return member.status in {
ChatMemberStatus.CREATOR,
ChatMemberStatus.ADMINISTRATOR
}
except (TelegramBadRequest, TelegramForbiddenError):
return False
async def _check_owner_rights(message: Message) -> bool:
"""Проверяет, является ли пользователь владельцем чата"""
if message.chat.type == ChatType.PRIVATE:
return True
try:
member = await message.bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
return member.status == ChatMemberStatus.CREATOR
except (TelegramBadRequest, TelegramForbiddenError):
return False
async def _check_bot_admin_rights(message: Message) -> bool:
"""Проверяет, является ли бот администратором"""
if message.chat.type == ChatType.PRIVATE:
return True
try:
bot_member = await message.bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.bot.id
)
return bot_member.status in {
ChatMemberStatus.ADMINISTRATOR
}
except (TelegramBadRequest, TelegramForbiddenError):
return False
# ================= ДЕКОРАТОРЫ ДЛЯ ПРАВ =================
def admin_only(
reply_text: str = "❌ Эта команда доступна только администраторам",
check_bot: bool = False
):
"""
Декоратор: выполнение только для администраторов.
Args:
reply_text: Текст ответа если не админ
check_bot: Также проверять права бота
Example:
```python
@router.message(Command("ban"))
@admin_only()
async def ban_handler(message: Message):
await message.answer("Бан пользователя...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
# Получаем message
message = update if isinstance(update, Message) else update.message
if not message:
return None
# Проверяем права пользователя
if not await _check_admin_rights(message):
if isinstance(update, CallbackQuery):
await update.answer(reply_text, show_alert=True)
else:
await message.answer(reply_text)
logger.warning(
f"Попытка использования admin команды от @{message.from_user.id}",
log_type='SECURITY'
)
return None
# Проверяем права бота если нужно
if check_bot and not await _check_bot_admin_rights(message):
error_text = "❌ Бот не является администратором чата"
if isinstance(update, CallbackQuery):
await update.answer(error_text, show_alert=True)
else:
await message.answer(error_text)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
def owner_only(reply_text: str = "❌ Эта команда доступна только владельцу чата"):
"""
Декоратор: выполнение только для владельца чата.
Args:
reply_text: Текст ответа если не владелец
Example:
```python
@router.message(Command("destroy"))
@owner_only()
async def destroy_handler(message: Message):
await message.answer("Удаление чата...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
if not await _check_owner_rights(message):
if isinstance(update, CallbackQuery):
await update.answer(reply_text, show_alert=True)
else:
await message.answer(reply_text)
logger.warning(
f"Попытка использования owner команды от @{message.from_user.id}",
log_type='SECURITY'
)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
def permission_required(*permissions: str):
"""
Декоратор: проверка конкретных прав администратора.
Args:
permissions: Список прав (can_delete_messages, can_restrict_members, и т.д.)
Example:
```python
@router.message(Command("pin"))
@permission_required("can_pin_messages")
async def pin_handler(message: Message):
await message.reply_to_message.pin()
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
# В личных сообщениях пропускаем проверку
if message.chat.type == ChatType.PRIVATE:
return await func(update, *args, **kwargs)
try:
member = await message.bot.get_chat_member(
chat_id=message.chat.id,
user_id=message.from_user.id
)
# Владелец имеет все права
if member.status == ChatMemberStatus.CREATOR:
return await func(update, *args, **kwargs)
# Проверяем права
if member.status == ChatMemberStatus.ADMINISTRATOR:
missing_permissions = []
for perm in permissions:
if not getattr(member, perm, False):
missing_permissions.append(perm)
if missing_permissions:
error_text = (
f"❌ Недостаточно прав\n"
f"Требуются: {', '.join(missing_permissions)}"
)
if isinstance(update, CallbackQuery):
await update.answer(error_text, show_alert=True)
else:
await message.answer(error_text)
return None
return await func(update, *args, **kwargs)
# Не администратор
error_text = "❌ Эта команда доступна только администраторам"
if isinstance(update, CallbackQuery):
await update.answer(error_text, show_alert=True)
else:
await message.answer(error_text)
except (TelegramBadRequest, TelegramForbiddenError):
pass
return wrapper
return decorator
# ================= ДЕКОРАТОРЫ ДЛЯ ТИПОВ ЧАТОВ =================
def private_only(reply_text: str = "❌ Эта команда работает только в личных сообщениях"):
"""
Декоратор: выполнение только в личных сообщениях.
Example:
```python
@router.message(Command("start"))
@private_only()
async def start_handler(message: Message):
await message.answer("Приветствие...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
if message.chat.type != ChatType.PRIVATE:
if isinstance(update, CallbackQuery):
await update.answer(reply_text, show_alert=True)
else:
await message.answer(reply_text)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
def group_only(reply_text: str = "❌ Эта команда работает только в группах"):
"""
Декоратор: выполнение только в группах.
Example:
```python
@router.message(Command("ban"))
@group_only()
async def ban_handler(message: Message):
await message.answer("Бан пользователя...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
if message.chat.type not in {ChatType.GROUP, ChatType.SUPERGROUP}:
if isinstance(update, CallbackQuery):
await update.answer(reply_text, show_alert=True)
else:
await message.answer(reply_text)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
# ================= RATE LIMITING =================
def rate_limit(limit: int = 1, period: int = 60, action: Optional[str] = None):
"""
Декоратор: ограничение частоты вызовов.
Args:
limit: Количество вызовов
period: Период в секундах
action: Название действия (по умолчанию имя функции)
Example:
```python
@router.message(Command("search"))
@rate_limit(limit=3, period=60) # 3 раза в минуту
async def search_handler(message: Message):
await message.answer("Поиск...")
```
"""
def decorator(func: Callable) -> Callable:
action_name = action or func.__name__
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return None
user_id = message.from_user.id
# Проверяем лимит
allowed, retry_after = _rate_limit_storage.check(
user_id, action_name, limit, period
)
if not allowed:
retry_time = format_duration(retry_after)
error_text = f"⏳ Слишком часто! Повторите через {retry_time}"
if isinstance(update, CallbackQuery):
await update.answer(error_text, show_alert=True)
else:
await message.answer(error_text)
logger.debug(
f"Rate limit для пользователя {user_id}: {action_name}",
log_type='RATE_LIMIT'
)
return None
return await func(update, *args, **kwargs)
return wrapper
return decorator
def cooldown(seconds: int, action: Optional[str] = None):
"""
Декоратор: кулдаун между вызовами (1 раз в N секунд).
Args:
seconds: Кулдаун в секундах
action: Название действия
Example:
```python
@router.message(Command("daily"))
@cooldown(seconds=86400) # Раз в день
async def daily_handler(message: Message):
await message.answer("Ежедневная награда!")
```
"""
return rate_limit(limit=1, period=seconds, action=action)
def throttle(rate: float = 1.0):
"""
Декоратор: throttling (antiflood).
Args:
rate: Минимальный интервал в секундах между вызовами
Example:
```python
@router.message()
@throttle(rate=0.5) # Не чаще 2 раз в секунду
async def echo_handler(message: Message):
await message.answer(message.text)
```
"""
return cooldown(seconds=int(rate), action='throttle')
# ================= ЛОГИРОВАНИЕ =================
def log_action(
action_name: Optional[str] = None,
log_args: bool = False
):
"""
Декоратор: логирование действий.
Args:
action_name: Название действия (по умолчанию имя функции)
log_args: Логировать аргументы
Example:
```python
@router.message(Command("ban"))
@log_action("BAN_USER", log_args=True)
async def ban_handler(message: Message):
await message.answer("Бан...")
```
"""
def decorator(func: Callable) -> Callable:
name = action_name or func.__name__
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return await func(update, *args, **kwargs)
user_id = message.from_user.id
username = message.from_user.username or f"id{user_id}"
# Логируем начало
log_msg = f"Действие '{name}' от @{username}"
if log_args and message.text:
log_msg += f" | Аргументы: {message.text}"
logger.info(log_msg, log_type='ACTION')
try:
result = await func(update, *args, **kwargs)
logger.info(f"Действие '{name}' выполнено успешно", log_type='ACTION')
return result
except Exception as e:
logger.error(f"Ошибка в действии '{name}': {e}", log_type='ACTION')
raise
return wrapper
return decorator
# ================= ОБРАБОТКА ОШИБОК =================
def catch_errors(
notify_user: bool = True,
error_message: str = "❌ Произошла ошибка при выполнении команды"
):
"""
Декоратор: перехват ошибок.
Args:
notify_user: Уведомлять пользователя об ошибке
error_message: Текст уведомления
Example:
```python
@router.message(Command("risky"))
@catch_errors(notify_user=True)
async def risky_handler(message: Message):
# Код который может вызвать ошибку
...
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
try:
return await func(update, *args, **kwargs)
except Exception as e:
logger.error(
f"Ошибка в {func.__name__}: {e}",
log_type='ERROR'
)
if notify_user:
message = update if isinstance(update, Message) else update.message
if message:
try:
if isinstance(update, CallbackQuery):
await update.answer(error_message, show_alert=True)
else:
await message.answer(error_message)
except:
pass
return wrapper
return decorator
def answer_on_error(error_message: str = "❌ Ошибка"):
"""
Декоратор: ответ пользователю при ошибке.
Alias для catch_errors с уведомлением.
"""
return catch_errors(notify_user=True, error_message=error_message)
# ================= ДЕЙСТВИЯ =================
def typing_action():
"""
Декоратор: показывает "печатает..." во время выполнения.
Example:
```python
@router.message(Command("search"))
@typing_action()
async def search_handler(message: Message):
# Долгий поиск...
await asyncio.sleep(3)
await message.answer("Результаты поиска")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(update: Union[Message, CallbackQuery], *args, **kwargs):
message = update if isinstance(update, Message) else update.message
if not message:
return await func(update, *args, **kwargs)
# Отправляем действие "печатает"
async def send_typing():
try:
while True:
await message.bot.send_chat_action(
chat_id=message.chat.id,
action="typing"
)
await asyncio.sleep(4) # Обновляем каждые 4 секунды
except asyncio.CancelledError:
pass
# Создаем задачу
typing_task = asyncio.create_task(send_typing())
try:
result = await func(update, *args, **kwargs)
return result
finally:
typing_task.cancel()
try:
await typing_task
except asyncio.CancelledError:
pass
return wrapper
return decorator
def delete_command(delay: Optional[int] = None):
"""
Декоратор: удаляет команду после выполнения.
Args:
delay: Задержка перед удалением (секунды)
Example:
```python
@router.message(Command("clean"))
@delete_command(delay=0)
async def clean_handler(message: Message):
await message.answer("Очистка...")
```
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(message: Message, *args, **kwargs):
if not isinstance(message, Message):
return await func(message, *args, **kwargs)
# Выполняем функцию
result = await func(message, *args, **kwargs)
# Удаляем команду
try:
if delay:
await asyncio.sleep(delay)
await message.delete()
except (TelegramBadRequest, TelegramForbiddenError):
pass
return result
return wrapper
return decorator
# ================= КОМБИНИРОВАННЫЕ ДЕКОРАТОРЫ =================
def admin_action(
log: bool = True,
check_bot: bool = True,
delete_cmd: bool = False
):
"""
Комбинированный декоратор для admin команд.
Args:
log: Логировать действие
check_bot: Проверять права бота
delete_cmd: Удалять команду
Example:
```python
@router.message(Command("ban"))
@admin_action(log=True, check_bot=True)
async def ban_handler(message: Message):
await message.answer("Бан...")
```
"""
def decorator(func: Callable) -> Callable:
# Применяем декораторы
decorated = func
if log:
decorated = log_action(f"ADMIN_{func.__name__.upper()}")(decorated)
decorated = admin_only(check_bot=check_bot)(decorated)
if delete_cmd:
decorated = delete_command()(decorated)
return decorated
return decorator
# ================= ОЧИСТКА ХРАНИЛИЩ =================
def cleanup_storage(max_age: int = 3600):
"""
Очищает хранилища rate limit от старых записей.
Args:
max_age: Максимальный возраст записи в секундах
"""
_rate_limit_storage.cleanup(max_age)
_cooldown_storage.cleanup(max_age)

523
bot/utils/format_time.py Normal file
View File

@@ -0,0 +1,523 @@
"""
Утилиты для форматирования времени и дат
"""
from typing import Optional, Union
from datetime import datetime, timedelta
from enum import Enum
__all__ = (
'format_duration',
'format_retry_time',
'format_timestamp',
'format_relative_time',
'parse_duration',
'TimeFormat',
'get_plural_form',
'seconds_to_human',
'time_until',
'time_since',
'format_date_range',
'is_today',
'is_yesterday',
'is_tomorrow',
'smart_date'
)
class TimeFormat(str, Enum):
"""Форматы времени"""
FULL = 'full' # 1 час 30 минут 45 секунд
SHORT = 'short' # 1ч 30м 45с
COMPACT = 'compact' # 1:30:45
MINIMAL = 'minimal' # 1ч 30м (без секунд если есть часы/минуты)
def get_plural_form(number: int, forms: tuple[str, str, str]) -> str:
"""
Возвращает правильную форму множественного числа для русского языка.
Args:
number: Число
forms: Кортеж форм (1 секунда, 2 секунды, 5 секунд)
Returns:
str: Правильная форма
Example:
>> get_plural_form(1, ('секунда', 'секунды', 'секунд'))
'секунда'
>> get_plural_form(2, ('секунда', 'секунды', 'секунд'))
'секунды'
>> get_plural_form(5, ('секунда', 'секунды', 'секунд'))
'секунд'
"""
n = abs(number)
n %= 100
if 5 <= n <= 20:
return forms[2]
n %= 10
if n == 1:
return forms[0]
elif 2 <= n <= 4:
return forms[1]
else:
return forms[2]
def format_duration(
seconds: int,
format_type: TimeFormat = TimeFormat.FULL,
include_seconds: bool = True,
max_units: Optional[int] = None
) -> str:
"""
Форматирует длительность в читаемый вид.
Args:
seconds: Длительность в секундах
format_type: Тип форматирования
include_seconds: Включать секунды в вывод
max_units: Максимальное количество единиц времени (например, только часы и минуты)
Returns:
str: Отформатированная строка
Example:
>> format_duration(3665)
'1 час 1 минута 5 секунд'
>> format_duration(3665, TimeFormat.SHORT)
'1ч 1м 5с'
>> format_duration(3665, TimeFormat.COMPACT)
'1:01:05'
>> format_duration(3665, max_units=2)
'1 час 1 минута'
"""
if seconds == 0:
if format_type == TimeFormat.FULL:
return "0 секунд"
elif format_type == TimeFormat.SHORT:
return "0с"
elif format_type == TimeFormat.COMPACT:
return "0:00"
else:
return "0с"
# Разбиваем на единицы
weeks, remainder = divmod(seconds, 604800) # 7 * 24 * 60 * 60
days, remainder = divmod(remainder, 86400) # 24 * 60 * 60
hours, remainder = divmod(remainder, 3600)
minutes, secs = divmod(remainder, 60)
# Компактный формат
if format_type == TimeFormat.COMPACT:
if weeks > 0:
return f"{weeks * 7 + days}д {hours:02d}:{minutes:02d}:{secs:02d}"
elif days > 0:
return f"{days}д {hours:02d}:{minutes:02d}:{secs:02d}"
elif hours > 0:
return f"{hours}:{minutes:02d}:{secs:02d}"
elif minutes > 0:
return f"{minutes}:{secs:02d}"
else:
return f"0:{secs:02d}"
# Собираем части
parts = []
units_count = 0
# Недели
if weeks > 0:
if format_type == TimeFormat.SHORT:
parts.append(f"{weeks}нед")
else:
week_form = get_plural_form(weeks, ('неделя', 'недели', 'недель'))
parts.append(f"{weeks} {week_form}")
units_count += 1
if max_units and units_count >= max_units:
return ' '.join(parts)
# Дни
if days > 0:
if format_type == TimeFormat.SHORT:
parts.append(f"{days}д")
else:
day_form = get_plural_form(days, ('день', 'дня', 'дней'))
parts.append(f"{days} {day_form}")
units_count += 1
if max_units and units_count >= max_units:
return ' '.join(parts)
# Часы
if hours > 0:
if format_type == TimeFormat.SHORT:
parts.append(f"{hours}ч")
else:
hour_form = get_plural_form(hours, ('час', 'часа', 'часов'))
parts.append(f"{hours} {hour_form}")
units_count += 1
if max_units and units_count >= max_units:
return ' '.join(parts)
# Минуты
if minutes > 0:
if format_type == TimeFormat.SHORT:
parts.append(f"{minutes}м")
else:
minute_form = get_plural_form(minutes, ('минута', 'минуты', 'минут'))
parts.append(f"{minutes} {minute_form}")
units_count += 1
if max_units and units_count >= max_units:
return ' '.join(parts)
# Секунды
if secs > 0 and include_seconds:
# Минимальный формат: не показываем секунды если есть часы или дни
if format_type == TimeFormat.MINIMAL and (hours > 0 or days > 0 or weeks > 0):
pass
else:
if format_type == TimeFormat.SHORT:
parts.append(f"{secs}с")
else:
second_form = get_plural_form(secs, ('секунда', 'секунды', 'секунд'))
parts.append(f"{secs} {second_form}")
return ' '.join(parts) if parts else "0 секунд"
def format_retry_time(retry_after: int, format_type: TimeFormat = TimeFormat.FULL) -> str:
"""
Форматирует время повторной попытки.
Args:
retry_after: Время в секундах до следующей попытки
format_type: Тип форматирования
Returns:
str: Отформатированная строка
Example:
>> format_retry_time(3665)
'1 час 1 минута 5 секунд'
>> format_retry_time(3665, TimeFormat.SHORT)
'1ч 1м 5с'
"""
return format_duration(retry_after, format_type=format_type)
def format_timestamp(
timestamp: Union[int, float, datetime],
format_string: str = "%d.%m.%Y %H:%M:%S",
timezone_offset: Optional[int] = None
) -> str:
"""
Форматирует timestamp в читаемую дату.
Args:
timestamp: Unix timestamp или datetime объект
format_string: Формат вывода
timezone_offset: Смещение часового пояса в часах
Returns:
str: Отформатированная дата
Example:
>> format_timestamp(1640000000)
'20.12.2021 13:33:20'
>> format_timestamp(datetime.now(), "%d %B %Y")
'17 февраля 2026'
"""
if isinstance(timestamp, (int, float)):
dt = datetime.fromtimestamp(timestamp)
else:
dt = timestamp
# Применяем смещение часового пояса
if timezone_offset is not None:
dt = dt + timedelta(hours=timezone_offset)
return dt.strftime(format_string)
def format_relative_time(
timestamp: Union[int, float, datetime],
now: Optional[datetime] = None,
detailed: bool = False
) -> str:
"""
Форматирует время относительно текущего момента.
Args:
timestamp: Unix timestamp или datetime объект
now: Текущее время (по умолчанию datetime.now())
detailed: Детальный формат (например "2 часа 30 минут назад" вместо "2 часа назад")
Returns:
str: Относительное время
Example:
>> format_relative_time(time.time() - 3600)
'1 час назад'
>> format_relative_time(time.time() + 7200)
'через 2 часа'
>> format_relative_time(time.time() - 90, detailed=True)
'1 минута 30 секунд назад'
"""
if now is None:
now = datetime.now()
if isinstance(timestamp, (int, float)):
dt = datetime.fromtimestamp(timestamp)
else:
dt = timestamp
# Вычисляем разницу
delta = now - dt
is_past = delta.total_seconds() > 0
seconds = abs(int(delta.total_seconds()))
# Если меньше минуты
if seconds < 60:
if is_past:
return "только что"
else:
return "сейчас"
# Форматируем длительность
if detailed:
duration = format_duration(seconds, TimeFormat.FULL, max_units=2)
else:
duration = format_duration(seconds, TimeFormat.FULL, max_units=1)
if is_past:
return f"{duration} назад"
else:
return f"через {duration}"
def parse_duration(duration_str: str) -> Optional[int]:
"""
Парсит строку длительности в секунды.
Args:
duration_str: Строка длительности (например "1ч 30м", "2h 15m", "90s")
Returns:
Optional[int]: Длительность в секундах или None если не удалось распарсить
Example:
>> parse_duration("1ч 30м")
5400
>> parse_duration("2h 15m 30s")
8130
>> parse_duration("90s")
90
"""
import re
# Паттерны для разных единиц
patterns = {
'weeks': r'(\d+)\s*(?:нед|w|week|weeks)',
'days': r'(\d+)\s*(?:д|d|day|days)',
'hours': r'(\d+)\s*(?:ч|h|hour|hours)',
'minutes': r'(\d+)\s*(?:м|m|min|minutes)',
'seconds': r'(\d+)\s*(?:с|s|sec|seconds)'
}
total_seconds = 0
# Ищем каждую единицу
for unit, pattern in patterns.items():
match = re.search(pattern, duration_str, re.IGNORECASE)
if match:
value = int(match.group(1))
if unit == 'weeks':
total_seconds += value * 604800
elif unit == 'days':
total_seconds += value * 86400
elif unit == 'hours':
total_seconds += value * 3600
elif unit == 'minutes':
total_seconds += value * 60
elif unit == 'seconds':
total_seconds += value
return total_seconds if total_seconds > 0 else None
# ================= ДОПОЛНИТЕЛЬНЫЕ УТИЛИТЫ =================
def seconds_to_human(seconds: int) -> str:
"""
Преобразует секунды в человекопонятный формат (самая большая единица).
Args:
seconds: Количество секунд
Returns:
str: Человекопонятный формат
Example:
>> seconds_to_human(3600)
'1 час'
>> seconds_to_human(90)
'1.5 минуты'
"""
if seconds >= 604800: # Неделя
weeks = seconds / 604800
week_form = get_plural_form(int(weeks), ('неделя', 'недели', 'недель'))
return f"{weeks:.1f} {week_form}".replace('.0', '')
elif seconds >= 86400: # День
days = seconds / 86400
day_form = get_plural_form(int(days), ('день', 'дня', 'дней'))
return f"{days:.1f} {day_form}".replace('.0', '')
elif seconds >= 3600: # Час
hours = seconds / 3600
hour_form = get_plural_form(int(hours), ('час', 'часа', 'часов'))
return f"{hours:.1f} {hour_form}".replace('.0', '')
elif seconds >= 60: # Минута
minutes = seconds / 60
minute_form = get_plural_form(int(minutes), ('минута', 'минуты', 'минут'))
return f"{minutes:.1f} {minute_form}".replace('.0', '')
else: # Секунда
second_form = get_plural_form(seconds, ('секунда', 'секунды', 'секунд'))
return f"{seconds} {second_form}"
def time_until(target_time: datetime, format_type: TimeFormat = TimeFormat.FULL) -> str:
"""
Возвращает время до указанного момента.
Args:
target_time: Целевое время
format_type: Тип форматирования
Returns:
str: Отформатированное время
Example:
>> target = datetime.now() + timedelta(hours=2, minutes=30)
>> time_until(target)
'2 часа 30 минут'
"""
now = datetime.now()
delta = target_time - now
if delta.total_seconds() <= 0:
return "уже прошло"
seconds = int(delta.total_seconds())
return format_duration(seconds, format_type=format_type)
def time_since(start_time: datetime, format_type: TimeFormat = TimeFormat.FULL) -> str:
"""
Возвращает время с указанного момента.
Args:
start_time: Начальное время
format_type: Тип форматирования
Returns:
str: Отформатированное время
Example:
>> start = datetime.now() - timedelta(hours=1, minutes=15)
>> time_since(start)
'1 час 15 минут'
"""
now = datetime.now()
delta = now - start_time
if delta.total_seconds() <= 0:
return "еще не началось"
seconds = int(delta.total_seconds())
return format_duration(seconds, format_type=format_type)
def format_date_range(start: datetime, end: datetime) -> str:
"""
Форматирует диапазон дат.
Args:
start: Начальная дата
end: Конечная дата
Returns:
str: Отформатированный диапазон
Example:
>> start = datetime(2026, 2, 17, 10, 0)
>> end = datetime(2026, 2, 17, 18, 0)
>> format_date_range(start, end)
'17.02.2026 с 10:00 до 18:00'
"""
if start.date() == end.date():
# Один день
return f"{start.strftime('%d.%m.%Y')} с {start.strftime('%H:%M')} до {end.strftime('%H:%M')}"
else:
# Разные дни
return f"с {start.strftime('%d.%m.%Y %H:%M')} до {end.strftime('%d.%m.%Y %H:%M')}"
def is_today(dt: datetime) -> bool:
"""Проверяет, является ли дата сегодняшней"""
return dt.date() == datetime.now().date()
def is_yesterday(dt: datetime) -> bool:
"""Проверяет, является ли дата вчерашней"""
yesterday = datetime.now().date() - timedelta(days=1)
return dt.date() == yesterday
def is_tomorrow(dt: datetime) -> bool:
"""Проверяет, является ли дата завтрашней"""
tomorrow = datetime.now().date() + timedelta(days=1)
return dt.date() == tomorrow
def smart_date(dt: datetime) -> str:
"""
Умное форматирование даты (сегодня, вчера, завтра, или дата).
Args:
dt: Дата для форматирования
Returns:
str: Отформатированная дата
Example:
>> smart_date(datetime.now())
'сегодня в 14:30'
>> smart_date(datetime.now() - timedelta(days=1))
'вчера в 20:15'
"""
if is_today(dt):
return f"сегодня в {dt.strftime('%H:%M')}"
elif is_yesterday(dt):
return f"вчера в {dt.strftime('%H:%M')}"
elif is_tomorrow(dt):
return f"завтра в {dt.strftime('%H:%M')}"
else:
# Если в этом году, не показываем год
if dt.year == datetime.now().year:
return dt.strftime('%d.%m в %H:%M')
else:
return dt.strftime('%d.%m.%Y в %H:%M')

View File

@@ -0,0 +1,504 @@
"""
Утилиты для упоминаний пользователей (mentions)
"""
from typing import Optional, List, Set
from datetime import datetime, timedelta
from aiogram import Bot
from aiogram.types import Message, ChatMemberAdministrator, ChatMemberOwner, User
from aiogram.utils.markdown import hide_link, hlink
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
__all__ = (
'mention_admins',
'mention_user',
'mention_users',
'get_admins_list',
'AdminCache',
'admin_cache',
'mention_moderators',
'mention_owner',
'hidden_admins_message'
)
class AdminCache:
"""
Кэш для списков администраторов чатов.
Уменьшает количество запросов к API Telegram.
"""
def __init__(self, ttl: int = 300):
"""
Args:
ttl: Время жизни кэша в секундах (по умолчанию 5 минут)
"""
self.ttl = ttl
# {chat_id: (admins_list, timestamp)}
self._cache: dict[int, tuple[List[User], datetime]] = {}
# Статистика
self.hits: int = 0
self.misses: int = 0
def get(self, chat_id: int) -> Optional[List[User]]:
"""
Получает список админов из кэша.
Args:
chat_id: ID чата
Returns:
List[User] или None если кэш устарел
"""
if chat_id in self._cache:
admins, timestamp = self._cache[chat_id]
# Проверяем актуальность
if datetime.now() - timestamp < timedelta(seconds=self.ttl):
self.hits += 1
return admins
else:
# Удаляем устаревшую запись
del self._cache[chat_id]
self.misses += 1
return None
def set(self, chat_id: int, admins: List[User]) -> None:
"""
Сохраняет список админов в кэш.
Args:
chat_id: ID чата
admins: Список администраторов
"""
self._cache[chat_id] = (admins, datetime.now())
def invalidate(self, chat_id: Optional[int] = None) -> None:
"""
Инвалидирует кэш.
Args:
chat_id: ID чата (если None, очищает весь кэш)
"""
if chat_id is None:
self._cache.clear()
elif chat_id in self._cache:
del self._cache[chat_id]
def cleanup(self) -> int:
"""
Удаляет устаревшие записи.
Returns:
int: Количество удаленных записей
"""
now = datetime.now()
expired = [
chat_id for chat_id, (_, timestamp) in self._cache.items()
if now - timestamp >= timedelta(seconds=self.ttl)
]
for chat_id in expired:
del self._cache[chat_id]
return len(expired)
def get_stats(self) -> dict:
"""Возвращает статистику кэша"""
total = self.hits + self.misses
hit_rate = (self.hits / total * 100) if total > 0 else 0
return {
'hits': self.hits,
'misses': self.misses,
'hit_rate': f"{hit_rate:.1f}%",
'cached_chats': len(self._cache)
}
# Глобальный кэш
admin_cache = AdminCache(ttl=300)
async def get_admins_list(
bot: Bot,
chat_id: int,
exclude_bots: bool = True,
exclude_users: Optional[Set[int]] = None,
include_owner_only: bool = False,
use_cache: bool = True
) -> List[User]:
"""
Получает список администраторов чата.
Args:
bot: Экземпляр бота
chat_id: ID чата
exclude_bots: Исключить ботов
exclude_users: Множество ID пользователей для исключения
include_owner_only: Только владелец чата
use_cache: Использовать кэш
Returns:
List[User]: Список администраторов
Example:
>> admins = await get_admins_list(bot, chat_id)
>> print(f"Администраторов: {len(admins)}")
"""
# Проверяем кэш
if use_cache:
cached_admins = admin_cache.get(chat_id)
if cached_admins is not None:
admins = cached_admins.copy()
else:
# Загружаем из API
try:
chat_admins = await bot.get_chat_administrators(chat_id)
admins = [admin.user for admin in chat_admins]
# Сохраняем в кэш
admin_cache.set(chat_id, admins)
except (TelegramBadRequest, TelegramForbiddenError):
return []
else:
# Без кэша
try:
chat_admins = await bot.get_chat_administrators(chat_id)
admins = [admin.user for admin in chat_admins]
except (TelegramBadRequest, TelegramForbiddenError):
return []
# Фильтрация
filtered_admins = []
for admin_user in admins:
# Исключаем ботов
if exclude_bots and admin_user.is_bot:
continue
# Исключаем конкретных пользователей
if exclude_users and admin_user.id in exclude_users:
continue
filtered_admins.append(admin_user)
# Только владелец
if include_owner_only and filtered_admins:
# Получаем информацию о владельце
try:
chat_admins = await bot.get_chat_administrators(chat_id)
owner = next(
(admin.user for admin in chat_admins if isinstance(admin, ChatMemberOwner)),
None
)
if owner:
return [owner]
except:
pass
return filtered_admins
async def mention_admins(
bot: Bot,
chat_id: int,
text: str = "",
format_type: str = "hidden",
exclude_bots: bool = True,
exclude_users: Optional[Set[int]] = None,
separator: str = " ",
use_cache: bool = True
) -> str:
"""
Формирует текст с упоминанием всех администраторов.
Args:
bot: Экземпляр бота
chat_id: ID чата
text: Основной текст сообщения
format_type: Тип форматирования:
- 'hidden': Скрытые ссылки (невидимые)
- 'mention': HTML mentions (видимые имена)
- 'username': @username (только для пользователей с username)
- 'mixed': Mentions для пользователей с именами, hidden для остальных
exclude_bots: Исключить ботов
exclude_users: Множество ID пользователей для исключения
separator: Разделитель между mentions (для видимых форматов)
use_cache: Использовать кэш
Returns:
str: Отформатированный текст с упоминаниями
Example:
>> # Скрытые упоминания
>> text = await mention_admins(bot, chat_id, "Внимание, админы!")
>> await message.answer(text, parse_mode="HTML")
>> # Видимые упоминания
>> text = await mention_admins(bot, chat_id, "Админы:", format_type="mention")
>> await message.answer(text, parse_mode="HTML")
"""
# Получаем список админов
admins = await get_admins_list(
bot=bot,
chat_id=chat_id,
exclude_bots=exclude_bots,
exclude_users=exclude_users,
use_cache=use_cache
)
if not admins:
return text
# Формируем упоминания в зависимости от типа
mentions = []
if format_type == "hidden":
# Скрытые ссылки (невидимые)
for admin in admins:
mentions.append(hide_link(f"tg://user?id={admin.id}"))
# Объединяем все ссылки и добавляем текст
return "".join(mentions) + text
elif format_type == "mention":
# HTML mentions (видимые имена)
for admin in admins:
name = admin.full_name or admin.first_name or f"User {admin.id}"
mentions.append(hlink(name, f"tg://user?id={admin.id}"))
mentions_text = separator.join(mentions)
return f"{text}\n\n{mentions_text}" if text else mentions_text
elif format_type == "username":
# Только @username
for admin in admins:
if admin.username:
mentions.append(f"@{admin.username}")
if not mentions:
# Fallback на hidden если нет username
return await mention_admins(
bot, chat_id, text, format_type="hidden",
exclude_bots=exclude_bots, exclude_users=exclude_users
)
mentions_text = separator.join(mentions)
return f"{text}\n\n{mentions_text}" if text else mentions_text
elif format_type == "mixed":
# Mentions для пользователей с именами, hidden для остальных
hidden_links = []
visible_mentions = []
for admin in admins:
if admin.username:
# Видимый mention
name = admin.full_name or admin.first_name or f"@{admin.username}"
visible_mentions.append(hlink(name, f"tg://user?id={admin.id}"))
else:
# Скрытая ссылка
hidden_links.append(hide_link(f"tg://user?id={admin.id}"))
hidden_part = "".join(hidden_links)
visible_part = separator.join(visible_mentions)
if text:
if visible_part:
return f"{hidden_part}{text}\n\n{visible_part}"
else:
return f"{hidden_part}{text}"
else:
return f"{hidden_part}{visible_part}"
# По умолчанию - hidden
return text
async def mention_user(
user: User,
format_type: str = "mention",
show_username: bool = False
) -> str:
"""
Создает упоминание одного пользователя.
Args:
user: Объект пользователя
format_type: Тип форматирования ('mention', 'hidden', 'username')
show_username: Показывать username вместо имени (для mention)
Returns:
str: Отформатированное упоминание
Example:
>> mention = await mention_user(message.from_user)
>> await message.answer(f"Привет, {mention}!", parse_mode="HTML")
"""
if format_type == "hidden":
return hide_link(f"tg://user?id={user.id}")
elif format_type == "username":
if user.username:
return f"@{user.username}"
# Fallback на mention
return await mention_user(user, format_type="mention")
else: # mention
if show_username and user.username:
display_name = f"@{user.username}"
else:
display_name = user.full_name or user.first_name or f"User {user.id}"
return hlink(display_name, f"tg://user?id={user.id}")
async def mention_users(
users: List[User],
format_type: str = "mention",
separator: str = ", ",
max_count: Optional[int] = None
) -> str:
"""
Создает упоминания списка пользователей.
Args:
users: Список пользователей
format_type: Тип форматирования
separator: Разделитель между упоминаниями
max_count: Максимальное количество упоминаний (остальные как "и еще N")
Returns:
str: Отформатированные упоминания
Example:
>> users = [msg.from_user, ...]
>> mentions = await mention_users(users, max_count=5)
>> await message.answer(f"Участники: {mentions}", parse_mode="HTML")
"""
if not users:
return ""
# Ограничиваем количество
display_users = users[:max_count] if max_count else users
remaining = len(users) - len(display_users) if max_count else 0
# Создаем упоминания
mentions = []
for user in display_users:
mention = await mention_user(user, format_type=format_type)
mentions.append(mention)
result = separator.join(mentions)
# Добавляем "и еще N"
if remaining > 0:
result += f" и еще {remaining}"
return result
# ================= СПЕЦИАЛИЗИРОВАННЫЕ ФУНКЦИИ =================
async def mention_moderators(
bot: Bot,
chat_id: int,
text: str = "",
format_type: str = "hidden"
) -> str:
"""
Упоминает только модераторов (администраторов с правами на удаление/бан).
Args:
bot: Экземпляр бота
chat_id: ID чата
text: Текст сообщения
format_type: Тип форматирования
Returns:
str: Текст с упоминаниями модераторов
"""
try:
chat_admins = await bot.get_chat_administrators(chat_id)
# Фильтруем только модераторов
moderators = []
for admin in chat_admins:
if admin.user.is_bot:
continue
# Владелец всегда модератор
if isinstance(admin, ChatMemberOwner):
moderators.append(admin.user)
continue
# Проверяем права администратора
if isinstance(admin, ChatMemberAdministrator):
if admin.can_delete_messages and admin.can_restrict_members:
moderators.append(admin.user)
# Формируем упоминания
if format_type == "hidden":
mentions = "".join(hide_link(f"tg://user?id={mod.id}") for mod in moderators)
return f"{mentions}{text}"
else:
mentions = []
for mod in moderators:
name = mod.full_name or mod.first_name or f"Moderator {mod.id}"
mentions.append(hlink(name, f"tg://user?id={mod.id}"))
mentions_text = ", ".join(mentions)
return f"{text}\n\n{mentions_text}" if text else mentions_text
except (TelegramBadRequest, TelegramForbiddenError):
return text
async def mention_owner(
bot: Bot,
chat_id: int,
format_type: str = "mention"
) -> Optional[str]:
"""
Получает упоминание владельца чата.
Args:
bot: Экземпляр бота
chat_id: ID чата
format_type: Тип форматирования
Returns:
Optional[str]: Упоминание владельца или None
"""
try:
chat_admins = await bot.get_chat_administrators(chat_id)
owner = next(
(admin.user for admin in chat_admins if isinstance(admin, ChatMemberOwner)),
None
)
if owner:
return await mention_user(owner, format_type=format_type)
except (TelegramBadRequest, TelegramForbiddenError):
pass
return None
# Алиас для обратной совместимости
async def hidden_admins_message(message: Message, text: str = "") -> str:
"""
Алиас для mention_admins с format_type="hidden".
DEPRECATED: Используйте mention_admins() вместо этого.
"""
from bot import bot
return await mention_admins(
bot=bot,
chat_id=message.chat.id,
text=text,
format_type="hidden"
)

650
bot/utils/state_utils.py Normal file
View File

@@ -0,0 +1,650 @@
"""
Утилиты для работы с FSM состояниями и обновлениями
"""
from typing import Optional, Any, Set, Union
from contextlib import suppress
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State
from aiogram.types import CallbackQuery, Message, ReplyKeyboardRemove
from aiogram.exceptions import TelegramBadRequest
from middleware.loggers import logger
__all__ = (
'clear_state',
'answer_callback',
'safe_answer_callback',
'safe_delete_message',
'safe_edit_message',
'clear_state_keep_data',
'get_state_data',
'set_state_data',
'update_state_data',
'is_state_active',
'inline_clear',
'status_clear',
'delete_messages',
'set_state_with_data',
'get_or_create_data',
'increment_state_value',
'append_to_state_list',
'remove_from_state_list',
'toggle_state_flag',
'debug_state'
)
# ================= РАБОТА С FSM СОСТОЯНИЯМИ =================
async def clear_state(
state: FSMContext,
log: bool = True
) -> None:
"""
Очищает FSM состояние.
Args:
state: Контекст FSM
log: Логировать очистку
Example:
>> await clear_state(state)
"""
current_state = await state.get_state()
if log and current_state:
logger.debug(
f"Очистка FSM состояния: {current_state}",
log_type='FSM'
)
await state.clear()
async def clear_state_keep_data(
state: FSMContext,
keep_keys: Optional[Set[str]] = None
) -> None:
"""
Очищает FSM состояние, но сохраняет определенные данные.
Args:
state: Контекст FSM
keep_keys: Множество ключей для сохранения
Example:
>> # Очищаем состояние, но сохраняем user_id и language
>> await clear_state_keep_data(state, keep_keys={'user_id', 'language'})
"""
if keep_keys:
# Получаем текущие данные
current_data = await state.get_data()
# Сохраняем только нужные ключи
saved_data = {
key: value for key, value in current_data.items()
if key in keep_keys
}
# Очищаем состояние
await state.clear()
# Восстанавливаем сохраненные данные
if saved_data:
await state.update_data(**saved_data)
logger.debug(
f"FSM очищен, сохранены ключи: {', '.join(keep_keys)}",
log_type='FSM'
)
else:
await state.clear()
async def get_state_data(
state: FSMContext,
key: Optional[str] = None,
default: Any = None
) -> Any:
"""
Получает данные из FSM состояния.
Args:
state: Контекст FSM
key: Ключ для получения (если None, возвращает все данные)
default: Значение по умолчанию
Returns:
Any: Данные из состояния
Example:
>> # Получить все данные
>> data = await get_state_data(state)
>> # Получить конкретный ключ
>> user_id = await get_state_data(state, 'user_id')
>> # С значением по умолчанию
>> lang = await get_state_data(state, 'language', default='ru')
"""
data = await state.get_data()
if key is None:
return data
return data.get(key, default)
async def set_state_data(
state: FSMContext,
key: str,
value: Any
) -> None:
"""
Устанавливает данные в FSM состояние.
Args:
state: Контекст FSM
key: Ключ
value: Значение
Example:
>> await set_state_data(state, 'user_id', 123456789)
"""
await state.update_data(**{key: value})
async def update_state_data(
state: FSMContext,
**kwargs
) -> None:
"""
Обновляет несколько полей в FSM состоянии.
Args:
state: Контекст FSM
**kwargs: Пары ключ-значение для обновления
Example:
>> await update_state_data(
... state,
... user_id=123456789,
... language='ru',
... step=1
... )
"""
await state.update_data(**kwargs)
async def is_state_active(state: FSMContext) -> bool:
"""
Проверяет, активно ли какое-либо состояние.
Args:
state: Контекст FSM
Returns:
bool: True если есть активное состояние
Example:
>> if await is_state_active(state):
... await message.answer("У вас есть незавершенное действие")
"""
current_state = await state.get_state()
return current_state is not None
# ================= РАБОТА С CALLBACK QUERIES =================
async def answer_callback(
callback: CallbackQuery,
text: Optional[str] = None,
show_alert: bool = False,
cache_time: int = 0
) -> bool:
"""
Отвечает на callback query.
Args:
callback: Callback query
text: Текст уведомления
show_alert: Показать как alert
cache_time: Время кэширования
Returns:
bool: True если успешно
Example:
>> await answer_callback(callback, "✅ Готово!")
>> await answer_callback(callback, "⚠️ Ошибка", show_alert=True)
"""
try:
await callback.answer(text=text, show_alert=show_alert, cache_time=cache_time)
return True
except TelegramBadRequest as e:
logger.warning(
f"Не удалось ответить на callback: {e}",
log_type='CALLBACK'
)
return False
async def safe_answer_callback(
callback: CallbackQuery,
text: Optional[str] = None,
show_alert: bool = False
) -> None:
"""
Безопасно отвечает на callback query (подавляет ошибки).
Args:
callback: Callback query
text: Текст уведомления
show_alert: Показать как alert
Example:
>> await safe_answer_callback(callback, "✅ Готово!")
"""
with suppress(TelegramBadRequest):
await callback.answer(text=text, show_alert=show_alert)
# ================= РАБОТА С СООБЩЕНИЯМИ =================
async def safe_delete_message(
message: Message,
log: bool = False
) -> bool:
"""
Безопасно удаляет сообщение.
Args:
message: Сообщение для удаления
log: Логировать попытку удаления
Returns:
bool: True если успешно удалено
Example:
>> await safe_delete_message(message)
"""
try:
await message.delete()
if log:
logger.debug(
f"Сообщение удалено: {message.message_id}",
log_type='MESSAGE'
)
return True
except TelegramBadRequest as e:
if log:
logger.warning(
f"Не удалось удалить сообщение: {e}",
log_type='MESSAGE'
)
return False
async def safe_edit_message(
message: Message,
text: str,
**kwargs
) -> bool:
"""
Безопасно редактирует сообщение.
Args:
message: Сообщение для редактирования
text: Новый текст
**kwargs: Дополнительные параметры (reply_markup, parse_mode, и т.д.)
Returns:
bool: True если успешно отредактировано
Example:
>> await safe_edit_message(
... message,
... "Новый текст",
... parse_mode="HTML"
... )
"""
try:
await message.edit_text(text, **kwargs)
return True
except TelegramBadRequest as e:
logger.warning(
f"Не удалось отредактировать сообщение: {e}",
log_type='MESSAGE'
)
return False
async def delete_messages(
chat_id: int,
message_ids: list[int],
bot
) -> int:
"""
Удаляет несколько сообщений.
Args:
chat_id: ID чата
message_ids: Список ID сообщений
bot: Экземпляр бота
Returns:
int: Количество успешно удаленных сообщений
Example:
>> deleted = await delete_messages(
... chat_id=message.chat.id,
... message_ids=[123, 124, 125],
... bot=bot
... )
>> print(f"Удалено {deleted} сообщений")
"""
deleted_count = 0
for message_id in message_ids:
try:
await bot.delete_message(chat_id=chat_id, message_id=message_id)
deleted_count += 1
except TelegramBadRequest:
pass
return deleted_count
# ================= КОМБИНИРОВАННЫЕ ФУНКЦИИ =================
async def inline_clear(update: Union[Message, CallbackQuery]) -> None:
"""
Очищает все инлайн взаимодействия (отвечает на callback).
Args:
update: Объект обновления (Message или CallbackQuery)
Example:
>> await inline_clear(callback)
"""
if isinstance(update, CallbackQuery):
await safe_answer_callback(update)
async def status_clear(
update: Union[Message, CallbackQuery],
state: FSMContext,
keep_data: Optional[Set[str]] = None,
remove_keyboard: bool = False
) -> None:
"""
Полная очистка: состояние FSM + ответ на callback + удаление клавиатуры.
Args:
update: Объект обновления
state: Контекст FSM
keep_data: Данные для сохранения
remove_keyboard: Удалить клавиатуру (только для Message)
Example:
>> # Полная очистка
>> await status_clear(message, state)
>> # С сохранением данных
>> await status_clear(
... callback,
... state,
... keep_data={'user_id', 'language'}
... )
>> # С удалением клавиатуры
>> await status_clear(message, state, remove_keyboard=True)
"""
# Очищаем состояние
if keep_data:
await clear_state_keep_data(state, keep_keys=keep_data)
else:
await clear_state(state, log=True)
# Отвечаем на callback
await inline_clear(update)
# Удаляем клавиатуру если нужно
if remove_keyboard and isinstance(update, Message):
with suppress(TelegramBadRequest):
await update.answer(
"Отменено",
reply_markup=ReplyKeyboardRemove()
)
# ================= УТИЛИТЫ ДЛЯ РАБОТЫ С СОСТОЯНИЯМИ =================
async def set_state_with_data(
state: FSMContext,
new_state: State,
**data
) -> None:
"""
Устанавливает новое состояние и данные одновременно.
Args:
state: Контекст FSM
new_state: Новое состояние
**data: Данные для сохранения
Example:
>> await set_state_with_data(
... state,
... FormStates.waiting_name,
... user_id=123456789,
... step=1
... )
"""
await state.set_state(new_state)
if data:
await state.update_data(**data)
logger.debug(
f"Установлено состояние: {new_state.state}",
log_type='FSM'
)
async def get_or_create_data(
state: FSMContext,
key: str,
factory: Any
) -> Any:
"""
Получает данные из состояния или создает их если их нет.
Args:
state: Контекст FSM
key: Ключ данных
factory: Значение по умолчанию или функция для создания
Returns:
Any: Данные из состояния или созданные
Example:
>> # С простым значением
>> items = await get_or_create_data(state, 'items', [])
>> # С функцией
>> data = await get_or_create_data(state, 'data', lambda: {'count': 0})
"""
data = await state.get_data()
if key not in data:
# Создаем значение
if callable(factory):
value = factory()
else:
value = factory
await state.update_data(**{key: value})
return value
return data[key]
async def increment_state_value(
state: FSMContext,
key: str,
amount: int = 1
) -> int:
"""
Инкрементирует числовое значение в состоянии.
Args:
state: Контекст FSM
key: Ключ значения
amount: Величина инкремента
Returns:
int: Новое значение
Example:
>> # Увеличиваем счетчик
>> new_count = await increment_state_value(state, 'attempts')
>> if new_count >= 3:
... await message.answer("Слишком много попыток!")
"""
data = await state.get_data()
current = data.get(key, 0)
new_value = current + amount
await state.update_data(**{key: new_value})
return new_value
async def append_to_state_list(
state: FSMContext,
key: str,
value: Any
) -> list:
"""
Добавляет значение в список в состоянии.
Args:
state: Контекст FSM
key: Ключ списка
value: Значение для добавления
Returns:
list: Обновленный список
Example:
>> # Добавляем товар в корзину
>> cart = await append_to_state_list(state, 'cart', product_id)
>> await message.answer(f"В корзине {len(cart)} товаров")
"""
data = await state.get_data()
current_list = data.get(key, [])
if not isinstance(current_list, list):
current_list = []
current_list.append(value)
await state.update_data(**{key: current_list})
return current_list
async def remove_from_state_list(
state: FSMContext,
key: str,
value: Any
) -> list:
"""
Удаляет значение из списка в состоянии.
Args:
state: Контекст FSM
key: Ключ списка
value: Значение для удаления
Returns:
list: Обновленный список
Example:
>> # Удаляем товар из корзины
>> cart = await remove_from_state_list(state, 'cart', product_id)
"""
data = await state.get_data()
current_list = data.get(key, [])
if isinstance(current_list, list) and value in current_list:
current_list.remove(value)
await state.update_data(**{key: current_list})
return current_list
async def toggle_state_flag(
state: FSMContext,
key: str
) -> bool:
"""
Переключает boolean флаг в состоянии.
Args:
state: Контекст FSM
key: Ключ флага
Returns:
bool: Новое значение флага
Example:
>> # Переключаем режим
>> is_active = await toggle_state_flag(state, 'notifications')
>> await message.answer(
... f"Уведомления: {'включены' if is_active else 'выключены'}"
... )
"""
data = await state.get_data()
current = data.get(key, False)
new_value = not current
await state.update_data(**{key: new_value})
return new_value
# ================= ОТЛАДКА =================
async def debug_state(state: FSMContext) -> str:
"""
Возвращает отладочную информацию о состоянии.
Args:
state: Контекст FSM
Returns:
str: Форматированная информация о состоянии
Example:
>> debug_info = await debug_state(state)
>> print(debug_info)
"""
current_state = await state.get_state()
data = await state.get_data()
lines = [
"🔍 <b>Debug FSM:</b>\n",
f"📊 Состояние: <code>{current_state or 'None'}</code>\n",
f"📦 Данных: {len(data)}\n"
]
if data:
lines.append("\n<b>Данные:</b>")
for key, value in data.items():
value_str = str(value)
if len(value_str) > 50:
value_str = value_str[:50] + "..."
lines.append(f"{key}: <code>{value_str}</code>")
return "\n".join(lines)

613
bot/utils/type_message.py Normal file
View File

@@ -0,0 +1,613 @@
"""
Утилиты для работы с типами контента и чатов
"""
from typing import Final, Optional, Dict, Any
from enum import Enum
from aiogram.types import Message
from aiogram.enums import ContentType, ChatType
__all__ = (
'CHAT_TYPES_RU',
'CONTENT_TYPES_RU',
'CONTENT_EMOJI',
'get_chat_type',
'get_content_type',
'get_content_text',
'get_content_emoji',
'get_media_info',
'has_media',
'has_text',
'format_content_info',
'ContentCategory',
'get_content_category',
'is_private_chat',
'is_group_chat',
'is_channel',
'type_msg',
'type_chat'
)
# ==================== КОНСТАНТЫ ====================
# Типы чатов на русском
CHAT_TYPES_RU: Final[Dict[str, str]] = {
ChatType.PRIVATE: "Личные сообщения",
ChatType.GROUP: "Группа",
ChatType.SUPERGROUP: "Супергруппа",
ChatType.CHANNEL: "Канал",
"private": "Личные сообщения",
"group": "Группа",
"supergroup": "Супергруппа",
"channel": "Канал",
}
# Типы контента на русском
CONTENT_TYPES_RU: Final[Dict[str, str]] = {
# Текст и медиа
ContentType.TEXT: "Текст",
ContentType.ANIMATION: "GIF анимация",
ContentType.AUDIO: "Аудиофайл",
ContentType.DOCUMENT: "Документ",
ContentType.PHOTO: "Фотография",
ContentType.STICKER: "Стикер",
ContentType.VIDEO: "Видео",
ContentType.VIDEO_NOTE: "Видеосообщение",
ContentType.VOICE: "Голосовое сообщение",
# Контакты и локации
ContentType.CONTACT: "Контакт",
ContentType.LOCATION: "Геолокация",
ContentType.VENUE: "Место на карте",
# Игры и развлечения
ContentType.DICE: "Игральная кость",
ContentType.GAME: "Игра",
ContentType.POLL: "Опрос",
# События чата
ContentType.NEW_CHAT_MEMBERS: "Новые участники",
ContentType.LEFT_CHAT_MEMBER: "Участник покинул чат",
ContentType.NEW_CHAT_TITLE: "Изменено название чата",
ContentType.NEW_CHAT_PHOTO: "Изменена аватарка чата",
ContentType.DELETE_CHAT_PHOTO: "Удалена аватарка чата",
ContentType.GROUP_CHAT_CREATED: "Группа создана",
ContentType.SUPERGROUP_CHAT_CREATED: "Супергруппа создана",
ContentType.CHANNEL_CHAT_CREATED: "Канал создан",
ContentType.MESSAGE_AUTO_DELETE_TIMER_CHANGED: "Изменён таймер автоудаления",
ContentType.MIGRATE_TO_CHAT_ID: "Миграция в супергруппу",
ContentType.MIGRATE_FROM_CHAT_ID: "Миграция из группы",
ContentType.PINNED_MESSAGE: "Закреплено сообщение",
# Платежи
ContentType.INVOICE: "Счёт на оплату",
ContentType.SUCCESSFUL_PAYMENT: "Успешная оплата",
# Другое
ContentType.CONNECTED_WEBSITE: "Подключён сайт",
ContentType.PASSPORT_DATA: "Данные Telegram Passport",
ContentType.PROXIMITY_ALERT_TRIGGERED: "Сработал алерт приближения",
# Видеочаты
ContentType.VIDEO_CHAT_SCHEDULED: "Запланирован видеочат",
ContentType.VIDEO_CHAT_STARTED: "Начался видеочат",
ContentType.VIDEO_CHAT_ENDED: "Завершён видеочат",
ContentType.VIDEO_CHAT_PARTICIPANTS_INVITED: "Приглашены в видеочат",
# Web App
ContentType.WEB_APP_DATA: "Данные Web App",
# Форумы
ContentType.FORUM_TOPIC_CREATED: "Создана тема форума",
ContentType.FORUM_TOPIC_EDITED: "Изменена тема форума",
ContentType.FORUM_TOPIC_CLOSED: "Закрыта тема форума",
ContentType.FORUM_TOPIC_REOPENED: "Открыта тема форума",
ContentType.GENERAL_FORUM_TOPIC_HIDDEN: "Скрыта общая тема",
ContentType.GENERAL_FORUM_TOPIC_UNHIDDEN: "Показана общая тема",
# Розыгрыши
ContentType.GIVEAWAY_CREATED: "Создан розыгрыш",
ContentType.GIVEAWAY: "Розыгрыш",
ContentType.GIVEAWAY_WINNERS: "Победители розыгрыша",
ContentType.GIVEAWAY_COMPLETED: "Завершён розыгрыш",
# Истории и реакции
ContentType.STORY: "История",
}
# Эмодзи для типов контента
CONTENT_EMOJI: Final[Dict[str, str]] = {
ContentType.TEXT: "💬",
ContentType.ANIMATION: "🎞️",
ContentType.AUDIO: "🎵",
ContentType.DOCUMENT: "📄",
ContentType.PHOTO: "📷",
ContentType.STICKER: "🎨",
ContentType.VIDEO: "🎥",
ContentType.VIDEO_NOTE: "🎬",
ContentType.VOICE: "🎤",
ContentType.CONTACT: "👤",
ContentType.LOCATION: "📍",
ContentType.VENUE: "🏢",
ContentType.DICE: "🎲",
ContentType.GAME: "🎮",
ContentType.POLL: "📊",
ContentType.INVOICE: "💰",
ContentType.SUCCESSFUL_PAYMENT: "",
}
class ContentCategory(str, Enum):
"""Категории контента"""
TEXT = "text" # Текстовые сообщения
MEDIA = "media" # Медиа (фото, видео, и т.д.)
FILE = "file" # Файлы и документы
VOICE = "voice" # Голосовые сообщения
LOCATION = "location" # Локации и места
INTERACTION = "interaction" # Игры, опросы, кости
SERVICE = "service" # Служебные сообщения
PAYMENT = "payment" # Платежи
UNKNOWN = "unknown" # Неизвестный тип
# ==================== ОСНОВНЫЕ ФУНКЦИИ ====================
def get_chat_type(message: Message, russian: bool = True) -> str:
"""
Возвращает тип чата.
Args:
message: Объект сообщения
russian: Вернуть на русском языке
Returns:
str: Тип чата
Example:
>>> get_chat_type(message)
'Личные сообщения'
>>> get_chat_type(message, russian=False)
'private'
"""
chat_type = message.chat.type
if russian:
return CHAT_TYPES_RU.get(chat_type, f"Неизвестный тип ({chat_type})")
return chat_type
def get_content_type(message: Message, russian: bool = True) -> str:
"""
Возвращает тип контента сообщения.
Args:
message: Объект сообщения
russian: Вернуть на русском языке
Returns:
str: Тип контента
Example:
>>> get_content_type(message)
'Фотография'
>>> get_content_type(message, russian=False)
'photo'
"""
content_type = message.content_type
if russian:
return CONTENT_TYPES_RU.get(content_type, f"Неизвестный тип ({content_type})")
return content_type
def get_content_emoji(message: Message) -> str:
"""
Возвращает эмодзи для типа контента.
Args:
message: Объект сообщения
Returns:
str: Эмодзи
Example:
>>> get_content_emoji(message)
'📷'
"""
return CONTENT_EMOJI.get(message.content_type, "📎")
def get_content_text(message: Message, max_length: Optional[int] = None) -> Optional[str]:
"""
Извлекает текст из сообщения (текст или caption).
Args:
message: Объект сообщения
max_length: Максимальная длина текста (обрезает если больше)
Returns:
Optional[str]: Текст сообщения или None
Example:
>>> get_content_text(message)
'Привет, мир!'
>>> get_content_text(message) # Фото с подписью
'Красивое фото'
>>> get_content_text(message, max_length=10)
'Привет,...'
"""
text = message.text or message.caption
if text and max_length and len(text) > max_length:
return f"{text[:max_length]}..."
return text
def has_media(message: Message) -> bool:
"""
Проверяет, содержит ли сообщение медиа.
Args:
message: Объект сообщения
Returns:
bool: True если есть медиа
Example:
>>> has_media(message)
True
"""
media_types = {
ContentType.PHOTO,
ContentType.VIDEO,
ContentType.ANIMATION,
ContentType.AUDIO,
ContentType.VOICE,
ContentType.VIDEO_NOTE,
ContentType.DOCUMENT,
ContentType.STICKER
}
return message.content_type in media_types
def has_text(message: Message) -> bool:
"""
Проверяет, есть ли в сообщении текст (или caption).
Args:
message: Объект сообщения
Returns:
bool: True если есть текст
Example:
>>> has_text(message)
True
"""
return bool(message.text or message.caption)
# ==================== ДЕТАЛЬНАЯ ИНФОРМАЦИЯ О МЕДИА ====================
def get_media_info(message: Message) -> Dict[str, Any]:
"""
Возвращает детальную информацию о медиа в сообщении.
Args:
message: Объект сообщения
Returns:
Dict: Словарь с информацией о медиа
Example:
>>> get_media_info(message)
{
'type': 'photo',
'type_ru': 'Фотография',
'emoji': '📷',
'has_caption': True,
'caption': 'Красивое фото',
'file_size': 123456,
'file_size_mb': 0.12,
'width': 1920,
'height': 1080,
'duration': None
}
"""
info = {
'type': message.content_type,
'type_ru': get_content_type(message),
'emoji': get_content_emoji(message),
'has_caption': bool(message.caption),
'caption': message.caption,
'has_text': bool(message.text),
'text': message.text,
}
# Фото
if message.photo:
largest_photo = max(message.photo, key=lambda p: p.file_size or 0)
info.update({
'file_id': largest_photo.file_id,
'file_unique_id': largest_photo.file_unique_id,
'file_size': largest_photo.file_size,
'file_size_kb': round(largest_photo.file_size / 1024, 2) if largest_photo.file_size else None,
'width': largest_photo.width,
'height': largest_photo.height,
'count': len(message.photo) # Количество размеров
})
# Видео
elif message.video:
info.update({
'file_id': message.video.file_id,
'file_unique_id': message.video.file_unique_id,
'file_size': message.video.file_size,
'file_size_mb': round(message.video.file_size / (1024 * 1024), 2) if message.video.file_size else None,
'width': message.video.width,
'height': message.video.height,
'duration': message.video.duration,
'duration_formatted': _format_duration(message.video.duration) if message.video.duration else None,
'mime_type': message.video.mime_type,
'file_name': message.video.file_name
})
# Документ
elif message.document:
info.update({
'file_id': message.document.file_id,
'file_unique_id': message.document.file_unique_id,
'file_size': message.document.file_size,
'file_size_mb': round(message.document.file_size / (1024 * 1024),
2) if message.document.file_size else None,
'file_name': message.document.file_name,
'mime_type': message.document.mime_type
})
# Аудио
elif message.audio:
info.update({
'file_id': message.audio.file_id,
'file_unique_id': message.audio.file_unique_id,
'file_size': message.audio.file_size,
'file_size_mb': round(message.audio.file_size / (1024 * 1024), 2) if message.audio.file_size else None,
'duration': message.audio.duration,
'duration_formatted': _format_duration(message.audio.duration) if message.audio.duration else None,
'performer': message.audio.performer,
'title': message.audio.title,
'mime_type': message.audio.mime_type,
'file_name': message.audio.file_name
})
# Голосовое сообщение
elif message.voice:
info.update({
'file_id': message.voice.file_id,
'file_unique_id': message.voice.file_unique_id,
'file_size': message.voice.file_size,
'file_size_kb': round(message.voice.file_size / 1024, 2) if message.voice.file_size else None,
'duration': message.voice.duration,
'duration_formatted': _format_duration(message.voice.duration) if message.voice.duration else None,
'mime_type': message.voice.mime_type
})
# Видеосообщение
elif message.video_note:
info.update({
'file_id': message.video_note.file_id,
'file_unique_id': message.video_note.file_unique_id,
'file_size': message.video_note.file_size,
'file_size_kb': round(message.video_note.file_size / 1024, 2) if message.video_note.file_size else None,
'duration': message.video_note.duration,
'duration_formatted': _format_duration(
message.video_note.duration) if message.video_note.duration else None,
'length': message.video_note.length # Диаметр
})
# Анимация (GIF)
elif message.animation:
info.update({
'file_id': message.animation.file_id,
'file_unique_id': message.animation.file_unique_id,
'file_size': message.animation.file_size,
'file_size_mb': round(message.animation.file_size / (1024 * 1024),
2) if message.animation.file_size else None,
'width': message.animation.width,
'height': message.animation.height,
'duration': message.animation.duration,
'duration_formatted': _format_duration(message.animation.duration) if message.animation.duration else None,
'mime_type': message.animation.mime_type,
'file_name': message.animation.file_name
})
# Стикер
elif message.sticker:
info.update({
'file_id': message.sticker.file_id,
'file_unique_id': message.sticker.file_unique_id,
'file_size': message.sticker.file_size,
'width': message.sticker.width,
'height': message.sticker.height,
'is_animated': message.sticker.is_animated,
'is_video': message.sticker.is_video,
'emoji': message.sticker.emoji,
'set_name': message.sticker.set_name
})
return info
def format_content_info(message: Message, include_text: bool = True, max_text_length: int = 50) -> str:
"""
Форматирует информацию о контенте в читаемую строку.
Args:
message: Объект сообщения
include_text: Включать текст/caption в описание
max_text_length: Максимальная длина текста
Returns:
str: Отформатированная строка
Example:
>>> format_content_info(message)
'📷 Фотография (1920x1080, 123 KB) + "Красивое фото"'
>>> format_content_info(message)
'🎥 Видео (1920x1080, 5.2 MB, 1:30) + "Смотрите это видео"'
"""
emoji = get_content_emoji(message)
content_type = get_content_type(message)
parts = [f"{emoji} {content_type}"]
# Добавляем детали медиа
if message.photo:
largest = max(message.photo, key=lambda p: p.file_size or 0)
size_kb = largest.file_size / 1024 if largest.file_size else 0
parts.append(f"({largest.width}x{largest.height}, {size_kb:.1f} KB)")
elif message.video:
size_mb = message.video.file_size / (1024 * 1024) if message.video.file_size else 0
duration = _format_duration(message.video.duration) if message.video.duration else "?"
parts.append(f"({message.video.width}x{message.video.height}, {size_mb:.1f} MB, {duration})")
elif message.document:
size_mb = message.document.file_size / (1024 * 1024) if message.document.file_size else 0
file_name = message.document.file_name or "без имени"
parts.append(f'("{file_name}", {size_mb:.2f} MB)')
elif message.audio:
duration = _format_duration(message.audio.duration) if message.audio.duration else "?"
title = message.audio.title or "без названия"
parts.append(f'("{title}", {duration})')
elif message.voice:
duration = _format_duration(message.voice.duration) if message.voice.duration else "?"
parts.append(f"({duration})")
elif message.video_note:
duration = _format_duration(message.video_note.duration) if message.video_note.duration else "?"
parts.append(f"({duration})")
elif message.sticker:
emoji_text = message.sticker.emoji or ""
parts.append(f"({emoji_text})")
# Добавляем текст/caption
if include_text:
text = get_content_text(message, max_length=max_text_length)
if text:
parts.append(f'+ "{text}"')
return ' '.join(parts)
def get_content_category(message: Message) -> ContentCategory:
"""
Определяет категорию контента.
Args:
message: Объект сообщения
Returns:
ContentCategory: Категория контента
Example:
>>> get_content_category(message)
ContentCategory.MEDIA
"""
content_type = message.content_type
# Текст
if content_type == ContentType.TEXT:
return ContentCategory.TEXT
# Медиа
if content_type in {ContentType.PHOTO, ContentType.VIDEO, ContentType.ANIMATION, ContentType.STICKER}:
return ContentCategory.MEDIA
# Файлы
if content_type in {ContentType.DOCUMENT, ContentType.AUDIO}:
return ContentCategory.FILE
# Голосовые
if content_type in {ContentType.VOICE, ContentType.VIDEO_NOTE}:
return ContentCategory.VOICE
# Локации
if content_type in {ContentType.LOCATION, ContentType.VENUE}:
return ContentCategory.LOCATION
# Интерактивные
if content_type in {ContentType.DICE, ContentType.GAME, ContentType.POLL}:
return ContentCategory.INTERACTION
# Платежи
if content_type in {ContentType.INVOICE, ContentType.SUCCESSFUL_PAYMENT}:
return ContentCategory.PAYMENT
# Служебные
if content_type in {
ContentType.NEW_CHAT_MEMBERS,
ContentType.LEFT_CHAT_MEMBER,
ContentType.NEW_CHAT_TITLE,
ContentType.PINNED_MESSAGE
}:
return ContentCategory.SERVICE
return ContentCategory.UNKNOWN
# ==================== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ====================
def _format_duration(seconds: int) -> str:
"""
Форматирует длительность в читаемый вид.
Args:
seconds: Длительность в секундах
Returns:
str: Отформатированная строка (MM:SS или HH:MM:SS)
Example:
>>> _format_duration(90)
'1:30'
>>> _format_duration(3661)
'1:01:01'
"""
hours = seconds // 3600
minutes = (seconds % 3600) // 60
secs = seconds % 60
if hours > 0:
return f"{hours}:{minutes:02d}:{secs:02d}"
else:
return f"{minutes}:{secs:02d}"
def is_private_chat(message: Message) -> bool:
"""Проверяет, является ли чат личным"""
return message.chat.type == ChatType.PRIVATE
def is_group_chat(message: Message) -> bool:
"""Проверяет, является ли чат группой"""
return message.chat.type in {ChatType.GROUP, ChatType.SUPERGROUP}
def is_channel(message: Message) -> bool:
"""Проверяет, является ли чат каналом"""
return message.chat.type == ChatType.CHANNEL
# Алиасы для обратной совместимости
type_msg = get_content_type
type_chat = get_chat_type

409
bot/utils/usernames.py Normal file
View File

@@ -0,0 +1,409 @@
"""
Утилиты для работы с информацией о пользователях
"""
from typing import Optional, Union
from enum import Enum
from aiogram.types import Message, CallbackQuery, User, InlineQuery, ChatMemberUpdated
__all__ = (
'get_user_display_name',
'get_user_mention',
'get_user_id',
'username',
'format_user',
'UserFormat',
'is_bot',
'has_username',
'is_premium',
'get_language_code',
'compare_users',
'get_user_info_dict'
)
class UserFormat(str, Enum):
"""Форматы отображения пользователя"""
USERNAME = 'username' # @username или @id123
FULL_NAME = 'full_name' # Имя Фамилия
MENTION = 'mention' # HTML mention
MENTION_MARKDOWN = 'markdown' # Markdown mention
FIRST_NAME = 'first_name' # Только имя
ID_ONLY = 'id' # Только ID
DETAILED = 'detailed' # @username (Имя Фамилия, ID: 123)
# Тип для всех событий с пользователем
EventType = Union[Message, CallbackQuery, InlineQuery, ChatMemberUpdated]
def _extract_user(event: EventType) -> Optional[User]:
"""
Извлекает объект User из события.
Args:
event: Объект события
Returns:
User или None
"""
if isinstance(event, (Message, CallbackQuery, InlineQuery)):
return event.from_user
elif isinstance(event, ChatMemberUpdated):
return event.from_user or event.new_chat_member.user
return None
def get_user_display_name(
event: EventType,
default: str = "Unknown User"
) -> str:
"""
Возвращает отображаемое имя пользователя (Full Name).
Args:
event: Объект события (Message, CallbackQuery, и т.д.)
default: Значение по умолчанию если пользователь не найден
Returns:
str: Полное имя пользователя
Example:
>> get_user_display_name(message)
'John Doe'
>> get_user_display_name(message)
'John' # Если нет фамилии
"""
user = _extract_user(event)
if not user:
return default
# Полное имя (приоритет)
if user.full_name:
return user.full_name
# Только имя
if user.first_name:
return user.first_name
# Username как запасной вариант
if user.username:
return f"@{user.username}"
# ID как последний вариант
return f"User {user.id}"
def get_user_mention(
event: EventType,
parse_mode: str = 'HTML',
show_username: bool = False
) -> str:
"""
Возвращает упоминание пользователя (кликабельное).
Args:
event: Объект события
parse_mode: Режим парсинга ('HTML' или 'Markdown')
show_username: Показывать username вместо имени
Returns:
str: HTML/Markdown упоминание
Example:
>> get_user_mention(message)
'<a href="tg://user?id=123456789">John Doe</a>'
>> get_user_mention(message, parse_mode='Markdown')
'[John Doe](tg://user?id=123456789)'
>> get_user_mention(message, show_username=True)
'<a href="tg://user?id=123456789">@johndoe</a>'
"""
user = _extract_user(event)
if not user:
return "Unknown User"
# Определяем текст для отображения
if show_username and user.username:
display_text = f"@{user.username}"
else:
display_text = user.full_name or user.first_name or f"User {user.id}"
# Формируем ссылку
user_link = f"tg://user?id={user.id}"
if parse_mode.upper() == 'HTML':
return f'<a href="{user_link}">{display_text}</a>'
elif parse_mode.upper() in ('MARKDOWN', 'MARKDOWNV2'):
# Экранируем специальные символы для Markdown
display_text = display_text.replace('[', '\\[').replace(']', '\\]')
return f'[{display_text}]({user_link})'
else:
return display_text
def get_user_id(event: EventType) -> Optional[int]:
"""
Возвращает ID пользователя.
Args:
event: Объект события
Returns:
int или None: ID пользователя
Example:
>> get_user_id(message)
123456789
"""
user = _extract_user(event)
return user.id if user else None
def username(
event: EventType,
with_at: bool = True,
fallback_to_id: bool = True
) -> str:
"""
Возвращает username пользователя или ID если username отсутствует.
Это основная функция для получения идентификатора пользователя
в формате @username или @id123.
Args:
event: Объект события (Message, CallbackQuery, и т.д.)
with_at: Добавлять @ в начало
fallback_to_id: Использовать ID если нет username
Returns:
str: Username или ID пользователя
Raises:
ValueError: Если информация о пользователе отсутствует
Example:
>> username(message)
'@johndoe'
>> username(message) # Нет username
'@123456789'
>> username(message, with_at=False)
'johndoe'
>> username(message, fallback_to_id=False)
'' # Если нет username
"""
user = _extract_user(event)
if not user:
raise ValueError("Информация о пользователе отсутствует в событии")
# Если есть username
if user.username:
return f"@{user.username}" if with_at else user.username
# Fallback на ID
if fallback_to_id:
return f"@{user.id}" if with_at else str(user.id)
# Если ничего нет
return ""
def format_user(
event: EventType,
format_type: UserFormat = UserFormat.USERNAME,
default: str = "@System"
) -> str:
"""
Универсальная функция форматирования пользователя.
Args:
event: Объект события
format_type: Тип форматирования (из enum UserFormat)
default: Значение по умолчанию
Returns:
str: Отформатированная информация о пользователе
Example:
>> format_user(message, UserFormat.USERNAME)
'@johndoe'
>> format_user(message, UserFormat.FULL_NAME)
'John Doe'
>> format_user(message, UserFormat.MENTION)
'<a href="tg://user?id=123">John Doe</a>'
>> format_user(message, UserFormat.DETAILED)
'@johndoe (John Doe, ID: 123456789)'
"""
user = _extract_user(event)
if not user:
return default
# USERNAME: @username или @id
if format_type == UserFormat.USERNAME:
if user.username:
return f"@{user.username}"
return f"@{user.id}"
# FULL_NAME: Имя Фамилия
elif format_type == UserFormat.FULL_NAME:
return user.full_name or user.first_name or f"User {user.id}"
# MENTION: HTML упоминание
elif format_type == UserFormat.MENTION:
display = user.full_name or user.first_name or f"User {user.id}"
return f'<a href="tg://user?id={user.id}">{display}</a>'
# MENTION_MARKDOWN: Markdown упоминание
elif format_type == UserFormat.MENTION_MARKDOWN:
display = user.full_name or user.first_name or f"User {user.id}"
display = display.replace('[', '\\[').replace(']', '\\]')
return f'[{display}](tg://user?id={user.id})'
# FIRST_NAME: Только имя
elif format_type == UserFormat.FIRST_NAME:
return user.first_name or f"User {user.id}"
# ID_ONLY: Только ID
elif format_type == UserFormat.ID_ONLY:
return str(user.id)
# DETAILED: Подробная информация
elif format_type == UserFormat.DETAILED:
parts = []
# Username
if user.username:
parts.append(f"@{user.username}")
# Full name
if user.full_name:
parts.append(f"({user.full_name}")
elif user.first_name:
parts.append(f"({user.first_name}")
# ID
parts.append(f"ID: {user.id})")
return ' '.join(parts) if parts else f"User {user.id}"
# По умолчанию
return default
# ================= ДОПОЛНИТЕЛЬНЫЕ УТИЛИТЫ =================
def is_bot(event: EventType) -> bool:
"""
Проверяет, является ли пользователь ботом.
Args:
event: Объект события
Returns:
bool: True если бот
"""
user = _extract_user(event)
return user.is_bot if user else False
def has_username(event: EventType) -> bool:
"""
Проверяет, есть ли у пользователя username.
Args:
event: Объект события
Returns:
bool: True если есть username
"""
user = _extract_user(event)
return bool(user and user.username)
def is_premium(event: EventType) -> bool:
"""
Проверяет, есть ли у пользователя Telegram Premium.
Args:
event: Объект события
Returns:
bool: True если Premium
"""
user = _extract_user(event)
return user.is_premium if user else False
def get_language_code(event: EventType) -> Optional[str]:
"""
Возвращает код языка пользователя.
Args:
event: Объект события
Returns:
Optional[str]: Код языка ('ru', 'en', и т.д.)
"""
user = _extract_user(event)
return user.language_code if user else None
def compare_users(event1: EventType, event2: EventType) -> bool:
"""
Сравнивает двух пользователей по ID.
Args:
event1: Первое событие
event2: Второе событие
Returns:
bool: True если это один и тот же пользователь
"""
user1 = _extract_user(event1)
user2 = _extract_user(event2)
if not user1 or not user2:
return False
return user1.id == user2.id
def get_user_info_dict(event: EventType) -> dict:
"""
Возвращает всю информацию о пользователе в виде словаря.
Args:
event: Объект события
Returns:
dict: Словарь с информацией о пользователе
"""
user = _extract_user(event)
if not user:
return {}
return {
'id': user.id,
'username': user.username,
'first_name': user.first_name,
'last_name': user.last_name,
'full_name': user.full_name,
'is_bot': user.is_bot,
'is_premium': user.is_premium,
'language_code': user.language_code,
'mention': get_user_mention(event),
'display_name': get_user_display_name(event)
}