Первый коммит

This commit is contained in:
admin
2025-08-30 07:39:44 +07:00
commit d0baf76f8f
86 changed files with 7362 additions and 0 deletions

View File

@@ -0,0 +1,47 @@
from aiogram import Dispatcher
from configs import ImportantID
from .logging_mdw import LoggingMiddleware
from .msg_mdw import MessageCounterMiddleware
from .spam_mdw import RateLimitMiddleware
from .subscription_mdw import SubscriptionMiddleware
from .error_mdw import ErrorHandlingMiddleware
from .time_mdw import TimingMiddleware
__all__ = [
LoggingMiddleware,
SubscriptionMiddleware,
RateLimitMiddleware,
ErrorHandlingMiddleware,
TimingMiddleware,
MessageCounterMiddleware,
]
def setup_middlewares(dp: Dispatcher, bot, channel_ids: list[int | str] = None) -> None:
"""
Регистрирует все middleware в диспетчере.
"""
channel_ids = channel_ids or []
# Middleware для ВСЕХ событий (update level)
middlewares_updates: list = [
TimingMiddleware(), # Замер времени
LoggingMiddleware(), # Логирование
ErrorHandlingMiddleware(admin_ids=ImportantID.ADMIN_ID), # Обработка ошибок
]
# Middleware только для СООБЩЕНИЙ (message level)
middlewares_msg: list = [
#RateLimitMiddleware(rate_limit=3, time_period=5.0), # Антифлуд
#SubscriptionMiddleware(bot=bot, channel_ids=channel_ids), # Проверка подписки
MessageCounterMiddleware(), # Подсчет сообщений
]
# Регистрируем middleware для всех событий
for middleware in middlewares_updates:
dp.update.middleware(middleware)
# Регистрируем middleware только для сообщений
for middleware in middlewares_msg:
dp.message.middleware(middleware)

View File

