Files
PrimoGuardBot-/bot/middlewares/logging_mdw.py
2026-02-17 11:24:55 +07:00

351 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Middleware для логирования всех событий бота
"""
from typing import Callable, Awaitable, Any, Dict, Optional, Tuple
from datetime import datetime
from aiogram import BaseMiddleware
from aiogram.types import (
TelegramObject,
Update,
Message,
CallbackQuery,
InlineQuery,
ChatMemberUpdated
)
from middleware.loggers import logger
from ..utils import (
username,
get_content_type,
is_command,
parse_command,
is_group_chat
)
__all__ = ('LoggingMiddleware',)
class LoggingMiddleware(BaseMiddleware):
"""
Middleware для детального логирования всех событий бота.
Типы логов:
- CMD: Команды бота
- MSG: Текстовые сообщения
- MEDIA: Медиа сообщения
- CBD: Callback queries
- INLINE: Inline queries
- MEMBER: Изменения участников чата
"""
def __init__(self, project_prefix: str = "PRIMO"):
super().__init__()
self.project_prefix = project_prefix
# Статистика
self.stats = {
'total': 0,
'commands': 0,
'messages': 0,
'callbacks': 0,
'errors': 0
}
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
"""Обрабатывает входящее событие"""
self.stats['total'] += 1
start_time = datetime.now()
# Анализируем событие
log_info = self._analyze_event(event)
if not log_info:
return await handler(event, data)
log_type, log_text, user_str = log_info
# Добавляем префикс проекта
prefixed_log_type = f"{self.project_prefix}-{log_type}"
# Логируем получение события
logger.info(text=log_text, log_type=prefixed_log_type, user=user_str)
try:
# Выполняем обработчик
result = await handler(event, data)
# Вычисляем время обработки
processing_time = (datetime.now() - start_time).total_seconds()
# Логируем успешное выполнение для команд
if log_type == "CMD":
self.stats['commands'] += 1
logger.debug(
text=f"✅ Команда обработана за {processing_time:.3f}s",
log_type=prefixed_log_type,
user=user_str
)
return result
except Exception as e:
self.stats['errors'] += 1
logger.error(
text=f"❌ Ошибка обработки: {str(e)}",
log_type=prefixed_log_type,
user=user_str,
)
raise
def _analyze_event(self, event: TelegramObject) -> Optional[Tuple[str, str, str]]:
"""
Анализирует событие и извлекает информацию для логирования.
Returns:
Tuple: (тип_лога, текст_лога, пользователь) или None
"""
if isinstance(event, Update):
return self._analyze_update(event)
elif isinstance(event, Message):
return self._analyze_message(event)
elif isinstance(event, CallbackQuery):
return self._analyze_callback(event)
elif isinstance(event, InlineQuery):
return self._analyze_inline_query(event)
elif isinstance(event, ChatMemberUpdated):
return self._analyze_member_update(event)
return None
def _analyze_update(self, update: Update) -> Optional[Tuple[str, str, str]]:
"""Анализирует Update объект"""
if update.message:
return self._analyze_message(update.message)
elif update.edited_message:
result = self._analyze_message(update.edited_message)
if result:
log_type, log_text, user_str = result
log_text = f"✏️ [РЕДАКТИРОВАНО] {log_text}"
return log_type, log_text, user_str
elif update.channel_post:
return self._analyze_message(update.channel_post, is_channel=True)
elif update.edited_channel_post:
result = self._analyze_message(update.edited_channel_post, is_channel=True)
if result:
log_type, log_text, user_str = result
log_text = f"✏️ [РЕДАКТИРОВАНО] {log_text}"
return log_type, log_text, user_str
elif update.callback_query:
return self._analyze_callback(update.callback_query)
elif update.inline_query:
return self._analyze_inline_query(update.inline_query)
elif update.my_chat_member:
return self._analyze_member_update(update.my_chat_member)
elif update.chat_member:
return self._analyze_member_update(update.chat_member)
return None
def _analyze_message(self, message: Message, is_channel: bool = False) -> Tuple[str, str, str]:
"""Анализирует сообщение"""
user_str = username(message)
# Формируем префикс с информацией о чате
chat_info = ""
if is_group_chat(message):
chat_info = f"[{message.chat.type.upper()} {message.chat.id}] "
elif is_channel:
chat_info = f"[CHANNEL {message.chat.id}] "
else:
chat_info = f"[PM {message.chat.id}] "
# Проверяем команду
if message.text and is_command(message.text):
self.stats['messages'] += 1
parsed = parse_command(message.text)
if parsed:
log_text = f"{chat_info}📝 Команда: /{parsed.command}"
if parsed.args:
args_str = ' '.join(parsed.args[:3])
if len(parsed.args) > 3:
args_str += f" ... (+{len(parsed.args) - 3})"
log_text += f" | Аргументы: {args_str}"
if parsed.flags:
flags_str = ', '.join(f"--{k}" for k in list(parsed.flags.keys())[:3])
if len(parsed.flags) > 3:
flags_str += f" ... (+{len(parsed.flags) - 3})"
log_text += f" | Флаги: {flags_str}"
return "CMD", log_text, user_str
# Обычное сообщение
self.stats['messages'] += 1
content_type = get_content_type(message, russian=True)
content_emoji = self._get_content_emoji(message)
# Текстовое сообщение
if message.text:
text_preview = message.text
if len(text_preview) > 100:
text_preview = text_preview[:100] + "..."
log_text = f"{chat_info}{content_emoji} Сообщение ({len(message.text)} симв.): {text_preview!r}"
# Медиа с caption
elif message.caption:
caption_preview = message.caption
if len(caption_preview) > 50:
caption_preview = caption_preview[:50] + "..."
log_text = f"{chat_info}{content_emoji} {content_type}"
# Добавляем детали медиа
media_details = self._get_media_details_str(message)
if media_details:
log_text += f" {media_details}"
log_text += f" | Описание: {caption_preview!r}"
# Медиа без caption
else:
log_text = f"{chat_info}{content_emoji} {content_type}"
media_details = self._get_media_details_str(message)
if media_details:
log_text += f" {media_details}"
# Определяем тип лога
log_type = "MEDIA" if message.content_type != "text" else "MSG"
# Добавляем префикс канала
if is_channel:
log_text = f"📢 {log_text}"
return log_type, log_text, user_str
def _analyze_callback(self, callback: CallbackQuery) -> Tuple[str, str, str]:
"""Анализирует callback query"""
self.stats['callbacks'] += 1
user_str = f"@{callback.from_user.username}" if callback.from_user.username else f"id{callback.from_user.id}"
callback_data = callback.data or "None"
if len(callback_data) > 50:
callback_data = callback_data[:50] + "..."
chat_info = f"[MSG {callback.message.message_id}] " if callback.message else ""
log_text = f"{chat_info}🔘 Callback: {callback_data!r}"
return "CBD", log_text, user_str
@staticmethod
def _analyze_inline_query(inline_query: InlineQuery) -> Tuple[str, str, str]:
"""Анализирует inline query"""
user_str = f"@{inline_query.from_user.username}" if inline_query.from_user.username else f"id{inline_query.from_user.id}"
query = inline_query.query or ""
if len(query) > 50:
query = query[:50] + "..."
log_text = f"🔍 Inline запрос: {query!r}"
return "INLINE", log_text, user_str
@staticmethod
def _analyze_member_update(update: ChatMemberUpdated) -> Tuple[str, str, str]:
"""Анализирует изменения участников"""
user_str = f"@{update.from_user.username}" if update.from_user.username else f"id{update.from_user.id}"
old_status = update.old_chat_member.status
new_status = update.new_chat_member.status
chat_info = f"[{update.chat.type.upper()} {update.chat.id}] "
log_text = f"{chat_info}👥 Изменение статуса: {old_status}{new_status}"
return "MEMBER", log_text, user_str
@staticmethod
def _get_content_emoji(message: Message) -> str:
"""Возвращает emoji для типа контента"""
emoji_map = {
'text': '💬',
'photo': '📷',
'video': '🎥',
'animation': '🎞️',
'audio': '🎵',
'voice': '🎤',
'video_note': '🎬',
'document': '📄',
'sticker': '🎨',
'location': '📍',
'contact': '👤',
'poll': '📊',
'dice': '🎲'
}
return emoji_map.get(message.content_type, '📎')
@staticmethod
def _get_media_details_str(message: Message) -> Optional[str]:
"""Возвращает строку с деталями медиа файла"""
from ..utils import get_media_info
try:
media_info = get_media_info(message)
details = []
# Размер файла
if 'file_size_mb' in media_info:
details.append(f"{media_info['file_size_mb']} MB")
elif 'file_size_kb' in media_info:
details.append(f"{media_info['file_size_kb']} KB")
# Длительность
if 'duration_formatted' in media_info:
details.append(media_info['duration_formatted'])
# Разрешение
if 'width' in media_info and 'height' in media_info:
details.append(f"{media_info['width']}x{media_info['height']}")
return f"({', '.join(details)})" if details else None
except:
return None
def get_stats(self) -> Dict[str, int]:
"""Возвращает статистику middleware"""
return self.stats.copy()
def reset_stats(self):
"""Сбрасывает статистику"""
self.stats = {
'total': 0,
'commands': 0,
'messages': 0,
'callbacks': 0,
'errors': 0
}
def format_log_stats(stats: Dict[str, int]) -> str:
"""Форматирует статистику для вывода"""
return (
f"📊 Статистика логирования:\n"
f"├─ 📨 Всего событий: {stats['total']}\n"
f"├─ 📝 Команд: {stats['commands']}\n"
f"├─ 💬 Сообщений: {stats['messages']}\n"
f"├─ 🔘 Callbacks: {stats['callbacks']}\n"
f"└─ ❌ Ошибок: {stats['errors']}"
)