""" Middleware для измерения времени выполнения хендлеров """ from time import time from typing import Callable, Awaitable, Any, Dict, Optional from dataclasses import dataclass from collections import defaultdict from datetime import datetime from aiogram import BaseMiddleware from aiogram.types import TelegramObject, Message, CallbackQuery, Update, User from middleware.loggers import logger __all__ = ('TimingMiddleware', 'TimingStats') @dataclass class HandlerMetrics: """Метрики одного хендлера""" total_calls: int = 0 total_time: float = 0.0 min_time: float = float('inf') max_time: float = 0.0 last_call: Optional[datetime] = None @property def avg_time(self) -> float: """Среднее время выполнения""" return self.total_time / self.total_calls if self.total_calls > 0 else 0.0 def update(self, execution_time: float) -> None: """Обновляет метрики""" self.total_calls += 1 self.total_time += execution_time self.min_time = min(self.min_time, execution_time) self.max_time = max(self.max_time, execution_time) self.last_call = datetime.now() class TimingStats: """ Глобальная статистика времени выполнения хендлеров. Хранит метрики для каждого хендлера и предоставляет методы для анализа. """ def __init__(self): self.metrics: Dict[str, HandlerMetrics] = defaultdict(HandlerMetrics) def record(self, handler_name: str, execution_time: float) -> None: """ Записывает время выполнения хендлера. Args: handler_name: Имя хендлера execution_time: Время выполнения в секундах """ self.metrics[handler_name].update(execution_time) def get_stats(self, handler_name: Optional[str] = None) -> Dict[str, Any]: """ Возвращает статистику по хендлеру или всем хендлерам. Args: handler_name: Имя конкретного хендлера (если None, возвращает все) Returns: Dict с метриками """ if handler_name: metrics = self.metrics.get(handler_name) if not metrics: return {} return { 'handler': handler_name, 'total_calls': metrics.total_calls, 'avg_time': f"{metrics.avg_time:.3f}s", 'min_time': f"{metrics.min_time:.3f}s", 'max_time': f"{metrics.max_time:.3f}s", 'last_call': metrics.last_call.strftime('%Y-%m-%d %H:%M:%S') if metrics.last_call else None } # Возвращаем статистику по всем хендлерам return { name: { 'total_calls': m.total_calls, 'avg_time': f"{m.avg_time:.3f}s", 'min_time': f"{m.min_time:.3f}s", 'max_time': f"{m.max_time:.3f}s" } for name, m in sorted( self.metrics.items(), key=lambda x: x[1].avg_time, reverse=True ) } def get_slowest(self, limit: int = 10) -> list[tuple[str, float]]: """ Возвращает список самых медленных хендлеров. Args: limit: Количество хендлеров в результате Returns: List кортежей (имя_хендлера, среднее_время) """ sorted_handlers = sorted( self.metrics.items(), key=lambda x: x[1].avg_time, reverse=True ) return [(name, m.avg_time) for name, m in sorted_handlers[:limit]] def reset(self, handler_name: Optional[str] = None) -> None: """ Сбрасывает статистику. Args: handler_name: Имя хендлера для сброса (если None, сбрасывает все) """ if handler_name: if handler_name in self.metrics: del self.metrics[handler_name] else: self.metrics.clear() # Глобальный экземпляр статистики timing_stats = TimingStats() class TimingMiddleware(BaseMiddleware): """ Middleware для измерения времени выполнения хендлеров. Возможности: - Измерение времени выполнения каждого хендлера - Автоматическая классификация (быстрый/средний/медленный) - Сбор статистики - Логирование медленных хендлеров - Предупреждения о критически медленных запросах Attributes: slow_threshold: Порог медленного хендлера (сек) warning_threshold: Порог критически медленного хендлера (сек) log_all: Логировать все хендлеры (даже быстрые) collect_stats: Собирать статистику Example: ```python from middleware.timing import TimingMiddleware, timing_stats # Регистрация middleware dp.message.middleware(TimingMiddleware(slow_threshold=0.5)) # Получение статистики stats = timing_stats.get_slowest(5) for handler, avg_time in stats: print(f"{handler}: {avg_time:.3f}s") ``` """ def __init__( self, slow_threshold: float = 1.0, warning_threshold: float = 3.0, log_all: bool = False, collect_stats: bool = True ): """ Инициализация middleware. Args: slow_threshold: Порог медленного хендлера в секундах warning_threshold: Порог критически медленного хендлера log_all: Логировать все хендлеры (иначе только медленные) collect_stats: Собирать статистику выполнения """ super().__init__() self.slow_threshold = slow_threshold self.warning_threshold = warning_threshold self.log_all = log_all self.collect_stats = collect_stats @staticmethod def _extract_user_info(event: TelegramObject) -> str: """ Извлекает информацию о пользователе из события. Args: event: Объект события Returns: str: Форматированная строка с информацией о пользователе """ user: Optional[User] = None # Прямое извлечение из Message/CallbackQuery if isinstance(event, (Message, CallbackQuery)): user = getattr(event, 'from_user', None) # Извлечение из Update elif isinstance(event, Update): for attr in ['message', 'edited_message', 'callback_query', 'channel_post', 'edited_channel_post', 'inline_query', 'chosen_inline_result', 'my_chat_member', 'chat_member']: obj = getattr(event, attr, None) if obj and hasattr(obj, 'from_user'): user = obj.from_user break if user: return f"@{user.username}" if user.username else f"id{user.id}" return "@System" @staticmethod def _get_handler_name(handler: Callable) -> str: """ Получает имя хендлера для логирования. Args: handler: Функция хендлера Returns: str: Имя хендлера """ # Пытаемся получить полное имя с модулем if hasattr(handler, '__module__') and hasattr(handler, '__name__'): return f"{handler.__module__}.{handler.__name__}" elif hasattr(handler, '__name__'): return handler.__name__ else: return str(handler) def _classify_speed(self, execution_time: float) -> tuple[str, str]: """ Классифицирует скорость выполнения. Args: execution_time: Время выполнения в секундах Returns: tuple: (уровень_лога, тип_лога) """ if execution_time >= self.warning_threshold: return 'ERROR', 'CRITICAL_SLOW' elif execution_time >= self.slow_threshold: return 'WARNING', 'SLOW_HANDLER' elif execution_time >= self.slow_threshold / 2: return 'INFO', 'MEDIUM_HANDLER' else: return 'DEBUG', 'FAST_HANDLER' async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any] ) -> Any: """ Основной метод middleware. Args: handler: Функция хендлера event: Объект события data: Дополнительные данные Returns: Результат выполнения хендлера """ start_time = time() handler_name = self._get_handler_name(handler) user_str = self._extract_user_info(event) # Выполняем хендлер try: result = await handler(event, data) return result finally: # Измеряем время execution_time = time() - start_time # Собираем статистику if self.collect_stats: timing_stats.record(handler_name, execution_time) # Классифицируем скорость log_level, log_type = self._classify_speed(execution_time) # Логируем результат if self.log_all or execution_time >= self.slow_threshold / 2: # Формируем сообщение if execution_time >= self.warning_threshold: message = f"⚠️ КРИТИЧЕСКИ медленный хендлер '{handler_name}': {execution_time:.3f}с" elif execution_time >= self.slow_threshold: message = f"🐌 Медленный хендлер '{handler_name}': {execution_time:.3f}с" else: message = f"⏱️ Хендлер '{handler_name}': {execution_time:.3f}с" # Логируем logger.log_entry( level=log_level, text=message, log_type=log_type, user=user_str )