Files
PrimoGuardBot-/database/manager.py

642 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Высокоуровневый менеджер для работы с банвордами.
Упрощает использование repository в handlers и middleware.
"""
from typing import Set, Optional, List, Dict, Any
from datetime import datetime, timezone
from middleware.loggers import logger
from .database import Database, get_db
from .repository import BanWordsRepository
from .models import BanWordType, SpamStat, SpamLog, TempBanWord
from sqlalchemy import select, delete, func, desc
__all__ = ("BanWordsManager", "get_manager")
class BanWordsManager:
"""
Менеджер для удобной работы с банвордами.
Предоставляет упрощённый API для handlers и middleware.
Attributes:
db: Экземпляр Database
repo: Repository для CRUD операций
"""
def __init__(self, db: Optional[Database] = None):
"""
Args:
db: Экземпляр Database (если None, берётся глобальный)
"""
self.db = db or get_db()
self.repo = BanWordsRepository(self.db)
# Кэш для часто используемых данных
self._cache_banwords: Optional[dict] = None
self._cache_whitelist: Optional[Set[str]] = None
self._cache_admins: Optional[Set[int]] = None
self._cache_updated_at: Optional[datetime] = None
async def init(self) -> None:
"""Инициализирует базу данных и загружает кэш"""
await self.db.init()
await self.refresh_cache()
logger.info("BanWordsManager инициализирован", log_type="DATABASE")
async def close(self) -> None:
"""Закрывает соединение с БД"""
await self.db.close()
# === CACHE MANAGEMENT ===
async def refresh_cache(self) -> None:
"""Обновляет кэш из БД"""
try:
self._cache_banwords = await self.repo.get_all_banwords()
temp_banwords = await self.repo.get_all_temp_banwords()
# Объединяем постоянные и временные банворды
for word_type, words in temp_banwords.items():
if word_type in self._cache_banwords:
self._cache_banwords[word_type] |= words
self._cache_whitelist = await self.repo.get_whitelist()
self._cache_admins = await self.repo.get_admins()
self._cache_updated_at = datetime.now()
logger.debug("Кэш банвордов обновлён", log_type="DATABASE")
except Exception as e:
logger.error(f"Ошибка обновления кэша: {e}", log_type="DATABASE")
def invalidate_cache(self) -> None:
"""Сбрасывает кэш (требует refresh_cache)"""
self._cache_banwords = None
self._cache_whitelist = None
self._cache_admins = None
self._cache_updated_at = None
# === BANWORDS (с кэшем) ===
async def add_banword(
self,
word: str,
word_type: BanWordType,
added_by: Optional[int] = None,
reason: Optional[str] = None,
refresh_cache: bool = True
) -> bool:
"""
Добавляет банворд и обновляет кэш.
Args:
word: Слово
word_type: Тип
added_by: ID админа
reason: Причина
refresh_cache: Обновить кэш после добавления
Returns:
bool: True если добавлен
"""
result = await self.repo.add_banword(word, word_type, added_by, reason)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_banword(
self,
word: str,
word_type: BanWordType,
refresh_cache: bool = True
) -> bool:
"""Удаляет банворд и обновляет кэш"""
result = await self.repo.remove_banword(word, word_type)
if result and refresh_cache:
await self.refresh_cache()
return result
def get_banwords_cached(self, word_type: BanWordType) -> Set[str]:
"""
Получает банворды из кэша (быстро).
Args:
word_type: Тип банвордов
Returns:
Set[str]: Набор слов из кэша
"""
if self._cache_banwords is None:
logger.warning("Кэш не инициализирован", log_type="DATABASE")
return set()
return self._cache_banwords.get(word_type, set())
async def get_banwords(self, word_type: BanWordType) -> Set[str]:
"""Получает банворды напрямую из БД (без кэша)"""
return await self.repo.get_banwords(word_type)
# === TEMPORARY BANWORDS ===
async def add_temp_banword(
self,
word: str,
word_type: BanWordType,
minutes: int,
added_by: Optional[int] = None,
refresh_cache: bool = True
) -> bool:
"""Добавляет временный банворд"""
result = await self.repo.add_temp_banword(
word, word_type, minutes, added_by
)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_temp_banword(
self,
word: str,
word_type: BanWordType,
refresh_cache: bool = True
) -> bool:
"""Удаляет временный банворд"""
result = await self.repo.remove_temp_banword(word, word_type)
if result and refresh_cache:
await self.refresh_cache()
return result
async def cleanup_expired(self) -> int:
"""
Очищает истёкшие временные банворды.
Вызывается периодически (например, раз в минуту).
Returns:
int: Количество удалённых записей
"""
deleted = await self.repo.cleanup_expired_temp_banwords()
if deleted > 0:
await self.refresh_cache()
return deleted
# === WHITELIST ===
async def add_whitelist(
self,
word: str,
added_by: Optional[int] = None,
reason: Optional[str] = None,
refresh_cache: bool = True
) -> bool:
"""Добавляет слово в белый список"""
result = await self.repo.add_whitelist(word, added_by, reason)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_whitelist(
self,
word: str,
refresh_cache: bool = True
) -> bool:
"""Удаляет слово из белого списка"""
result = await self.repo.remove_whitelist(word)
if result and refresh_cache:
await self.refresh_cache()
return result
def get_whitelist_cached(self) -> Set[str]:
"""Получает белый список из кэша"""
if self._cache_whitelist is None:
logger.warning("Кэш whitelist не инициализирован", log_type="DATABASE")
return set()
return self._cache_whitelist
def is_whitelisted(self, text: str) -> bool:
"""
Проверяет, содержит ли текст слово из белого списка.
Args:
text: Текст для проверки (lowercase)
Returns:
bool: True если найдено исключение
"""
whitelist = self.get_whitelist_cached()
return any(word in text for word in whitelist)
# === ADMINS ===
async def add_admin(
self,
user_id: int,
added_by: Optional[int] = None,
refresh_cache: bool = True
) -> bool:
"""Добавляет администратора"""
result = await self.repo.add_admin(user_id, added_by)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_admin(
self,
user_id: int,
refresh_cache: bool = True
) -> bool:
"""Удаляет администратора"""
result = await self.repo.remove_admin(user_id)
if result and refresh_cache:
await self.refresh_cache()
return result
def get_admins_cached(self) -> Set[int]:
"""Получает список админов из кэша"""
if self._cache_admins is None:
logger.warning("Кэш админов не инициализирован", log_type="DATABASE")
return set()
return self._cache_admins
def is_admin_cached(self, user_id: int) -> bool:
"""
Проверяет, является ли пользователь админом (из кэша).
Args:
user_id: Telegram ID
Returns:
bool: True если админ
"""
return user_id in self.get_admins_cached()
async def is_admin(self, user_id: int) -> bool:
"""Проверяет админа напрямую из БД"""
return await self.repo.is_admin(user_id)
# === SETTINGS (режимы silence/conflict) ===
async def set_silence_mode(self, minutes: int) -> datetime:
"""
Включает режим тишины на указанное время.
Args:
minutes: Длительность в минутах
Returns:
datetime: Время окончания режима
"""
expires_at = datetime.now().timestamp() + (minutes * 60)
await self.repo.set_setting("silence_until", str(expires_at))
logger.info(
f"Режим тишины активирован на {minutes} мин",
log_type="SILENCE"
)
return datetime.fromtimestamp(expires_at)
async def disable_silence_mode(self) -> None:
"""Отключает режим тишины"""
await self.repo.delete_setting("silence_until")
logger.info("Режим тишины отключён", log_type="SILENCE")
async def is_silence_active(self) -> bool:
"""Проверяет, активен ли режим тишины"""
silence_until_str = await self.repo.get_setting("silence_until")
if not silence_until_str:
return False
try:
silence_until = float(silence_until_str)
now = datetime.now().timestamp()
if now >= silence_until:
# Время истекло - удаляем настройку
await self.disable_silence_mode()
return False
return True
except (ValueError, TypeError):
return False
async def set_conflict_mode(self, minutes: int) -> datetime:
"""
Включает режим антиконфликта на указанное время.
Args:
minutes: Длительность в минутах
Returns:
datetime: Время окончания режима
"""
expires_at = datetime.now().timestamp() + (minutes * 60)
await self.repo.set_setting("conflict_until", str(expires_at))
logger.info(
f"Режим антиконфликта активирован на {minutes} мин",
log_type="CONFLICT"
)
return datetime.fromtimestamp(expires_at)
async def disable_conflict_mode(self) -> None:
"""Отключает режим антиконфликта"""
await self.repo.delete_setting("conflict_until")
logger.info("Режим антиконфликта отключён", log_type="CONFLICT")
async def is_conflict_active(self) -> bool:
"""Проверяет, активен ли режим антиконфликта"""
conflict_until_str = await self.repo.get_setting("conflict_until")
if not conflict_until_str:
return False
try:
conflict_until = float(conflict_until_str)
now = datetime.now().timestamp()
if now >= conflict_until:
# Время истекло
await self.disable_conflict_mode()
return False
return True
except (ValueError, TypeError):
return False
# === STATISTICS ===
async def log_spam(
self,
user_id: int,
username: str,
chat_id: int,
message_text: str,
matched_word: str,
match_type: str
) -> None:
"""Логирует удаление спам-сообщения"""
await self.repo.log_spam_deletion(
user_id=user_id,
username=username,
chat_id=chat_id,
message_text=message_text,
matched_word=matched_word,
match_type=match_type
)
async def get_spam_stats(
self,
limit: int = 100,
user_id: Optional[int] = None
) -> List[SpamStat]:
"""Получает статистику удалений"""
return await self.repo.get_spam_stats(limit, user_id)
async def get_user_spam_count(self, user_id: int) -> int:
"""Получает количество удалённых сообщений пользователя"""
return await self.repo.get_user_spam_count(user_id)
async def get_top_spammers(self, limit: int = 10) -> List[tuple[int, int]]:
"""Получает топ спамеров"""
return await self.repo.get_top_spammers(limit)
# === INFO ===
async def get_stats(self) -> dict:
"""Получает общую статистику"""
db_stats = await self.repo.get_stats()
# Добавляем информацию о кэше
cache_info = {
'cache_active': self._cache_banwords is not None,
'cache_updated_at': self._cache_updated_at.isoformat() if self._cache_updated_at else None
}
return {**db_stats, **cache_info}
async def get_all_words_list(self) -> dict:
"""
Получает все слова для команды /listwords.
Returns:
dict: Словарь со всеми категориями слов
"""
banwords = await self.repo.get_all_banwords()
temp_banwords = await self.repo.get_all_temp_banwords()
whitelist = await self.repo.get_whitelist()
admins = await self.repo.get_admins()
return {
'substring': banwords.get(BanWordType.SUBSTRING, set()),
'lemma': banwords.get(BanWordType.LEMMA, set()),
'part': banwords.get(BanWordType.PART, set()),
'conflict_substring': banwords.get(BanWordType.CONFLICT_SUBSTRING, set()),
'conflict_lemma': banwords.get(BanWordType.CONFLICT_LEMMA, set()),
'temp_substring': temp_banwords.get(BanWordType.SUBSTRING, set()),
'temp_lemma': temp_banwords.get(BanWordType.LEMMA, set()),
'whitelist': whitelist,
'admins': admins
}
async def get_top_words(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
Получает топ N самых часто срабатывающих слов.
Args:
limit: Количество слов в топе
Returns:
List[Dict]: Список словарей с данными:
- word: слово
- count: количество срабатываний
- type: тип проверки
"""
async with self.db.get_session() as session:
try:
# Группируем по matched_word и считаем количество
query = select(
SpamLog.matched_word,
SpamLog.match_type,
func.count(SpamLog.id).label('count')
).where(
SpamLog.matched_word.isnot(None)
).group_by(
SpamLog.matched_word,
SpamLog.match_type
).order_by(
desc('count')
).limit(limit)
result = await session.execute(query)
rows = result.all()
# Форматируем результат
top_words = []
for row in rows:
top_words.append({
'word': row.matched_word,
'type': row.match_type,
'count': row.count
})
logger.debug(
f"Получен топ-{limit} слов: {len(top_words)} записей",
log_type="DATABASE"
)
return top_words
except Exception as e:
logger.error(
f"Ошибка получения топ-слов: {e}",
log_type="DATABASE"
)
return []
async def cleanup_expired_temp_words(self) -> int:
"""
Удаляет истёкшие временные банворды.
Returns:
int: Количество удалённых слов
"""
async with self.db.get_session() as session:
try:
now = datetime.now(timezone.utc)
# Ищем истёкшие временные слова
query = select(TempBanWord).where(
TempBanWord.expires_at < now
)
result = await session.execute(query)
expired_words = result.scalars().all()
if not expired_words:
return 0
# Собираем информацию для логирования
expired_info = []
for word in expired_words:
expired_info.append({
'word': word.word,
'type': word.word_type.value,
'expires_at': word.expires_at
})
await session.delete(word)
# Сохраняем изменения
await session.commit()
# Обновляем кеш
await self.refresh_cache()
# Логируем подробности
logger.info(
f"Удалено {len(expired_words)} истёкших временных банвордов",
log_type="DATABASE"
)
for info in expired_info:
logger.debug(
f" └─ {info['type']}: '{info['word']}' (истёк: {info['expires_at']})",
log_type="DATABASE"
)
return len(expired_words)
except Exception as e:
logger.error(
f"Ошибка удаления истёкших временных слов: {e}",
log_type="DATABASE"
)
await session.rollback()
return 0
async def get_total_spam_count(self) -> int:
"""
Получает общее количество удалённых сообщений.
Returns:
int: Количество записей в SpamLog
"""
async with self.db.get_session() as session:
try:
query = select(func.count(SpamLog.id))
result = await session.execute(query)
count = result.scalar_one()
return count
except Exception as e:
logger.error(
f"Ошибка подсчёта спам-логов: {e}",
log_type="DATABASE"
)
return 0
async def reset_spam_stats(self) -> bool:
"""
Очищает всю статистику спама.
Returns:
bool: True если успешно
"""
async with self.db.get_session() as session:
try:
# Удаляем все записи
await session.execute(delete(SpamLog))
await session.commit()
logger.info("Статистика спама сброшена", log_type="DATABASE")
return True
except Exception as e:
logger.error(
f"Ошибка сброса статистики: {e}",
log_type="DATABASE"
)
await session.rollback()
return False
# Глобальный экземпляр менеджера
_manager_instance: Optional[BanWordsManager] = None
def get_manager() -> BanWordsManager:
"""
Возвращает глобальный экземпляр BanWordsManager (Singleton).
Returns:
BanWordsManager: Менеджер банвордов
"""
global _manager_instance
if _manager_instance is None:
_manager_instance = BanWordsManager()
return _manager_instance