@@ -0,0 +1,201 @@
from typing import Callable, Awaitable, Any, Dict
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject, Message, CallbackQuery, Update
from middleware.loggers import loggers # ваш логгер
class ErrorHandlingMiddleware(BaseMiddleware):
"""
Middleware для глобальной обработки ошибок в хендлерах.
Зачем нужен:
- Централизованная обработка исключений
- Уведомление администраторов об ошибках
- Graceful degradation при сбоях
"""
def __init__(self, admin_ids: list[int]):
"""
Инициализация middleware обработки ошибок.
Args:
admin_ids: Список ID администраторов для уведомлений
"""
self.admin_ids = admin_ids
super().__init__()
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
"""
Перехватывает и обрабатывает ошибки в хендлерах.
"""
try:
return await handler(event, data)
except Exception as e:
# Получаем информацию о пользователе безопасным способом
user_str = self._extract_user_info(event)
# Логируем ошибку
error_message = f"Ошибка в хендлере: {type(e).__name__}: {str(e)}"
loggers.error(
text=error_message,
log_type="HANDLER_ERROR",
user=user_str
)
# Уведомляем администраторов
await self._notify_admins(error_message, event, user_str)
# Отправляем пользователю сообщение об ошибке
await self._send_error_message(event, user_str)
return None
@staticmethod
def _extract_user_info(event: TelegramObject) -> str:
"""
Безопасно извлекает информацию о пользователе из события.
Args:
event: Объект события
Returns:
Строка с идентификатором пользователя
"""
user_str = "@System"
# Для Message и CallbackQuery
if isinstance(event, (Message, CallbackQuery)) and hasattr(event, 'from_user') and event.from_user:
user = event.from_user
user_str = f"@{user.username}" if user.username else f"id{user.id}"
# Для Update (который содержит message или callback_query)
elif isinstance(event, Update):
# Пытаемся найти пользователя в различных полях Update
user_object = None
if event.message and event.message.from_user:
user_object = event.message.from_user
elif event.edited_message and event.edited_message.from_user:
user_object = event.edited_message.from_user
elif event.callback_query and event.callback_query.from_user:
user_object = event.callback_query.from_user
elif event.channel_post and event.channel_post.from_user:
user_object = event.channel_post.from_user
elif event.edited_channel_post and event.edited_channel_post.from_user:
user_object = event.edited_channel_post.from_user
if user_object:
user_str = f"@{user_object.username}" if user_object.username else f"id{user_object.id}"
return user_str
@staticmethod
def _extract_event_text(event: TelegramObject) -> str:
"""
Безопасно извлекает текст из события.
Args:
event: Объект события
Returns:
Текст события или пустая строка
"""
event_text = ""
# Для Message
if isinstance(event, Message) and hasattr(event, 'text') and event.text:
event_text = event.text
# Для CallbackQuery
elif isinstance(event, CallbackQuery) and hasattr(event, 'data') and event.data:
event_text = f"callback: {event.data}"
# Для Update
elif isinstance(event, Update):
if event.message and event.message.text:
event_text = event.message.text
elif event.callback_query and event.callback_query.data:
event_text = f"callback: {event.callback_query.data}"
elif event.edited_message and event.edited_message.text:
event_text = event.edited_message.text
return event_text[:100] + "..." if len(event_text) > 100 else event_text
async def _notify_admins(
self,
error_message: str,
event: TelegramObject,
user_str: str
) -> None:
"""Уведомляет администраторов об ошибке."""
from aiogram import Bot
bot: Bot = event.bot if hasattr(event, 'bot') else None
if bot:
for admin_id in self.admin_ids:
try:
event_info = f"Событие: {type(event).__name__}"
event_text = self._extract_event_text(event)
if event_text:
event_info += f", текст: {event_text}"
full_message = (
f"🚨 Ошибка в боте:\n\n"
f"Пользователь: {user_str}\n"
f"Ошибка: {error_message}\n"
f"{event_info}"
)
await bot.send_message(admin_id, full_message)
loggers.info(
text=f"Администратор {admin_id} уведомлен об ошибке",
log_type="ADMIN_NOTIFIED",
user=user_str
)
except Exception as e:
loggers.error(
text=f"Не удалось уведомить админа {admin_id}: {e}",
log_type="ADMIN_NOTIFY_ERROR",
user=user_str
)
@staticmethod
async def _send_error_message(
event: TelegramObject,
user_str: str
) -> None:
"""Отправляет пользователю сообщение об ошибке."""
error_text = (
"⚠️ Произошла непредвиденная ошибка. "
"Разработчики уже уведомлены и работают над исправлением.\n\n"
"Попробуйте повторить действие позже или нажмите /start"
)
try:
if isinstance(event, Message):
await event.answer(error_text)
elif isinstance(event, CallbackQuery):
await event.message.answer(error_text)
await event.answer()
elif isinstance(event, Update) and event.message:
await event.message.answer(error_text)
loggers.info(
text="Пользователю отправлено сообщение об ошибке",
log_type="ERROR_MESSAGE_SENT",
user=user_str
)
except Exception as e:
loggers.error(
text=f"Не удалось отправить сообщение об ошибке: {e}",
log_type="ERROR_MESSAGE_FAILED",
user=user_str
)

View File

