""" Middleware для логирования всех событий бота """ from typing import Callable, Awaitable, Any, Dict, Optional, Tuple from datetime import datetime from aiogram import BaseMiddleware from aiogram.types import ( TelegramObject, Update, Message, CallbackQuery, InlineQuery, ChatMemberUpdated ) from middleware.loggers import logger from ..utils import ( username, get_content_type, is_command, parse_command, is_group_chat ) __all__ = ('LoggingMiddleware',) class LoggingMiddleware(BaseMiddleware): """ Middleware для детального логирования всех событий бота. Типы логов: - CMD: Команды бота - MSG: Текстовые сообщения - MEDIA: Медиа сообщения - CBD: Callback queries - INLINE: Inline queries - MEMBER: Изменения участников чата """ def __init__(self, project_prefix: str = "PRIMO"): super().__init__() self.project_prefix = project_prefix # Статистика self.stats = { 'total': 0, 'commands': 0, 'messages': 0, 'callbacks': 0, 'errors': 0 } async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any] ) -> Any: """Обрабатывает входящее событие""" self.stats['total'] += 1 start_time = datetime.now() # Анализируем событие log_info = self._analyze_event(event) if not log_info: return await handler(event, data) log_type, log_text, user_str = log_info # Добавляем префикс проекта prefixed_log_type = f"{self.project_prefix}-{log_type}" # Логируем получение события logger.info(text=log_text, log_type=prefixed_log_type, user=user_str) try: # Выполняем обработчик result = await handler(event, data) # Вычисляем время обработки processing_time = (datetime.now() - start_time).total_seconds() # Логируем успешное выполнение для команд if log_type == "CMD": self.stats['commands'] += 1 logger.debug( text=f"✅ Команда обработана за {processing_time:.3f}s", log_type=prefixed_log_type, user=user_str ) return result except Exception as e: self.stats['errors'] += 1 logger.error( text=f"❌ Ошибка обработки: {str(e)}", log_type=prefixed_log_type, user=user_str, ) raise def _analyze_event(self, event: TelegramObject) -> Optional[Tuple[str, str, str]]: """ Анализирует событие и извлекает информацию для логирования. Returns: Tuple: (тип_лога, текст_лога, пользователь) или None """ if isinstance(event, Update): return self._analyze_update(event) elif isinstance(event, Message): return self._analyze_message(event) elif isinstance(event, CallbackQuery): return self._analyze_callback(event) elif isinstance(event, InlineQuery): return self._analyze_inline_query(event) elif isinstance(event, ChatMemberUpdated): return self._analyze_member_update(event) return None def _analyze_update(self, update: Update) -> Optional[Tuple[str, str, str]]: """Анализирует Update объект""" if update.message: return self._analyze_message(update.message) elif update.edited_message: result = self._analyze_message(update.edited_message) if result: log_type, log_text, user_str = result log_text = f"✏️ [РЕДАКТИРОВАНО] {log_text}" return log_type, log_text, user_str elif update.channel_post: return self._analyze_message(update.channel_post, is_channel=True) elif update.edited_channel_post: result = self._analyze_message(update.edited_channel_post, is_channel=True) if result: log_type, log_text, user_str = result log_text = f"✏️ [РЕДАКТИРОВАНО] {log_text}" return log_type, log_text, user_str elif update.callback_query: return self._analyze_callback(update.callback_query) elif update.inline_query: return self._analyze_inline_query(update.inline_query) elif update.my_chat_member: return self._analyze_member_update(update.my_chat_member) elif update.chat_member: return self._analyze_member_update(update.chat_member) return None def _analyze_message(self, message: Message, is_channel: bool = False) -> Tuple[str, str, str]: """Анализирует сообщение""" user_str = username(message) # Формируем префикс с информацией о чате chat_info = "" if is_group_chat(message): chat_info = f"[{message.chat.type.upper()} {message.chat.id}] " elif is_channel: chat_info = f"[CHANNEL {message.chat.id}] " else: chat_info = f"[PM {message.chat.id}] " # Проверяем команду if message.text and is_command(message.text): self.stats['messages'] += 1 parsed = parse_command(message.text) if parsed: log_text = f"{chat_info}📝 Команда: /{parsed.command}" if parsed.args: args_str = ' '.join(parsed.args[:3]) if len(parsed.args) > 3: args_str += f" ... (+{len(parsed.args) - 3})" log_text += f" | Аргументы: {args_str}" if parsed.flags: flags_str = ', '.join(f"--{k}" for k in list(parsed.flags.keys())[:3]) if len(parsed.flags) > 3: flags_str += f" ... (+{len(parsed.flags) - 3})" log_text += f" | Флаги: {flags_str}" return "CMD", log_text, user_str # Обычное сообщение self.stats['messages'] += 1 content_type = get_content_type(message, russian=True) content_emoji = self._get_content_emoji(message) # Текстовое сообщение if message.text: text_preview = message.text if len(text_preview) > 100: text_preview = text_preview[:100] + "..." log_text = f"{chat_info}{content_emoji} Сообщение ({len(message.text)} симв.): {text_preview!r}" # Медиа с caption elif message.caption: caption_preview = message.caption if len(caption_preview) > 50: caption_preview = caption_preview[:50] + "..." log_text = f"{chat_info}{content_emoji} {content_type}" # Добавляем детали медиа media_details = self._get_media_details_str(message) if media_details: log_text += f" {media_details}" log_text += f" | Описание: {caption_preview!r}" # Медиа без caption else: log_text = f"{chat_info}{content_emoji} {content_type}" media_details = self._get_media_details_str(message) if media_details: log_text += f" {media_details}" # Определяем тип лога log_type = "MEDIA" if message.content_type != "text" else "MSG" # Добавляем префикс канала if is_channel: log_text = f"📢 {log_text}" return log_type, log_text, user_str def _analyze_callback(self, callback: CallbackQuery) -> Tuple[str, str, str]: """Анализирует callback query""" self.stats['callbacks'] += 1 user_str = f"@{callback.from_user.username}" if callback.from_user.username else f"id{callback.from_user.id}" callback_data = callback.data or "None" if len(callback_data) > 50: callback_data = callback_data[:50] + "..." chat_info = f"[MSG {callback.message.message_id}] " if callback.message else "" log_text = f"{chat_info}🔘 Callback: {callback_data!r}" return "CBD", log_text, user_str @staticmethod def _analyze_inline_query(inline_query: InlineQuery) -> Tuple[str, str, str]: """Анализирует inline query""" user_str = f"@{inline_query.from_user.username}" if inline_query.from_user.username else f"id{inline_query.from_user.id}" query = inline_query.query or "" if len(query) > 50: query = query[:50] + "..." log_text = f"🔍 Inline запрос: {query!r}" return "INLINE", log_text, user_str @staticmethod def _analyze_member_update(update: ChatMemberUpdated) -> Tuple[str, str, str]: """Анализирует изменения участников""" user_str = f"@{update.from_user.username}" if update.from_user.username else f"id{update.from_user.id}" old_status = update.old_chat_member.status new_status = update.new_chat_member.status chat_info = f"[{update.chat.type.upper()} {update.chat.id}] " log_text = f"{chat_info}👥 Изменение статуса: {old_status} → {new_status}" return "MEMBER", log_text, user_str @staticmethod def _get_content_emoji(message: Message) -> str: """Возвращает emoji для типа контента""" emoji_map = { 'text': '💬', 'photo': '📷', 'video': '🎥', 'animation': '🎞️', 'audio': '🎵', 'voice': '🎤', 'video_note': '🎬', 'document': '📄', 'sticker': '🎨', 'location': '📍', 'contact': '👤', 'poll': '📊', 'dice': '🎲' } return emoji_map.get(message.content_type, '📎') @staticmethod def _get_media_details_str(message: Message) -> Optional[str]: """Возвращает строку с деталями медиа файла""" from ..utils import get_media_info try: media_info = get_media_info(message) details = [] # Размер файла if 'file_size_mb' in media_info: details.append(f"{media_info['file_size_mb']} MB") elif 'file_size_kb' in media_info: details.append(f"{media_info['file_size_kb']} KB") # Длительность if 'duration_formatted' in media_info: details.append(media_info['duration_formatted']) # Разрешение if 'width' in media_info and 'height' in media_info: details.append(f"{media_info['width']}x{media_info['height']}") return f"({', '.join(details)})" if details else None except: return None def get_stats(self) -> Dict[str, int]: """Возвращает статистику middleware""" return self.stats.copy() def reset_stats(self): """Сбрасывает статистику""" self.stats = { 'total': 0, 'commands': 0, 'messages': 0, 'callbacks': 0, 'errors': 0 } def format_log_stats(stats: Dict[str, int]) -> str: """Форматирует статистику для вывода""" return ( f"📊 Статистика логирования:\n" f"├─ 📨 Всего событий: {stats['total']}\n" f"├─ 📝 Команд: {stats['commands']}\n" f"├─ 💬 Сообщений: {stats['messages']}\n" f"├─ 🔘 Callbacks: {stats['callbacks']}\n" f"└─ ❌ Ошибок: {stats['errors']}" )