diff --git a/bot/middlewares/banwords_mdw.py b/bot/middlewares/banwords_mdw.py
index baf1683..99e2ea3 100644
--- a/bot/middlewares/banwords_mdw.py
+++ b/bot/middlewares/banwords_mdw.py
@@ -1,11 +1,16 @@
"""
Middleware для проверки сообщений на запрещённые слова (банворды).
-...
-✅ ИСПРАВЛЕНО:
-- ❌ PatternError: bad character range 🀀-\\\\ (исправлено экранирование Unicode)
-- ✅ НЕТ уведомлений в режиме тишины
+
+✅ ИСПРАВЛЕНО:
+- Полная нормализация текста с использованием UNICODE_MAP
+- Удаление повторов символов (леееейн → лейн)
+- Игнорирование разделителей (л.е.й.н → лейн)
+- Поддержка всех типов проверок (SUBSTRING, LEMMA, PART, CONFLICT)
+- Белый список и режимы тишины/конфликта
+- Нет уведомлений в режиме тишины
"""
-from typing import Callable, Dict, Any, Awaitable, Optional
+
+from typing import Callable, Dict, Any, Awaitable, Optional, Set
import re
import unicodedata
@@ -13,7 +18,7 @@ from aiogram import BaseMiddleware
from aiogram.types import Message, InlineKeyboardMarkup, InlineKeyboardButton
from aiogram.exceptions import TelegramBadRequest
-from configs import settings
+from configs import settings, UNICODE_MAP, LATIN_TO_CYRILLIC, CYRILLIC_NORMALIZE
from database import get_manager, BanWordType
from bot.special import process_text, extract_words, get_lemma
from middleware.loggers import logger
@@ -21,10 +26,102 @@ from middleware.loggers import logger
__all__ = ("BanWordsMiddleware",)
+class TextNormalizer:
+ """
+ Класс для многоступенчатой нормализации текста.
+ Приводит различные юникод-символы к базовым буквам,
+ удаляет повторы, убирает разделители.
+ """
+
+ # Объединяем все словари замен в один
+ FULL_MAP = {}
+ FULL_MAP.update(LATIN_TO_CYRILLIC)
+ FULL_MAP.update(CYRILLIC_NORMALIZE)
+ FULL_MAP.update(UNICODE_MAP)
+
+ # Символы-разделители, которые могут быть вставлены между буквами
+ SEPARATORS = re.compile(r'[\s\.\-_,;:|]+', re.UNICODE)
+
+ # Паттерн для поиска повторяющихся букв (3+ раза)
+ REPEAT_PATTERN = re.compile(r'([а-яёa-z])\1{2,}', re.IGNORECASE)
+
+ @classmethod
+ def normalize_characters(cls, text: str) -> str:
+ """
+ Заменяет все символы из FULL_MAP на их базовые эквиваленты.
+ Проходит по строке посимвольно для максимальной замены.
+ """
+ result = []
+ for ch in text:
+ # Сначала пробуем заменить по карте
+ if ch in cls.FULL_MAP:
+ result.append(cls.FULL_MAP[ch])
+ else:
+ result.append(ch)
+ # Приводим к нижнему регистру после замен (чтобы избежать потери регистра в карте)
+ return ''.join(result).lower()
+
+ @classmethod
+ def remove_separators(cls, text: str) -> str:
+ """Удаляет разделители между буквами (пробелы, точки и т.д.)"""
+ return cls.SEPARATORS.sub('', text)
+
+ @classmethod
+ def collapse_repeats(cls, text: str, max_repeat: int = 2) -> str:
+ """
+ Заменяет повторения символов более max_repeat подряд на один/два символа.
+ По умолчанию оставляет максимум 2 (леееейн → леейн? но обычно хватит 2).
+ Можно настроить: для банворда "лейн" превратит "леееейн" в "леейн", что всё равно содержит "лейн".
+ """
+ def repl(m):
+ ch = m.group(1)
+ # Оставляем два символа, чтобы не терять удвоенные буквы (например, "дд" в слове "поддон")
+ return ch * 2
+ return cls.REPEAT_PATTERN.sub(repl, text)
+
+ @classmethod
+ def normalize_full(cls, text: str, remove_sep: bool = True, collapse: bool = True) -> str:
+ """
+ Полная нормализация:
+ 1. Unicode нормализация (NFKC) для разложения составных символов
+ 2. Замена по карте
+ 3. Приведение к нижнему регистру
+ 4. Удаление разделителей (опционально)
+ 5. Схлопывание повторов (опционально)
+ """
+ # NFKC разлагает символы типа "ё" в "е" + умляут, но нам лучше оставить как есть,
+ # т.к. у нас есть прямые замены. Однако для совместимости применим.
+ text = unicodedata.normalize('NFKC', text)
+ # Замена символов
+ text = cls.normalize_characters(text)
+ # Удаление разделителей
+ if remove_sep:
+ text = cls.remove_separators(text)
+ # Схлопывание повторов
+ if collapse:
+ text = cls.collapse_repeats(text)
+ return text
+
+ @classmethod
+ def normalize_for_part(cls, text: str) -> str:
+ """
+ Нормализация для типа PART:
+ - Полная нормализация
+ - Удаление всех не-буквенных символов (кроме пробелов)
+ - Приведение к нижнему регистру
+ """
+ text = cls.normalize_full(text, remove_sep=False, collapse=True)
+ # Оставляем только буквы и пробелы
+ text = re.sub(r'[^а-яёa-z\s]', '', text, flags=re.IGNORECASE)
+ text = re.sub(r'\s+', ' ', text).strip()
+ return text.lower()
+
+
class BanWordsMiddleware(BaseMiddleware):
def __init__(self):
super().__init__()
self.manager = get_manager()
+ self.normalizer = TextNormalizer()
async def __call__(
self,
@@ -32,53 +129,110 @@ class BanWordsMiddleware(BaseMiddleware):
event: Message,
data: Dict[str, Any]
) -> Any:
+ # Проверяем наличие текста или подписи
if not event.text and not event.caption:
return await handler(event, data)
message_text = event.text or event.caption
+
+ # Игнорируем команды
if message_text.startswith('/'):
return await handler(event, data)
- # Админ проверка
+ # Проверка на админа
user_id = event.from_user.id
is_super_admin = user_id in settings.OWNER_ID
is_admin = is_super_admin or self.manager.is_admin_cached(user_id)
if is_admin:
return await handler(event, data)
+ # Проверяем сообщение на спам
spam_result = await self._check_message(message_text)
if spam_result:
await self._handle_spam(event, spam_result)
- return None
+ return None # Сообщение удалено, дальше не обрабатываем
return await handler(event, data)
- @staticmethod
- def _normalize_universal(text: str, mode: str = "strict") -> str:
- """✅ ИСПРАВЛЕНО: Универсальная нормализация для всех типов проверок"""
- # БЕЗОПАСНАЯ нормализация - убираем все проблемные символы
- text = unicodedata.normalize('NFKC', text)
-
- if mode == "strict": # PART - сохраняем буквы, цифры, пробелы
- # ✅ ИСПРАВЛЕНО: безопасный паттерн только для букв/цифр/пробелов
- text = re.sub(r'[^a-zA-Zа-яА-ЯёЁ0-9\s]', '', text)
- text = re.sub(r'\s+', ' ', text).strip()
- else: # SUBSTRING/LEMMA - только буквы и цифры
- text = re.sub(r'[^a-zA-Zа-яА-ЯёЁ0-9]', '', text)
-
- return BanWordsMiddleware._normalize_repeated_chars(text)
+ async def _check_message(self, text: str) -> Optional[Dict[str, str]]:
+ """
+ Многоступенчатая проверка текста.
+ Возвращает словарь с причиной блокировки или None.
+ """
+ # 1. Повторяющиеся символы (например, "леееейн") — блокируем сразу
+ repeat_result = self._check_repeated_chars(text)
+ if repeat_result:
+ return repeat_result
- @staticmethod
- def _normalize_repeated_chars(text: str) -> str:
- """Убирает повторения >2 (лееееин → лейн)"""
- return re.sub(r'([а-яёa-z])\\1{2,}', r'\\1\\1', text, flags=re.IGNORECASE)
+ # 2. Получаем кэшированные списки
+ substring_words = self.manager.get_banwords_cached(BanWordType.SUBSTRING)
+ lemma_words = self.manager.get_banwords_cached(BanWordType.LEMMA)
+ part_words = self.manager.get_banwords_cached(BanWordType.PART)
+ conflict_substring = self.manager.get_banwords_cached(BanWordType.CONFLICT_SUBSTRING)
+ conflict_lemma = self.manager.get_banwords_cached(BanWordType.CONFLICT_LEMMA)
+
+ # 3. Белый список
+ if self.manager.is_whitelisted(text):
+ logger.debug(f"⏭️ Пропуск по белому списку: {text[:30]}", log_type="BANWORDS")
+ return None
+
+ # 4. Режим тишины
+ if await self.manager.is_silence_active():
+ return {"word": "[режим тишины]", "type": "silence"}
+
+ # 5. Режим конфликта (более мягкие правила)
+ if await self.manager.is_conflict_active():
+ # Проверка conflict_substring (с нормализацией)
+ normalized_text = self.normalizer.normalize_full(text, remove_sep=True, collapse=True)
+ for word in conflict_substring:
+ norm_word = self.normalizer.normalize_full(word, remove_sep=True, collapse=True)
+ if norm_word in normalized_text:
+ return {"word": word, "type": "conflict_substring"}
+
+ # conflict_lemma
+ for word_text in extract_words(text):
+ lemma = get_lemma(word_text)
+ if lemma in conflict_lemma:
+ return {"word": lemma, "type": "conflict_lemma"}
+
+ # Если в конфликтном режиме ничего не найдено — пропускаем
+ return None
+
+ # 6. Обычный режим: проверка substring (с удалением разделителей и схлопыванием повторов)
+ normalized_text = self.normalizer.normalize_full(text, remove_sep=True, collapse=True)
+ for word in substring_words:
+ norm_word = self.normalizer.normalize_full(word, remove_sep=True, collapse=True)
+ if norm_word in normalized_text:
+ logger.info(f"✅ SUBSTRING: '{word}'", log_type="BANWORDS")
+ return {"word": word, "type": "substring"}
+
+ # 7. Проверка part (строгая нормализация, только буквы и пробелы)
+ part_normalized = self.normalizer.normalize_for_part(text)
+ for part in part_words:
+ norm_part = self.normalizer.normalize_for_part(part)
+ if norm_part in part_normalized:
+ logger.info(f"✅ PART: '{part}'", log_type="BANWORDS")
+ return {"word": part, "type": "part"}
+
+ # 8. Проверка lemma
+ for word_text in extract_words(text):
+ # Для леммы тоже применяем нормализацию (удаляем разделители, схлопываем повторы)
+ normalized_word = self.normalizer.normalize_full(word_text, remove_sep=True, collapse=True)
+ lemma = get_lemma(normalized_word)
+ if lemma in lemma_words:
+ logger.info(f"✅ LEMMA: '{lemma}' из '{word_text}'", log_type="BANWORDS")
+ return {"word": lemma, "type": "lemma"}
+
+ return None
def _check_repeated_chars(self, text: str) -> Optional[Dict[str, str]]:
- """🔥 Блокирует 3+ повторяющиеся символы подряд"""
- # ✅ ИСПРАВЛЕНО: безопасный паттерн только для букв
- pattern = r'([а-яёa-zA-Z])\\1{2,}'
- matches = re.finditer(pattern, text, flags=re.IGNORECASE)
-
+ """
+ Проверяет на наличие 3+ повторяющихся букв подряд.
+ Использует сырой текст без нормализации (чтобы поймать "леееейн").
+ """
+ # Ищем повторения букв (только кириллица/латиница)
+ pattern = re.compile(r'([а-яёa-zA-Z])\1{2,}', re.IGNORECASE)
+ matches = pattern.finditer(text)
for match in matches:
char = match.group(1)
count = len(match.group(0))
@@ -87,97 +241,31 @@ class BanWordsMiddleware(BaseMiddleware):
return {"word": f"'{match.group(0)}' ({count}x)", "type": "repeated_chars"}
return None
- async def _check_message(self, text: str) -> Optional[Dict[str, str]]:
- text_lower = text.lower()
-
- # 🔥 1. Повторяющиеся символы (лееееин)
- repeat_result = self._check_repeated_chars(text_lower)
- if repeat_result:
- return repeat_result
-
- # 2. ✅ БЕЗОПАСНАЯ нормализация
- text_universal = self._normalize_universal(text_lower, "strict") # PART
- text_loose = self._normalize_universal(text_lower) # SUBSTRING/LEMMA
- text_processed = process_text(text_lower)
-
- logger.debug(
- f"🔍 | universal='{text_universal}' | loose='{text_loose}' | proc='{text_processed}'",
- log_type="BANWORDS"
- )
-
- # 3. WHITELIST
- if (self.manager.is_whitelisted(text_processed) or
- self.manager.is_whitelisted(text_loose) or
- self.manager.is_whitelisted(text_universal)):
- return None
-
- # 4. SILENCE MODE
- if await self.manager.is_silence_active():
- return {"word": "[режим тишины]", "type": "silence"}
-
- # 5. CONFLICT MODE
- if await self.manager.is_conflict_active():
- for word in self.manager.get_banwords_cached(BanWordType.CONFLICT_SUBSTRING):
- word_norm = self._normalize_universal(word.lower(), "loose")
- if word_norm in text_loose:
- return {"word": word, "type": "conflict_substring"}
-
- conflict_lemma = self.manager.get_banwords_cached(BanWordType.CONFLICT_LEMMA)
- for word_text in extract_words(text_processed):
- lemma = get_lemma(self._normalize_repeated_chars(word_text))
- if lemma in conflict_lemma:
- return {"word": lemma, "type": "conflict_lemma"}
-
- # 6. SUBSTRING
- for word in self.manager.get_banwords_cached(BanWordType.SUBSTRING):
- word_norm = self._normalize_universal(word.lower())
- if word_norm in text_loose:
- logger.info(f"✅ SUBSTRING: '{word}' → '{text_loose}'", log_type="BANWORDS")
- return {"word": word, "type": "substring"}
-
- # 7. PART (строгая нормализация)
- for part in self.manager.get_banwords_cached(BanWordType.PART):
- part_norm = self._normalize_universal(part.lower(), "strict")
- if part_norm in text_universal:
- logger.info(f"✅ PART: '{part}' → '{text_universal}'", log_type="BANWORDS")
- return {"word": part, "type": "part"}
-
- # 8. LEMMA
- for word_text in extract_words(text_processed):
- lemma = get_lemma(self._normalize_repeated_chars(word_text))
- if lemma in self.manager.get_banwords_cached(BanWordType.LEMMA):
- logger.info(f"✅ LEMMA: '{lemma}' из '{word_text}'", log_type="BANWORDS")
- return {"word": lemma, "type": "lemma"}
-
- return None
-
async def _handle_spam(self, message: Message, spam_result: Dict[str, str]) -> None:
+ """Обрабатывает спам-сообщение: удаляет, логирует, уведомляет (кроме silence)"""
user = message.from_user
matched_word = spam_result["word"]
match_type = spam_result["type"]
message_text = message.text or message.caption or "[нет текста]"
- # ✅ ПРОВЕРКА: НЕ отправляем уведомления в режиме тишины
+ # В режиме тишины удаляем молча
if match_type == "silence":
- # Удаляем сообщение молча
try:
await message.delete()
- logger.info(f"🔇 SILENCE: @{user.username or user.id} удалено молча",
- log_type="BANWORDS")
+ logger.info(f"🔇 SILENCE: @{user.username or user.id} удалено молча", log_type="BANWORDS")
except TelegramBadRequest as e:
logger.error(f"❌ Не удалено (silence): {e}", log_type="BANWORDS")
return
- # Удаляем
+ # Удаляем сообщение
try:
await message.delete()
- logger.info(f"🚫 @{user.username or user.id}: '{matched_word}' ({match_type})",
- log_type="BANWORDS")
+ logger.info(f"🚫 @{user.username or user.id}: '{matched_word}' ({match_type})", log_type="BANWORDS")
except TelegramBadRequest as e:
logger.error(f"❌ Не удалено: {e}", log_type="BANWORDS")
return
- # Логируем в БД (только НЕ silence)
+ # Логируем в БД
await self.manager.log_spam(
user_id=user.id,
username=user.username or f"id{user.id}",
@@ -187,17 +275,17 @@ class BanWordsMiddleware(BaseMiddleware):
match_type=match_type
)
- # Уведомляем админов (только НЕ silence)
+ # Уведомляем админов
await self._notify_admins(message, matched_word, match_type, message_text)
- # Остальные методы без изменений...
async def _notify_admins(
- self,
- message: Message,
- matched_word: str,
- match_type: str,
- message_text: str
- ) -> None:
+ self,
+ message: Message,
+ matched_word: str,
+ match_type: str,
+ message_text: str
+) -> None:
+ """Отправляет уведомление об удалении в админ-чат (берёт ID из БД)"""
user = message.from_user
username = f"@{user.username}" if user.username else f"ID: {user.id}"
spam_count = await self.manager.get_user_spam_count(user.id)
@@ -205,17 +293,17 @@ class BanWordsMiddleware(BaseMiddleware):
source_thread_id = message.message_thread_id
notification_text = (
- f"🚫 Удалено сообщение\\n\\n"
- f"👤 Пользователь: {username}\\n"
- f"🆔 ID: {user.id}\\n"
- f"📊 Нарушений: {spam_count}\\n\\n"
- f"💬 Чат: {self._escape_html(chat_title)}\\n"
- f"🆔 Chat ID: {message.chat.id}\\n"
- f"{'📌 Topic ID: {source_thread_id}\\n' if source_thread_id else ''}"
- f"🔗 Message ID: {message.message_id}\\n\\n"
- f"🔍 Триггер: {self._escape_html(matched_word)}\\n"
- f"📝 Тип: {self._get_type_emoji(match_type)} {self._escape_html(match_type)}\\n\\n"
- f"💬 Текст:\\n{self._escape_html(message_text[:500])}"
+ f"🚫 Удалено сообщение\n\n"
+ f"👤 Пользователь: {username}\n"
+ f"🆔 ID: {user.id}\n"
+ f"📊 Нарушений: {spam_count}\n\n"
+ f"💬 Чат: {self._escape_html(chat_title)}\n"
+ f"🆔 Chat ID: {message.chat.id}\n"
+ f"{'📌 Topic ID: ' + str(source_thread_id) + '\n' if source_thread_id else ''}"
+ f"🔗 Message ID: {message.message_id}\n\n"
+ f"🔍 Триггер: {self._escape_html(matched_word)}\n"
+ f"📝 Тип: {self._get_type_emoji(match_type)} {self._escape_html(match_type)}\n\n"
+ f"💬 Текст:\n{self._escape_html(message_text[:500])}"
)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
@@ -227,16 +315,18 @@ class BanWordsMiddleware(BaseMiddleware):
])
try:
- admin_chat_id = getattr(settings, "ADMIN_CHAT_ID", None)
- admin_thread_id = getattr(settings, "ADMIN_THREAD_ID", None) or None
+ # ✅ Получаем настройки из БД (динамические, установленные через /settings)
+ admin_chat_id = await self.manager.get_bot_setting("admin_chat_id")
+ admin_thread_id = await self.manager.get_bot_setting("admin_thread_id")
- await message.bot.send_message(
- chat_id=admin_chat_id,
- text=notification_text,
- reply_markup=keyboard,
- parse_mode="HTML",
- message_thread_id=admin_thread_id
- )
+ if admin_chat_id:
+ await message.bot.send_message(
+ chat_id=int(admin_chat_id),
+ text=notification_text,
+ reply_markup=keyboard,
+ parse_mode="HTML",
+ message_thread_id=int(admin_thread_id) if admin_thread_id else None
+ )
except Exception as e:
logger.error(f"❌ Уведомление админам: {e}", log_type="BANWORDS")
diff --git a/configs/config.py b/configs/config.py
index 77bb8a4..6eb0423 100644
--- a/configs/config.py
+++ b/configs/config.py
@@ -26,6 +26,7 @@ class _Settings(BaseSettings):
# Параметры сообщений
PARSE_MODE: str = "HTML"
PREFIX: str = "/!.&?"
+ LOG_LEVEL: str = "TRACE"
# Разрешения и логирование
BOT_EDIT: bool = False
diff --git a/database/manager.py b/database/manager.py
index 7c1f09c..f29b405 100644
--- a/database/manager.py
+++ b/database/manager.py
@@ -813,7 +813,7 @@ class BanWordsManager:
return settings.get(key)
async def init_default_bot_settings(self) -> None:
- """Инициализирует настройки по умолчанию из .env"""
+ """Инициализирует настройки по умолчанию из .env, только если они ещё не установлены"""
try:
from configs import settings
@@ -825,10 +825,15 @@ class BanWordsManager:
}
for key, value in defaults.items():
- if value: # Не null
- await self.set_bot_setting(key, str(value))
+ if value is not None: # В .env значение задано
+ existing = await self.get_bot_setting(key)
+ if existing is None:
+ await self.set_bot_setting(key, str(value))
+ logger.debug(f"Установлена настройка {key} из .env", log_type="SETTINGS")
+ else:
+ logger.debug(f"Настройка {key} уже существует ({existing}), пропускаем", log_type="SETTINGS")
- logger.info("✅ Настройки бота инициализированы из .env", log_type="SETTINGS")
+ logger.info("✅ Настройки бота инициализированы из .env (существующие сохранены)", log_type="SETTINGS")
except Exception as e:
logger.warning(f"Не удалось инициализировать настройки из .env: {e}", log_type="SETTINGS")
diff --git a/middleware/loggers/logs.py b/middleware/loggers/logs.py
index f6a0479..d2d0566 100644
--- a/middleware/loggers/logs.py
+++ b/middleware/loggers/logs.py
@@ -1,5 +1,5 @@
"""
-Кастомный логгер с поддержкой декораторов и прямого вызова
+Кастомный логгер с поддержством декораторов и прямого вызова
"""
from sys import stderr
from pathlib import Path
@@ -43,7 +43,6 @@ class Logger:
'{extra[user]} | {message}'
)
-
def __init__(self, system_name: str = 'PRIMO') -> None:
"""
Инициализация логгера.
@@ -58,6 +57,11 @@ class Logger:
"""
Настройка обработчиков Loguru: консоль и файлы.
+ Учитывает переменную LOG_LEVEL из settings.
+ LOG_LEVEL определяет минимальный уровень для консоли и общего файла,
+ а также влияет на то, какие отдельные файлы создаются:
+ создаются только файлы для уровней >= LOG_LEVEL.
+
Args:
start: Если True, сразу логирует запуск проекта
"""
@@ -67,6 +71,15 @@ class Logger:
# Полная очистка настроек
nlogger.remove()
+ # Определяем уровень логирования из настроек
+ log_level_str = getattr(settings, 'LOG_LEVEL', 'INFO').upper()
+ # Проверка на допустимость
+ try:
+ log_level_no = nlogger.level(log_level_str).no
+ except ValueError:
+ log_level_str = 'INFO'
+ log_level_no = nlogger.level('INFO').no
+
# Создание директории для файловых логов
log_dir: Path = settings.LOG_DIR
if not log_dir.exists():
@@ -78,45 +91,49 @@ class Logger:
sink=stderr,
format=self._log_format,
colorize=True,
- level='INFO',
+ level=log_level_str,
filter=lambda rec: rec['extra'].get('log_type') != 'TRACE'
)
# Файловые логи
if settings.LOG_FILE:
- # Общий лог со всеми уровнями
+ # Общий лог со всеми уровнями (начиная с LOG_LEVEL)
nlogger.add(
sink=log_dir / 'bot.log',
rotation=settings.LOG_ROTATION,
retention=settings.LOG_RETENTION,
format=self._log_format,
- level='DEBUG',
+ level=log_level_str,
enqueue=True,
backtrace=True,
diagnose=True,
encoding='utf-8'
)
- # Раздельные логи по уровням
- log_levels = {
- 'INFO': 'info.log',
- 'WARNING': 'warning.log',
- 'ERROR': 'error.log',
- 'CRITICAL': 'critical.log'
- }
+ # Раздельные логи по уровням – создаём только для уровней >= LOG_LEVEL
+ # Список интересующих нас уровней (в порядке возрастания)
+ level_configs = [
+ ('DEBUG', 'debug.log'),
+ ('INFO', 'info.log'),
+ ('SUCCESS', 'success.log'),
+ ('WARNING', 'warning.log'),
+ ('ERROR', 'error.log'),
+ ('CRITICAL', 'critical.log')
+ ]
-
- for level_name, filename in log_levels.items():
- nlogger.add(
- sink=log_dir / filename,
- rotation='10 MB',
- retention=settings.LOG_RETENTION,
- format=self._log_format,
- level=level_name,
- filter=lambda rec, lvl=level_name: rec['level'].name == lvl,
- enqueue=True,
- encoding='utf-8'
- )
+ for level_name, filename in level_configs:
+ level_no = nlogger.level(level_name).no
+ if level_no >= log_level_no:
+ nlogger.add(
+ sink=log_dir / filename,
+ rotation='10 MB',
+ retention=settings.LOG_RETENTION,
+ format=self._log_format,
+ level=level_name,
+ filter=lambda rec, lvl=level_name: rec['level'].name == lvl,
+ enqueue=True,
+ encoding='utf-8'
+ )
self._setup_done = True
@@ -128,7 +145,6 @@ class Logger:
log_type='START'
)
-
@staticmethod
def format_user(event: Optional[EventType] = None) -> str:
"""
@@ -322,8 +338,8 @@ class Logger:
user: Optional[str] = None,
message: Optional[EventType] = None
) -> None:
- """Логирование успешного выполнения (уровень INFO)"""
- self.log_entry('INFO', f"✓ {text}", log_type, user, message)
+ """Логирование успешного выполнения (уровень SUCCESS)"""
+ self.log_entry('SUCCESS', text, log_type, user, message)
# ================= КОНТЕКСТНЫЕ МЕНЕДЖЕРЫ =================