@@ -0,0 +1,271 @@
from typing import Callable, Awaitable, Any, Dict, Optional, Tuple, Set
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject, Update, Message, CallbackQuery, MaybeInaccessibleMessageUnion, User
from bot.utils import type_msg
from middleware.loggers import loggers # ваш глобальный логгер
from configs import BotSettings, COMMANDS # импортируем настройки и команды
class LoggingMiddleware(BaseMiddleware):
"""
Middleware для логирования апдейтов с определением типа события,
пользователя и добавлением префикса проекта к типу лога.
Автоматически добавляет префикс проекта (например, 'PRIMO-') к типам логов:
- PRIMO-UPDATE: общий апдейт без определенного типа
- PRIMO-MSG: текстовое сообщение от пользователя
- PRIMO-CMD: команда (сообщение, начинающееся с любого префикса)
- PRIMO-CBD: callback query от инлайн-кнопок
"""
# Префикс проекта для логов
PROJECT_PREFIX: str = "PRIMO"
# Кэш для всех команд из COMMANDS
_all_commands: Optional[Set[str]] = None
def __init__(self):
super().__init__()
# Предварительно загружаем все команды
self._load_all_commands()
def _load_all_commands(self) -> None:
"""Загружает все команды из COMMANDS в множество для быстрого поиска."""
if self._all_commands is None:
self._all_commands = set()
for command_list in COMMANDS.values():
self._all_commands.update(command_list)
def _is_command(self, text: str) -> bool:
"""
Проверяет, является ли текст командой с любым префиксом.
Args:
text: Текст для проверки
Returns:
True если это команда, False если нет
"""
if not text:
return False
# Проверяем все префиксы из BotSettings
for prefix in BotSettings.PREFIX:
if text.startswith(prefix):
# Извлекаем команду без префикса
command_without_prefix = text[len(prefix):].strip()
# Проверяем, есть ли такая команда в нашем списке
if command_without_prefix in self._all_commands:
return True
# Также проверяем команды с префиксом / (стандартные)
if text.startswith('/'):
command_without_slash = text[1:].strip()
if command_without_slash in self._all_commands:
return True
return False
@staticmethod
def _extract_command_name(text: str) -> str:
"""
Извлекает название команды из текста.
Args:
text: Текст команды с префиксом
Returns:
Название команды без префикса
"""
for prefix in BotSettings.PREFIX:
if text.startswith(prefix):
return text[len(prefix):].strip()
if text.startswith('/'):
return text[1:].strip()
return text
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
"""
Обрабатывает входящее событие, определяет его тип, логирует с префиксом проекта
и передает следующему обработчику.
Args:
handler: Следующий обработчик в цепочке middleware
event: Входящее событие для обработки (Update, Message, CallbackQuery)
data: Словарь с контекстными данными FSM
Returns:
Результат выполнения следующего обработчика
Raises:
Exception: Любое исключение, возникшее при обработке хендлером
"""
# Определяем тип события и информацию для логирования
log_type: str
log_text: str
message_obj: Optional[Message]
log_type, log_text, message_obj = self._determine_event_type(event)
# Добавляем префикс проекта к типу лога
prefixed_log_type: str = f"{log_type}"
# Определяем информацию о пользователе
user_str: str = self._extract_user_info(event, message_obj)
# Логируем получение события с префиксом проекта
loggers.info(
text=log_text,
log_type=prefixed_log_type,
user=user_str
)
try:
# Передаем событие следующему обработчику
result: Any = await handler(event, data)
# Логируем успешное выполнение для команд
if log_type == "CMD":
loggers.info(
text=f"[SUCCESS] команда обработана",
log_type=prefixed_log_type,
user=user_str
)
return result
except Exception as e:
# Логируем ошибку при обработке с префиксом проекта
loggers.error(
text=f"Ошибка обработки: {str(e)}",
log_type=prefixed_log_type,
user=user_str
)
raise
def _determine_event_type(
self,
event: TelegramObject
) -> Tuple[str, str, Optional[Message]]:
"""
Определяет тип события и извлекает информацию для логирования.
Args:
event: Объект события для анализа
Returns:
Кортеж из (тип_лога, текст_лога, объект_сообщения)
"""
log_type: str = "UPDATE"
log_text: str = f"Получен апдейт: {type(event).__name__}"
message_obj: Optional[Message] = None
# Обработка Update объектов (основной тип в middleware)
if isinstance(event, Update):
# Пытаемся найти сообщение в различных полях Update
message_obj = (
event.message or
event.edited_message or
event.channel_post or
event.edited_channel_post
)
if message_obj and message_obj.text:
if self._is_command(message_obj.text):
log_type: str = "CMD"
log_text: str = f"использовал команду '{message_obj.text}'"
else:
log_type: str = "MSG"
log_text: str = f"получено сообщение: {message_obj.text!r}"
elif message_obj:
# Не текстовое сообщение (фото, видео и т.д.)
log_type: str = "MSG"
log_text: str = f"получено сообщение: '{type_msg(message_obj)}'"
elif event.callback_query:
# Обработка callback query
callback: CallbackQuery = event.callback_query
log_type: str = "CBD"
log_text: str = f"получен callback: {callback.data!r}"
if callback.message:
message_obj: Optional[MaybeInaccessibleMessageUnion] = callback.message
# Прямая обработка Message (если мидлварь зарегистрирован на messages)
elif isinstance(event, Message):
message_obj = event
if event.text and self._is_command(event.text):
log_type: str = "CMD"
log_text: str = f"использовал команду '{event.text}'"
elif event.text:
log_type: str = "MSG"
log_text: str = f"получено сообщение: {event.text!r}"
else:
log_type: str = "MSG"
log_text: str = f"получено сообщение типа: {event.content_type}"
# Прямая обработка CallbackQuery (если мидлварь зарегистрирован на callbacks)
elif isinstance(event, CallbackQuery):
log_type: str = "CBD"
log_text: str = f"получен callback: {event.data!r}"
if event.message:
message_obj = event.message
return log_type, log_text, message_obj
@staticmethod
def _extract_user_info(
event: TelegramObject,
message: Optional[Message] = None
) -> str:
"""
Извлекает информацию о пользователе из события.
Args:
event: Объект события (Update, Message или CallbackQuery)
message: Объект Message (если уже определен)
Returns:
Строка с идентификатором пользователя в формате '@username' или 'id<user_id>'
"""
user_str: str = "@System"
# Для CallbackQuery извлекаем пользователя из самого callback'а
if isinstance(event, CallbackQuery) and hasattr(event, 'from_user') and event.from_user:
user = event.from_user
user_str: str = f"@{user.username}" if user.username else f"id{user.id}"
# Для Message извлекаем пользователя из сообщения
elif isinstance(event, Message) and hasattr(event, 'from_user') and event.from_user:
user = event.from_user
user_str: str = f"@{user.username}" if user.username else f"id{user.id}"
# Для Update с callback_query
elif (isinstance(event, Update) and
event.callback_query and
hasattr(event.callback_query, 'from_user') and
event.callback_query.from_user):
user = event.callback_query.from_user
user_str: str = f"@{user.username}" if user.username else f"id{user.id}"
# Для Update с сообщением
elif (isinstance(event, Update) and
(event.message or event.edited_message) and
hasattr(event.message or event.edited_message, 'from_user')):
msg = event.message or event.edited_message
if msg and msg.from_user:
user: Optional[User] = msg.from_user
user_str: str = f"@{user.username}" if user.username else f"id{user.id}"
# Если передан message объект
elif message and hasattr(message, 'from_user') and message.from_user:
user: Optional[User] = message.from_user
user_str: str = f"@{user.username}" if user.username else f"id{user.id}"
return user_str

