Files
PrimoGuardBot/database/manager.py

902 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Высокоуровневый менеджер для работы с банвордами.
Упрощает использование repository в handlers и middleware.
"""
from typing import Set, Optional, List, Dict, Any
from datetime import datetime, timezone
from middleware.loggers import logger
from .database import Database, get_db
from .repository import BanWordsRepository
from .models import BanWordType, SpamStat, SpamLog, TempBanWord, AutoComment
from sqlalchemy import select, delete, func, desc
__all__ = ("BanWordsManager", "get_manager")
class BanWordsManager:
"""
Менеджер для удобной работы с банвордами.
Предоставляет упрощённый API для handlers и middleware.
Attributes:
db: Экземпляр Database
repo: Repository для CRUD операций
"""
def __init__(self, db: Optional[Database] = None):
"""
Args:
db: Экземпляр Database (если None, берётся глобальный)
"""
self.db = db or get_db()
self.repo = BanWordsRepository(self.db)
# Кэш для часто используемых данных
self._cache_banwords: Optional[dict] = None
self._cache_whitelist: Optional[Set[str]] = None
self._cache_admins: Optional[Set[int]] = None
self._cache_updated_at: Optional[datetime] = None
async def init(self) -> None:
"""Инициализирует базу данных и загружает кэш"""
await self.db.init()
await self.init_default_bot_settings() # ← добавлено
await self.init_default_words()
await self.refresh_cache()
logger.info("BanWordsManager инициализирован", log_type="DATABASE")
async def close(self) -> None:
"""Закрывает соединение с БД"""
await self.db.close()
# === CACHE MANAGEMENT ===
async def refresh_cache(self) -> None:
"""Обновляет кэш из БД"""
try:
self._cache_banwords = await self.repo.get_all_banwords()
temp_banwords = await self.repo.get_all_temp_banwords()
# Объединяем постоянные и временные банворды
for word_type, words in temp_banwords.items():
if word_type in self._cache_banwords:
self._cache_banwords[word_type] |= words
self._cache_whitelist = await self.repo.get_whitelist()
self._cache_admins = await self.repo.get_admins()
self._cache_updated_at = datetime.now()
logger.debug("Кэш банвордов обновлён", log_type="DATABASE")
except Exception as e:
logger.error(f"Ошибка обновления кэша: {e}", log_type="DATABASE")
def invalidate_cache(self) -> None:
"""Сбрасывает кэш (требует refresh_cache)"""
self._cache_banwords = None
self._cache_whitelist = None
self._cache_admins = None
self._cache_updated_at = None
# === BANWORDS (с кэшем) ===
async def add_banword(
self,
word: str,
word_type: BanWordType,
added_by: Optional[int] = None,
reason: Optional[str] = None,
refresh_cache: bool = True
) -> bool:
"""
Добавляет банворд и обновляет кэш.
Args:
word: Слово
word_type: Тип
added_by: ID админа
reason: Причина
refresh_cache: Обновить кэш после добавления
Returns:
bool: True если добавлен
"""
result = await self.repo.add_banword(word, word_type, added_by, reason)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_banword(
self,
word: str,
word_type: BanWordType,
refresh_cache: bool = True
) -> bool:
"""Удаляет банворд и обновляет кэш"""
result = await self.repo.remove_banword(word, word_type)
if result and refresh_cache:
await self.refresh_cache()
return result
def get_banwords_cached(self, word_type: BanWordType) -> Set[str]:
"""
Получает банворды из кэша (быстро).
Args:
word_type: Тип банвордов
Returns:
Set[str]: Набор слов из кэша
"""
if self._cache_banwords is None:
logger.warning("Кэш не инициализирован", log_type="DATABASE")
return set()
return self._cache_banwords.get(word_type, set())
async def get_banwords(self, word_type: BanWordType) -> Set[str]:
"""Получает банворды напрямую из БД (без кэша)"""
return await self.repo.get_banwords(word_type)
# === TEMPORARY BANWORDS ===
async def add_temp_banword(
self,
word: str,
word_type: BanWordType,
minutes: int,
added_by: Optional[int] = None,
refresh_cache: bool = True
) -> bool:
"""Добавляет временный банворд"""
result = await self.repo.add_temp_banword(
word, word_type, minutes, added_by
)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_temp_banword(
self,
word: str,
word_type: BanWordType,
refresh_cache: bool = True
) -> bool:
"""Удаляет временный банворд"""
result = await self.repo.remove_temp_banword(word, word_type)
if result and refresh_cache:
await self.refresh_cache()
return result
async def cleanup_expired(self) -> int:
"""
Очищает истёкшие временные банворды.
Вызывается периодически (например, раз в минуту).
Returns:
int: Количество удалённых записей
"""
deleted = await self.repo.cleanup_expired_temp_banwords()
if deleted > 0:
await self.refresh_cache()
return deleted
# === WHITELIST ===
async def add_whitelist(
self,
word: str,
added_by: Optional[int] = None,
reason: Optional[str] = None,
refresh_cache: bool = True
) -> bool:
"""Добавляет слово в белый список"""
result = await self.repo.add_whitelist(word, added_by, reason)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_whitelist(
self,
word: str,
refresh_cache: bool = True
) -> bool:
"""Удаляет слово из белого списка"""
result = await self.repo.remove_whitelist(word)
if result and refresh_cache:
await self.refresh_cache()
return result
def get_whitelist_cached(self) -> Set[str]:
"""Получает белый список из кэша"""
if self._cache_whitelist is None:
logger.warning("Кэш whitelist не инициализирован", log_type="DATABASE")
return set()
return self._cache_whitelist
def is_whitelisted(self, text: str) -> bool:
"""
Проверяет, содержит ли текст слово из белого списка.
Args:
text: Текст для проверки (lowercase)
Returns:
bool: True если найдено исключение
"""
whitelist = self.get_whitelist_cached()
return any(word in text for word in whitelist)
# === ADMINS ===
async def add_admin(
self,
user_id: int,
added_by: Optional[int] = None,
refresh_cache: bool = True
) -> bool:
"""Добавляет администратора"""
result = await self.repo.add_admin(user_id, added_by)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_admin(
self,
user_id: int,
refresh_cache: bool = True
) -> bool:
"""Удаляет администратора"""
result = await self.repo.remove_admin(user_id)
if result and refresh_cache:
await self.refresh_cache()
return result
def get_admins_cached(self) -> Set[int]:
"""Получает список админов из кэша"""
if self._cache_admins is None:
logger.warning("Кэш админов не инициализирован", log_type="DATABASE")
return set()
return self._cache_admins
def is_admin_cached(self, user_id: int) -> bool:
"""
Проверяет, является ли пользователь админом (из кэша).
Args:
user_id: Telegram ID
Returns:
bool: True если админ
"""
return user_id in self.get_admins_cached()
async def is_admin(self, user_id: int) -> bool:
"""Проверяет админа напрямую из БД"""
return await self.repo.is_admin(user_id)
# === SETTINGS (режимы silence/conflict) ===
async def set_silence_mode(self, minutes: int) -> datetime:
"""
Включает режим тишины на указанное время.
Args:
minutes: Длительность в минутах
Returns:
datetime: Время окончания режима
"""
expires_at = datetime.now().timestamp() + (minutes * 60)
await self.repo.set_setting("silence_until", str(expires_at))
logger.info(
f"Режим тишины активирован на {minutes} мин",
log_type="SILENCE"
)
return datetime.fromtimestamp(expires_at)
async def disable_silence_mode(self) -> None:
"""Отключает режим тишины"""
await self.repo.delete_setting("silence_until")
logger.info("Режим тишины отключён", log_type="SILENCE")
async def is_silence_active(self) -> bool:
"""Проверяет, активен ли режим тишины"""
silence_until_str = await self.repo.get_setting("silence_until")
if not silence_until_str:
return False
try:
silence_until = float(silence_until_str)
now = datetime.now().timestamp()
if now >= silence_until:
await self.disable_silence_mode()
return False
return True
except (ValueError, TypeError):
return False
async def set_conflict_mode(self, minutes: int) -> datetime:
"""
Включает режим антиконфликта на указанное время.
Args:
minutes: Длительность в минутах
Returns:
datetime: Время окончания режима
"""
expires_at = datetime.now().timestamp() + (minutes * 60)
await self.repo.set_setting("conflict_until", str(expires_at))
logger.info(
f"Режим антиконфликта активирован на {minutes} мин",
log_type="CONFLICT"
)
return datetime.fromtimestamp(expires_at)
async def disable_conflict_mode(self) -> None:
"""Отключает режим антиконфликта"""
await self.repo.delete_setting("conflict_until")
logger.info("Режим антиконфликта отключён", log_type="CONFLICT")
async def is_conflict_active(self) -> bool:
"""Проверяет, активен ли режим антиконфликта"""
conflict_until_str = await self.repo.get_setting("conflict_until")
if not conflict_until_str:
return False
try:
conflict_until = float(conflict_until_str)
now = datetime.now().timestamp()
if now >= conflict_until:
await self.disable_conflict_mode()
return False
return True
except (ValueError, TypeError):
return False
# === STATISTICS ===
async def log_spam(
self,
user_id: int,
username: str,
chat_id: int,
message_text: str,
matched_word: str,
match_type: str
) -> None:
"""Логирует удаление спам-сообщения"""
await self.repo.log_spam_deletion(
user_id=user_id,
username=username,
chat_id=chat_id,
message_text=message_text,
matched_word=matched_word,
match_type=match_type
)
async def get_spam_stats(
self,
limit: int = 100,
user_id: Optional[int] = None
) -> List[SpamStat]:
"""Получает статистику удалений"""
return await self.repo.get_spam_stats(limit, user_id)
async def get_user_spam_count(self, user_id: int) -> int:
"""Получает количество удалённых сообщений пользователя"""
return await self.repo.get_user_spam_count(user_id)
async def get_top_spammers(self, limit: int = 10) -> List[tuple[int, int]]:
"""Получает топ спамеров"""
return await self.repo.get_top_spammers(limit)
# === INFO ===
async def get_stats(self) -> dict:
"""Получает общую статистику"""
db_stats = await self.repo.get_stats()
cache_info = {
'cache_active': self._cache_banwords is not None,
'cache_updated_at': self._cache_updated_at.isoformat() if self._cache_updated_at else None
}
return {**db_stats, **cache_info}
async def get_all_words_list(self) -> dict:
"""
Получает все слова для команды /listwords.
Returns:
dict: Словарь со всеми категориями слов
"""
banwords = await self.repo.get_all_banwords()
temp_banwords = await self.repo.get_all_temp_banwords()
whitelist = await self.repo.get_whitelist()
admins = await self.repo.get_admins()
return {
'word': banwords.get(BanWordType.WORD, set()),
'lemma': banwords.get(BanWordType.LEMMA, set()),
'part': banwords.get(BanWordType.PART, set()),
'conflict_word': banwords.get(BanWordType.CONFLICT_WORD, set()),
'conflict_lemma': banwords.get(BanWordType.CONFLICT_LEMMA, set()),
'conflict_part': banwords.get(BanWordType.CONFLICT_PART, set()),
'temp_word': temp_banwords.get(BanWordType.WORD, set()),
'temp_lemma': temp_banwords.get(BanWordType.LEMMA, set()),
'whitelist': whitelist,
'admins': admins
}
async def get_top_words(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
Получает топ N самых часто срабатывающих слов.
Args:
limit: Количество слов в топе
Returns:
List[Dict]: Список словарей с данными:
- word: слово
- count: количество срабатываний
- type: тип проверки
"""
async with self.db.get_session() as session:
try:
query = select(
SpamLog.matched_word,
SpamLog.match_type,
func.count(SpamLog.id).label('count')
).where(
SpamLog.matched_word.isnot(None)
).group_by(
SpamLog.matched_word,
SpamLog.match_type
).order_by(
desc('count')
).limit(limit)
result = await session.execute(query)
rows = result.all()
top_words = []
for row in rows:
top_words.append({
'word': row.matched_word,
'type': row.match_type,
'count': row.count
})
logger.debug(
f"Получен топ-{limit} слов: {len(top_words)} записей",
log_type="DATABASE"
)
return top_words
except Exception as e:
logger.error(
f"Ошибка получения топ-слов: {e}",
log_type="DATABASE"
)
return []
async def init_default_words(self) -> None:
"""
Добавляет базовые банворды и whitelist при первом запуске.
Ничего не перезаписывает, если уже существует.
"""
try:
from configs import settings
# --- Базовые слова ---
default_word = {"http", "t.me/"}
default_part = {"bot"}
default_lemma = {"скам", "мошенник"}
# Проверяем уже существующие
existing = await self.repo.get_all_banwords()
# word
for word in default_word:
if word not in existing.get(BanWordType.WORD, set()):
await self.repo.add_banword(word, BanWordType.WORD)
# PART
for word in default_part:
if word not in existing.get(BanWordType.PART, set()):
await self.repo.add_banword(word, BanWordType.PART)
# LEMMA
for word in default_lemma:
if word not in existing.get(BanWordType.LEMMA, set()):
await self.repo.add_banword(word, BanWordType.LEMMA)
# --- Добавляем username бота в whitelist ---
bot_username = settings.BOT_USERNAME
if bot_username:
whitelist = await self.repo.get_whitelist()
if bot_username.lower() not in whitelist:
await self.repo.add_whitelist(bot_username.lower())
logger.info("Базовые слова и whitelist инициализированы", log_type="DATABASE")
except Exception as e:
logger.error(f"Ошибка инициализации базовых слов: {e}", log_type="DATABASE")
async def cleanup_expired_temp_words(self) -> int:
"""
Удаляет истёкшие временные банворды.
Returns:
int: Количество удалённых слов
"""
async with self.db.get_session() as session:
try:
now = datetime.now(timezone.utc)
query = select(TempBanWord).where(
TempBanWord.expires_at < now
)
result = await session.execute(query)
expired_words = result.scalars().all()
if not expired_words:
return 0
expired_info = []
for word in expired_words:
expired_info.append({
'word': word.word,
'type': word.type.value, # ← ИСПРАВЛЕНО: было word.word_type.value
'expires_at': word.expires_at
})
await session.delete(word)
await session.commit()
await self.refresh_cache()
logger.info(
f"Удалено {len(expired_words)} истёкших временных банвордов",
log_type="DATABASE"
)
for info in expired_info:
logger.debug(
f" └─ {info['type']}: '{info['word']}' (истёк: {info['expires_at']})",
log_type="DATABASE"
)
return len(expired_words)
except Exception as e:
logger.error(
f"Ошибка удаления истёкших временных слов: {e}",
log_type="DATABASE"
)
await session.rollback()
return 0
async def get_total_spam_count(self) -> int:
"""
Получает общее количество удалённых сообщений.
Returns:
int: Количество записей в SpamLog
"""
async with self.db.get_session() as session:
try:
query = select(func.count(SpamLog.id))
result = await session.execute(query)
count = result.scalar_one()
return count
except Exception as e:
logger.error(
f"Ошибка подсчёта спам-логов: {e}",
log_type="DATABASE"
)
return 0
async def reset_spam_stats(self) -> bool:
"""
Очищает всю статистику спама.
Returns:
bool: True если успешно
"""
async with self.db.get_session() as session:
try:
await session.execute(delete(SpamLog))
await session.commit()
logger.info("Статистика спама сброшена", log_type="DATABASE")
return True
except Exception as e:
logger.error(
f"Ошибка сброса статистики: {e}",
log_type="DATABASE"
)
await session.rollback()
return False
# === AUTO COMMENTS ===
async def get_auto_comment_settings(self, channel_id: int) -> dict:
"""
Получает настройки автокомментариев для канала.
ВАЖНО: возвращает сохранённые поля даже когда is_enabled=False,
чтобы UI/preview показывали реальную конфигурацию.
"""
from configs import settings
auto_comment = await self.repo.get_auto_comment(channel_id)
defaults = {
'text': settings.AUTO_COMMENT_TEXT,
'button_text': settings.AUTO_COMMENT_BUTTON_TEXT,
'button_url': settings.AUTO_COMMENT_BUTTON_URL,
'photo_url': settings.AUTO_COMMENT_PHOTO_URL,
'is_enabled': False,
}
if not auto_comment:
return defaults
return {
'text': auto_comment.text if auto_comment.text is not None else defaults['text'],
'button_text': auto_comment.button_text if auto_comment.button_text is not None else defaults['button_text'],
'button_url': auto_comment.button_url if auto_comment.button_url is not None else defaults['button_url'],
'photo_url': auto_comment.photo_url if auto_comment.photo_url is not None else defaults['photo_url'],
'is_enabled': bool(auto_comment.is_enabled),
}
async def save_auto_comment_settings(
self,
channel_id: int,
text: str,
button_text: str,
button_url: str,
photo_url: str,
updated_by: Optional[int] = None
) -> bool:
"""Сохраняет настройки автокомментариев"""
return await self.repo.set_auto_comment(
channel_id=channel_id,
text=text,
button_text=button_text,
button_url=button_url,
photo_url=photo_url,
updated_by=updated_by,
is_enabled=True
)
async def update_auto_comment_text(
self,
channel_id: int,
text: str,
updated_by: Optional[int] = None
) -> bool:
"""Обновляет текст автокомментария"""
return await self.repo.update_auto_comment_field(
channel_id, 'text', text, updated_by
)
async def update_auto_comment_button(
self,
channel_id: int,
button_text: str,
button_url: str,
updated_by: Optional[int] = None
) -> bool:
"""Обновляет кнопку автокомментария"""
success_text = await self.repo.update_auto_comment_field(
channel_id, 'button_text', button_text, updated_by
)
success_url = await self.repo.update_auto_comment_field(
channel_id, 'button_url', button_url, updated_by
)
return success_text and success_url
async def update_auto_comment_photo(
self,
channel_id: int,
photo_url: str,
updated_by: Optional[int] = None
) -> bool:
"""Обновляет фото автокомментария"""
return await self.repo.update_auto_comment_field(
channel_id, 'photo_url', photo_url, updated_by
)
async def log_report(
self,
report_id: str,
reporter_id: int,
reporter_username: Optional[str],
reported_user_id: int,
reported_username: Optional[str],
chat_id: int,
chat_title: Optional[str],
message_id: int,
message_thread_id: Optional[int],
message_text: Optional[str],
reason: str
) -> bool:
"""Логирует репорт в БД"""
return await self.repo.log_report(
report_id=report_id,
reporter_id=reporter_id,
reporter_username=reporter_username,
reported_user_id=reported_user_id,
reported_username=reported_username,
chat_id=chat_id,
chat_title=chat_title,
message_id=message_id,
message_thread_id=message_thread_id,
message_text=message_text,
reason=reason
)
# ==================== ✅ КАНАЛЫ АВТОКОММЕНТАРИЕВ ====================
async def add_auto_comment_channel(self, channel_id: int, added_by: int) -> bool:
"""✅ Добавляет новый канал в БД"""
async with self.db.get_session() as session:
try:
new_channel = AutoComment(
channel_id=channel_id,
text="",
button_text="",
button_url="",
photo_url="",
is_enabled=False,
updated_by=added_by,
)
session.add(new_channel)
await session.commit()
await session.refresh(new_channel)
logger.info(f"✅ Канал добавлен: {channel_id}", log_type="CHANNEL")
return True
except Exception as e:
await session.rollback()
logger.error(f"Ошибка добавления канала {channel_id}: {e}", log_type="CHANNEL")
return False
async def get_auto_comment_channels(self) -> List[int]:
"""✅ Возвращает все channel_id из БД"""
async with self.db.get_session() as session:
result = await session.execute(select(AutoComment.channel_id).distinct())
return [row[0] for row in result.fetchall()]
async def delete_auto_comment(self, channel_id: int) -> bool:
"""✅ Удаляет настройки канала"""
async with self.db.get_session() as session:
try:
result = await session.execute(
delete(AutoComment).where(AutoComment.channel_id == channel_id)
)
if result.rowcount > 0:
await session.commit()
logger.info(f"✅ Канал удален: {channel_id}", log_type="CHANNEL")
return True
await session.rollback()
return False
except Exception as e:
await session.rollback()
logger.error(f"Ошибка удаления канала {channel_id}: {e}", log_type="CHANNEL")
return False
# ==================== BOT SETTINGS (замена .env) ====================
async def get_bot_settings(self) -> dict:
"""Получает все настройки бота из БД"""
settings = {
'admin_chat_id': await self.repo.get_setting("admin_chat_id"),
'admin_thread_id': await self.repo.get_setting("admin_thread_id"),
'report_chat_id': await self.repo.get_setting("report_chat_id"),
'report_thread_id': await self.repo.get_setting("report_thread_id"),
}
return {k: v for k, v in settings.items() if v is not None}
async def set_bot_setting(self, key: str, value: Optional[str]) -> bool:
"""
Сохраняет настройку бота в БД
Args:
key: admin_chat_id, admin_thread_id, report_chat_id, report_thread_id
value: str или None/null
Returns:
bool: True если сохранено
"""
if value is None:
return await self.repo.delete_setting(key)
else:
return await self.repo.set_setting(key, value)
async def get_bot_setting(self, key: str) -> Optional[str]:
"""Получает ОДНУ настройку бота"""
settings = await self.get_bot_settings()
return settings.get(key)
async def init_default_bot_settings(self) -> None:
"""Инициализирует настройки по умолчанию из .env, только если они ещё не установлены"""
try:
from configs import settings
defaults = {
"admin_chat_id": getattr(settings, 'ADMIN_CHAT_ID', None),
"admin_thread_id": str(getattr(settings, 'ADMIN_THREAD_ID', None)) if getattr(settings, 'ADMIN_THREAD_ID', None) else None,
"report_chat_id": getattr(settings, 'REPORT_CHAT_ID', None),
"report_thread_id": str(getattr(settings, 'REPORT_THREAD_ID', None)) if getattr(settings, 'REPORT_THREAD_ID', None) else None,
}
for key, value in defaults.items():
if value is not None: # В .env значение задано
existing = await self.get_bot_setting(key)
if existing is None:
await self.set_bot_setting(key, str(value))
logger.debug(f"Установлена настройка {key} из .env", log_type="SETTINGS")
else:
logger.debug(f"Настройка {key} уже существует ({existing}), пропускаем", log_type="SETTINGS")
logger.info("✅ Настройки бота инициализированы из .env (существующие сохранены)", log_type="SETTINGS")
except Exception as e:
logger.warning(f"Не удалось инициализировать настройки из .env: {e}", log_type="SETTINGS")
# Глобальный экземпляр менеджера
_manager_instance: Optional[BanWordsManager] = None
def get_manager() -> BanWordsManager:
"""
Возвращает глобальный экземпляр BanWordsManager (Singleton).
Returns:
BanWordsManager: Менеджер банвордов
"""
global _manager_instance
if _manager_instance is None:
_manager_instance = BanWordsManager()
return _manager_instance