View File

@@ -0,0 +1,55 @@
import logging
from typing import Callable, Dict, Any, Awaitable
from aiogram import BaseMiddleware
from aiogram.enums import ChatType
from aiogram.types import Message
from database import db
logger = logging.getLogger(__name__)
class MessageCounterMiddleware(BaseMiddleware):
"""
Middleware для подсчёта сообщений в группах и супергруппах.
"""
async def __call__(
self,
handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]],
event: Any,
data: Dict[str, Any]
) -> Any:
if not isinstance(event, Message):
return await handler(event, data)
# Проверяем, что сообщение пришло из группового чата и не от бота
if (event.chat.type in (ChatType.GROUP, ChatType.SUPERGROUP) and
not event.from_user.is_bot):
try:
await self.process_group_message(event)
except Exception as e:
logger.error(msg=f"Ошибка при обработке сообщения: {e}", exc_info=True)
return await handler(event, data)
@staticmethod
async def process_group_message(message: Message) -> None:
"""
Обработка сообщения из группового чата.
"""
user_id = message.from_user.id
message_text = message.text or message.caption or ""
# Добавляем пользователя (если его ещё нет)
await db.add_user(
user_id=user_id,
username=message.from_user.username,
full_name=message.from_user.full_name,
)
# Сохраняем сообщение
await db.add_message(
user_id=user_id,
message_text=message_text,
created_at=message.date,
)
logger.info(f"Сообщение от пользователя {user_id} сохранено в БД")

View File

@@ -0,0 +1,97 @@
from typing import Callable, Awaitable, Any, Dict
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject, Message, CallbackQuery
import time
from collections import defaultdict
from middleware.loggers import loggers # ваш логгер
class RateLimitMiddleware(BaseMiddleware):
"""
Middleware для ограничения частоты запросов от пользователей (анти-спам).
Зачем нужен:
- Защита от DDoS и флуда
- Предотвращение злоупотребления ботом
- Контроль нагрузки на сервер
"""
def __init__(self, rate_limit: int = 10, time_period: float = 2.0):
"""
Инициализация rate limit middleware.
Args:
rate_limit: Максимальное количество запросов за период
time_period: Период времени в секундах
"""
self.rate_limit = rate_limit
self.time_period = time_period
self.user_calls: Dict[int, list[float]] = defaultdict(list)
super().__init__()
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
log: bool = False,
) -> Any:
"""
Проверяет rate limit перед обработкой запроса.
"""
# Пропускаем не-сообщения и не-колбэки
if not isinstance(event, (Message, CallbackQuery)):
return await handler(event, data)
user_id: int = event.from_user.id
user_str: str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}"
current_time: float = time.time()
# Очищаем старые запросы
self.user_calls[user_id] = [
call_time for call_time in self.user_calls[user_id]
if current_time - call_time < self.time_period
]
# Логируем текущее состояние rate limit
if log:
loggers.debug(
text=f"Rate limit: {len(self.user_calls[user_id])}/{self.rate_limit} за {self.time_period}сек",
log_type="RATE_LIMIT_STATUS",
user=user_str
)
# Проверяем текущий лимит
if len(self.user_calls[user_id]) >= self.rate_limit:
# Логируем попытку спама
if log:
loggers.warning(
text=f"Превышен rate limit ({self.rate_limit}/{self.time_period}сек)",
log_type="RATE_LIMIT_EXCEEDED",
user=user_str
)
# Отправляем сообщение о превышении лимита
if isinstance(event, Message):
await event.answer(
text="⏳ Слишком много запросов! Пожалуйста, подождите немного.",
)
elif isinstance(event, CallbackQuery):
await event.answer(
text="⏳ Подождите немного перед следующим действием.",
show_alert=True
)
return None
# Добавляем текущий запрос и продолжаем обработку
self.user_calls[user_id].append(current_time)
loggers.debug(
text=f"Запрос добавлен в rate limit",
log_type="RATE_LIMIT_ADDED",
user=user_str
)
return await handler(event, data)

View File

@@ -0,0 +1,115 @@
from typing import Callable, Awaitable, Any, Dict
from aiogram import BaseMiddleware, Bot
from aiogram.types import TelegramObject, Message, CallbackQuery
from aiogram.exceptions import TelegramBadRequest
from middleware.loggers import loggers # ваш логгер
class SubscriptionMiddleware(BaseMiddleware):
"""
Middleware для проверки подписки пользователя на необходимые каналы.
Блокирует обработку команд, если пользователь не подписан.
Зачем нужен:
- Автоматическая проверка подписки для всех входящих сообщений
- Единая точка управления подписками
- Предотвращение доступа к функционалу без подписки
"""
def __init__(self, bot: Bot, channel_ids: list[int | str]):
"""
Инициализация middleware проверки подписки.
Args:
bot: Экземпляр бота
channel_ids: Список ID каналов/чатов для проверки подписки
"""
self.bot = bot
self.channel_ids = channel_ids
super().__init__()
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
"""
Проверяет подписку пользователя перед обработкой команды.
"""
# Пропускаем не-сообщения и не-колбэки
if not isinstance(event, (Message, CallbackQuery)):
return await handler(event, data)
user_id: int = event.from_user.id
user_str: str = f"@{event.from_user.username}" if event.from_user.username else f"id{user_id}"
# Логируем начало проверки подписки
loggers.info(
text=f"Проверка подписки для пользователя",
log_type="SUBSCRIPTION_CHECK",
user=user_str
)
# Проверяем подписку на все required каналы
not_subscribed_channels: list[str] = []
for channel_id in self.channel_ids:
try:
member = await self.bot.get_chat_member(
chat_id=channel_id,
user_id=user_id
)
# Проверяем, что пользователь является участником
if member.status not in ['member', 'administrator', 'creator']:
not_subscribed_channels.append(str(channel_id))
except TelegramBadRequest as e:
loggers.error(
text=f"Ошибка проверки подписки на канал {channel_id}: {e}",
log_type="SUBSCRIPTION_ERROR",
user=user_str
)
# Если пользователь не подписан на некоторые каналы
if not_subscribed_channels:
loggers.warning(
text=f"Пользователь не подписан на каналы: {', '.join(not_subscribed_channels)}",
log_type="SUBSCRIPTION_FAILED",
user=user_str
)
warning_text = (
"📢 Для использования бота необходимо подписаться на наши каналы!\n\n"
"После подписки нажмите /start для продолжения."
)
# Создаем кнопку "Проверить подписку"
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
keyboard = InlineKeyboardMarkup(
inline_keyboard=[[
InlineKeyboardButton(
text="✅ Я подписался",
callback_data="check_subscription"
)
]]
)
if isinstance(event, Message):
await event.answer(warning_text, reply_markup=keyboard)
elif isinstance(event, CallbackQuery):
await event.message.answer(warning_text, reply_markup=keyboard)
await event.answer()
return None
# Логируем успешную проверку подписки
loggers.info(
text="Пользователь подписан на все required каналы",
log_type="SUBSCRIPTION_SUCCESS",
user=user_str
)
# Если подписка есть, продолжаем обработку
return await handler(event, data)

View File

@@ -0,0 +1,82 @@
from typing import Callable, Awaitable, Any, Dict
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject, Message, CallbackQuery, Update
from time import time
from middleware.loggers import loggers # ваш логгер
class TimingMiddleware(BaseMiddleware):
"""
Middleware для измерения времени выполнения хендлеров.
Зачем нужен:
- Мониторинг производительности хендлеров
- Выявление медленных запросов
- Оптимизация кода бота
"""
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any],
perm: str = None,
) -> Any:
"""
Измеряет время выполнения хендлера.
"""
start_time: float = time()
try:
result = await handler(event, data)
return result
finally:
execution_time: float = time() - start_time
# Получаем информацию о пользователе безопасным способом
user_str: str = "@System"
# Для Message и CallbackQuery
if isinstance(event, (Message, CallbackQuery)) and hasattr(event, 'from_user') and event.from_user:
user = event.from_user
user_str = f"@{user.username}" if user.username else f"id{user.id}"
# Для Update (который содержит message или callback_query)
elif isinstance(event, Update):
# Пытаемся найти пользователя в различных полях Update
user_object = None
if event.message and event.message.from_user:
user_object = event.message.from_user
elif event.edited_message and event.edited_message.from_user:
user_object = event.edited_message.from_user
elif event.callback_query and event.callback_query.from_user:
user_object = event.callback_query.from_user
elif event.channel_post and event.channel_post.from_user:
user_object = event.channel_post.from_user
elif event.edited_channel_post and event.edited_channel_post.from_user:
user_object = event.edited_channel_post.from_user
if user_object:
user_str = f"@{user_object.username}" if user_object.username else f"id{user_object.id}"
# Логируем время выполнения
if execution_time > 1.0 and perm: # Медленные запросы
loggers.warning(
text=f"Медленный хендлер: {execution_time:.2f}сек",
log_type="SLOW_HANDLER",
user=user_str
)
elif execution_time > 0.5 and perm == "medium": # Средние запросы
loggers.info(
text=f"Среднее время выполнения: {execution_time:.3f}сек",
log_type="HANDLER_TIMING",
user=user_str
)
elif perm == "fast": # Быстрые запросы
loggers.debug(
text=f"Быстрое выполнение: {execution_time:.3f}сек",
log_type="HANDLER_TIMING_FAST",
user=user_str
)