Менеджер базы данныхц

This commit is contained in:
2026-02-23 14:17:24 +07:00
parent cc953e51d2
commit ec22068bb4

855
database/manager.py Normal file
View File

@@ -0,0 +1,855 @@
"""
Высокоуровневый менеджер для работы с банвордами.
Упрощает использование repository в handlers и middleware.
"""
from typing import Set, Optional, List, Dict, Any
from datetime import datetime, timezone
from middleware.loggers import logger
from .database import Database, get_db
from .repository import BanWordsRepository
from .models import BanWordType, SpamStat, SpamLog, TempBanWord, AutoComment
from sqlalchemy import select, delete, func, desc
__all__ = ("BanWordsManager", "get_manager")
class BanWordsManager:
"""
Менеджер для удобной работы с банвордами.
Предоставляет упрощённый API для handlers и middleware.
Attributes:
db: Экземпляр Database
repo: Repository для CRUD операций
"""
def __init__(self, db: Optional[Database] = None):
"""
Args:
db: Экземпляр Database (если None, берётся глобальный)
"""
self.db = db or get_db()
self.repo = BanWordsRepository(self.db)
# Кэш для часто используемых данных
self._cache_banwords: Optional[dict] = None
self._cache_whitelist: Optional[Set[str]] = None
self._cache_admins: Optional[Set[int]] = None
self._cache_updated_at: Optional[datetime] = None
async def init(self) -> None:
"""Инициализирует базу данных и загружает кэш"""
await self.db.init()
await self.init_default_bot_settings() # ← добавлено
await self.refresh_cache()
logger.info("BanWordsManager инициализирован", log_type="DATABASE")
async def close(self) -> None:
"""Закрывает соединение с БД"""
await self.db.close()
# === CACHE MANAGEMENT ===
async def refresh_cache(self) -> None:
"""Обновляет кэш из БД"""
try:
self._cache_banwords = await self.repo.get_all_banwords()
temp_banwords = await self.repo.get_all_temp_banwords()
# Объединяем постоянные и временные банворды
for word_type, words in temp_banwords.items():
if word_type in self._cache_banwords:
self._cache_banwords[word_type] |= words
self._cache_whitelist = await self.repo.get_whitelist()
self._cache_admins = await self.repo.get_admins()
self._cache_updated_at = datetime.now()
logger.debug("Кэш банвордов обновлён", log_type="DATABASE")
except Exception as e:
logger.error(f"Ошибка обновления кэша: {e}", log_type="DATABASE")
def invalidate_cache(self) -> None:
"""Сбрасывает кэш (требует refresh_cache)"""
self._cache_banwords = None
self._cache_whitelist = None
self._cache_admins = None
self._cache_updated_at = None
# === BANWORDS (с кэшем) ===
async def add_banword(
self,
word: str,
word_type: BanWordType,
added_by: Optional[int] = None,
reason: Optional[str] = None,
refresh_cache: bool = True
) -> bool:
"""
Добавляет банворд и обновляет кэш.
Args:
word: Слово
word_type: Тип
added_by: ID админа
reason: Причина
refresh_cache: Обновить кэш после добавления
Returns:
bool: True если добавлен
"""
result = await self.repo.add_banword(word, word_type, added_by, reason)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_banword(
self,
word: str,
word_type: BanWordType,
refresh_cache: bool = True
) -> bool:
"""Удаляет банворд и обновляет кэш"""
result = await self.repo.remove_banword(word, word_type)
if result and refresh_cache:
await self.refresh_cache()
return result
def get_banwords_cached(self, word_type: BanWordType) -> Set[str]:
"""
Получает банворды из кэша (быстро).
Args:
word_type: Тип банвордов
Returns:
Set[str]: Набор слов из кэша
"""
if self._cache_banwords is None:
logger.warning("Кэш не инициализирован", log_type="DATABASE")
return set()
return self._cache_banwords.get(word_type, set())
async def get_banwords(self, word_type: BanWordType) -> Set[str]:
"""Получает банворды напрямую из БД (без кэша)"""
return await self.repo.get_banwords(word_type)
# === TEMPORARY BANWORDS ===
async def add_temp_banword(
self,
word: str,
word_type: BanWordType,
minutes: int,
added_by: Optional[int] = None,
refresh_cache: bool = True
) -> bool:
"""Добавляет временный банворд"""
result = await self.repo.add_temp_banword(
word, word_type, minutes, added_by
)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_temp_banword(
self,
word: str,
word_type: BanWordType,
refresh_cache: bool = True
) -> bool:
"""Удаляет временный банворд"""
result = await self.repo.remove_temp_banword(word, word_type)
if result and refresh_cache:
await self.refresh_cache()
return result
async def cleanup_expired(self) -> int:
"""
Очищает истёкшие временные банворды.
Вызывается периодически (например, раз в минуту).
Returns:
int: Количество удалённых записей
"""
deleted = await self.repo.cleanup_expired_temp_banwords()
if deleted > 0:
await self.refresh_cache()
return deleted
# === WHITELIST ===
async def add_whitelist(
self,
word: str,
added_by: Optional[int] = None,
reason: Optional[str] = None,
refresh_cache: bool = True
) -> bool:
"""Добавляет слово в белый список"""
result = await self.repo.add_whitelist(word, added_by, reason)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_whitelist(
self,
word: str,
refresh_cache: bool = True
) -> bool:
"""Удаляет слово из белого списка"""
result = await self.repo.remove_whitelist(word)
if result and refresh_cache:
await self.refresh_cache()
return result
def get_whitelist_cached(self) -> Set[str]:
"""Получает белый список из кэша"""
if self._cache_whitelist is None:
logger.warning("Кэш whitelist не инициализирован", log_type="DATABASE")
return set()
return self._cache_whitelist
def is_whitelisted(self, text: str) -> bool:
"""
Проверяет, содержит ли текст слово из белого списка.
Args:
text: Текст для проверки (lowercase)
Returns:
bool: True если найдено исключение
"""
whitelist = self.get_whitelist_cached()
return any(word in text for word in whitelist)
# === ADMINS ===
async def add_admin(
self,
user_id: int,
added_by: Optional[int] = None,
refresh_cache: bool = True
) -> bool:
"""Добавляет администратора"""
result = await self.repo.add_admin(user_id, added_by)
if result and refresh_cache:
await self.refresh_cache()
return result
async def remove_admin(
self,
user_id: int,
refresh_cache: bool = True
) -> bool:
"""Удаляет администратора"""
result = await self.repo.remove_admin(user_id)
if result and refresh_cache:
await self.refresh_cache()
return result
def get_admins_cached(self) -> Set[int]:
"""Получает список админов из кэша"""
if self._cache_admins is None:
logger.warning("Кэш админов не инициализирован", log_type="DATABASE")
return set()
return self._cache_admins
def is_admin_cached(self, user_id: int) -> bool:
"""
Проверяет, является ли пользователь админом (из кэша).
Args:
user_id: Telegram ID
Returns:
bool: True если админ
"""
return user_id in self.get_admins_cached()
async def is_admin(self, user_id: int) -> bool:
"""Проверяет админа напрямую из БД"""
return await self.repo.is_admin(user_id)
# === SETTINGS (режимы silence/conflict) ===
async def set_silence_mode(self, minutes: int) -> datetime:
"""
Включает режим тишины на указанное время.
Args:
minutes: Длительность в минутах
Returns:
datetime: Время окончания режима
"""
expires_at = datetime.now().timestamp() + (minutes * 60)
await self.repo.set_setting("silence_until", str(expires_at))
logger.info(
f"Режим тишины активирован на {minutes} мин",
log_type="SILENCE"
)
return datetime.fromtimestamp(expires_at)
async def disable_silence_mode(self) -> None:
"""Отключает режим тишины"""
await self.repo.delete_setting("silence_until")
logger.info("Режим тишины отключён", log_type="SILENCE")
async def is_silence_active(self) -> bool:
"""Проверяет, активен ли режим тишины"""
silence_until_str = await self.repo.get_setting("silence_until")
if not silence_until_str:
return False
try:
silence_until = float(silence_until_str)
now = datetime.now().timestamp()
if now >= silence_until:
await self.disable_silence_mode()
return False
return True
except (ValueError, TypeError):
return False
async def set_conflict_mode(self, minutes: int) -> datetime:
"""
Включает режим антиконфликта на указанное время.
Args:
minutes: Длительность в минутах
Returns:
datetime: Время окончания режима
"""
expires_at = datetime.now().timestamp() + (minutes * 60)
await self.repo.set_setting("conflict_until", str(expires_at))
logger.info(
f"Режим антиконфликта активирован на {minutes} мин",
log_type="CONFLICT"
)
return datetime.fromtimestamp(expires_at)
async def disable_conflict_mode(self) -> None:
"""Отключает режим антиконфликта"""
await self.repo.delete_setting("conflict_until")
logger.info("Режим антиконфликта отключён", log_type="CONFLICT")
async def is_conflict_active(self) -> bool:
"""Проверяет, активен ли режим антиконфликта"""
conflict_until_str = await self.repo.get_setting("conflict_until")
if not conflict_until_str:
return False
try:
conflict_until = float(conflict_until_str)
now = datetime.now().timestamp()
if now >= conflict_until:
await self.disable_conflict_mode()
return False
return True
except (ValueError, TypeError):
return False
# === STATISTICS ===
async def log_spam(
self,
user_id: int,
username: str,
chat_id: int,
message_text: str,
matched_word: str,
match_type: str
) -> None:
"""Логирует удаление спам-сообщения"""
await self.repo.log_spam_deletion(
user_id=user_id,
username=username,
chat_id=chat_id,
message_text=message_text,
matched_word=matched_word,
match_type=match_type
)
async def get_spam_stats(
self,
limit: int = 100,
user_id: Optional[int] = None
) -> List[SpamStat]:
"""Получает статистику удалений"""
return await self.repo.get_spam_stats(limit, user_id)
async def get_user_spam_count(self, user_id: int) -> int:
"""Получает количество удалённых сообщений пользователя"""
return await self.repo.get_user_spam_count(user_id)
async def get_top_spammers(self, limit: int = 10) -> List[tuple[int, int]]:
"""Получает топ спамеров"""
return await self.repo.get_top_spammers(limit)
# === INFO ===
async def get_stats(self) -> dict:
"""Получает общую статистику"""
db_stats = await self.repo.get_stats()
cache_info = {
'cache_active': self._cache_banwords is not None,
'cache_updated_at': self._cache_updated_at.isoformat() if self._cache_updated_at else None
}
return {**db_stats, **cache_info}
async def get_all_words_list(self) -> dict:
"""
Получает все слова для команды /listwords.
Returns:
dict: Словарь со всеми категориями слов
"""
banwords = await self.repo.get_all_banwords()
temp_banwords = await self.repo.get_all_temp_banwords()
whitelist = await self.repo.get_whitelist()
admins = await self.repo.get_admins()
return {
'substring': banwords.get(BanWordType.SUBSTRING, set()),
'lemma': banwords.get(BanWordType.LEMMA, set()),
'part': banwords.get(BanWordType.PART, set()),
'conflict_substring': banwords.get(BanWordType.CONFLICT_SUBSTRING, set()),
'conflict_lemma': banwords.get(BanWordType.CONFLICT_LEMMA, set()),
'temp_substring': temp_banwords.get(BanWordType.SUBSTRING, set()),
'temp_lemma': temp_banwords.get(BanWordType.LEMMA, set()),
'whitelist': whitelist,
'admins': admins
}
async def get_top_words(self, limit: int = 10) -> List[Dict[str, Any]]:
"""
Получает топ N самых часто срабатывающих слов.
Args:
limit: Количество слов в топе
Returns:
List[Dict]: Список словарей с данными:
- word: слово
- count: количество срабатываний
- type: тип проверки
"""
async with self.db.get_session() as session:
try:
query = select(
SpamLog.matched_word,
SpamLog.match_type,
func.count(SpamLog.id).label('count')
).where(
SpamLog.matched_word.isnot(None)
).group_by(
SpamLog.matched_word,
SpamLog.match_type
).order_by(
desc('count')
).limit(limit)
result = await session.execute(query)
rows = result.all()
top_words = []
for row in rows:
top_words.append({
'word': row.matched_word,
'type': row.match_type,
'count': row.count
})
logger.debug(
f"Получен топ-{limit} слов: {len(top_words)} записей",
log_type="DATABASE"
)
return top_words
except Exception as e:
logger.error(
f"Ошибка получения топ-слов: {e}",
log_type="DATABASE"
)
return []
async def cleanup_expired_temp_words(self) -> int:
"""
Удаляет истёкшие временные банворды.
Returns:
int: Количество удалённых слов
"""
async with self.db.get_session() as session:
try:
now = datetime.now(timezone.utc)
query = select(TempBanWord).where(
TempBanWord.expires_at < now
)
result = await session.execute(query)
expired_words = result.scalars().all()
if not expired_words:
return 0
expired_info = []
for word in expired_words:
expired_info.append({
'word': word.word,
'type': word.type.value, # ← ИСПРАВЛЕНО: было word.word_type.value
'expires_at': word.expires_at
})
await session.delete(word)
await session.commit()
await self.refresh_cache()
logger.info(
f"Удалено {len(expired_words)} истёкших временных банвордов",
log_type="DATABASE"
)
for info in expired_info:
logger.debug(
f" └─ {info['type']}: '{info['word']}' (истёк: {info['expires_at']})",
log_type="DATABASE"
)
return len(expired_words)
except Exception as e:
logger.error(
f"Ошибка удаления истёкших временных слов: {e}",
log_type="DATABASE"
)
await session.rollback()
return 0
async def get_total_spam_count(self) -> int:
"""
Получает общее количество удалённых сообщений.
Returns:
int: Количество записей в SpamLog
"""
async with self.db.get_session() as session:
try:
query = select(func.count(SpamLog.id))
result = await session.execute(query)
count = result.scalar_one()
return count
except Exception as e:
logger.error(
f"Ошибка подсчёта спам-логов: {e}",
log_type="DATABASE"
)
return 0
async def reset_spam_stats(self) -> bool:
"""
Очищает всю статистику спама.
Returns:
bool: True если успешно
"""
async with self.db.get_session() as session:
try:
await session.execute(delete(SpamLog))
await session.commit()
logger.info("Статистика спама сброшена", log_type="DATABASE")
return True
except Exception as e:
logger.error(
f"Ошибка сброса статистики: {e}",
log_type="DATABASE"
)
await session.rollback()
return False
# === AUTO COMMENTS ===
async def get_auto_comment_settings(self, channel_id: int) -> dict:
"""
Получает настройки автокомментариев для канала.
ВАЖНО: возвращает сохранённые поля даже когда is_enabled=False,
чтобы UI/preview показывали реальную конфигурацию.
"""
from configs import settings
auto_comment = await self.repo.get_auto_comment(channel_id)
defaults = {
'text': settings.AUTO_COMMENT_TEXT,
'button_text': settings.AUTO_COMMENT_BUTTON_TEXT,
'button_url': settings.AUTO_COMMENT_BUTTON_URL,
'photo_url': settings.AUTO_COMMENT_PHOTO_URL,
'is_enabled': False,
}
if not auto_comment:
return defaults
return {
'text': auto_comment.text if auto_comment.text is not None else defaults['text'],
'button_text': auto_comment.button_text if auto_comment.button_text is not None else defaults['button_text'],
'button_url': auto_comment.button_url if auto_comment.button_url is not None else defaults['button_url'],
'photo_url': auto_comment.photo_url if auto_comment.photo_url is not None else defaults['photo_url'],
'is_enabled': bool(auto_comment.is_enabled),
}
async def save_auto_comment_settings(
self,
channel_id: int,
text: str,
button_text: str,
button_url: str,
photo_url: str,
updated_by: Optional[int] = None
) -> bool:
"""Сохраняет настройки автокомментариев"""
return await self.repo.set_auto_comment(
channel_id=channel_id,
text=text,
button_text=button_text,
button_url=button_url,
photo_url=photo_url,
updated_by=updated_by,
is_enabled=True
)
async def update_auto_comment_text(
self,
channel_id: int,
text: str,
updated_by: Optional[int] = None
) -> bool:
"""Обновляет текст автокомментария"""
return await self.repo.update_auto_comment_field(
channel_id, 'text', text, updated_by
)
async def update_auto_comment_button(
self,
channel_id: int,
button_text: str,
button_url: str,
updated_by: Optional[int] = None
) -> bool:
"""Обновляет кнопку автокомментария"""
success_text = await self.repo.update_auto_comment_field(
channel_id, 'button_text', button_text, updated_by
)
success_url = await self.repo.update_auto_comment_field(
channel_id, 'button_url', button_url, updated_by
)
return success_text and success_url
async def update_auto_comment_photo(
self,
channel_id: int,
photo_url: str,
updated_by: Optional[int] = None
) -> bool:
"""Обновляет фото автокомментария"""
return await self.repo.update_auto_comment_field(
channel_id, 'photo_url', photo_url, updated_by
)
async def log_report(
self,
report_id: str,
reporter_id: int,
reporter_username: Optional[str],
reported_user_id: int,
reported_username: Optional[str],
chat_id: int,
chat_title: Optional[str],
message_id: int,
message_thread_id: Optional[int],
message_text: Optional[str],
reason: str
) -> bool:
"""Логирует репорт в БД"""
return await self.repo.log_report(
report_id=report_id,
reporter_id=reporter_id,
reporter_username=reporter_username,
reported_user_id=reported_user_id,
reported_username=reported_username,
chat_id=chat_id,
chat_title=chat_title,
message_id=message_id,
message_thread_id=message_thread_id,
message_text=message_text,
reason=reason
)
# ==================== ✅ КАНАЛЫ АВТОКОММЕНТАРИЕВ ====================
async def add_auto_comment_channel(self, channel_id: int, added_by: int) -> bool:
"""✅ Добавляет новый канал в БД"""
async with self.db.get_session() as session:
try:
new_channel = AutoComment(
channel_id=channel_id,
text="",
button_text="",
button_url="",
photo_url="",
is_enabled=False,
updated_by=added_by,
)
session.add(new_channel)
await session.commit()
await session.refresh(new_channel)
logger.info(f"✅ Канал добавлен: {channel_id}", log_type="CHANNEL")
return True
except Exception as e:
await session.rollback()
logger.error(f"Ошибка добавления канала {channel_id}: {e}", log_type="CHANNEL")
return False
async def get_auto_comment_channels(self) -> List[int]:
"""✅ Возвращает все channel_id из БД"""
async with self.db.get_session() as session:
result = await session.execute(select(AutoComment.channel_id).distinct())
return [row[0] for row in result.fetchall()]
async def delete_auto_comment(self, channel_id: int) -> bool:
"""✅ Удаляет настройки канала"""
async with self.db.get_session() as session:
try:
result = await session.execute(
delete(AutoComment).where(AutoComment.channel_id == channel_id)
)
if result.rowcount > 0:
await session.commit()
logger.info(f"✅ Канал удален: {channel_id}", log_type="CHANNEL")
return True
await session.rollback()
return False
except Exception as e:
await session.rollback()
logger.error(f"Ошибка удаления канала {channel_id}: {e}", log_type="CHANNEL")
return False
# ==================== BOT SETTINGS (замена .env) ====================
async def get_bot_settings(self) -> dict:
"""Получает все настройки бота из БД"""
settings = {
'admin_chat_id': await self.repo.get_setting("admin_chat_id"),
'admin_thread_id': await self.repo.get_setting("admin_thread_id"),
'report_chat_id': await self.repo.get_setting("report_chat_id"),
'report_thread_id': await self.repo.get_setting("report_thread_id"),
}
return {k: v for k, v in settings.items() if v is not None}
async def set_bot_setting(self, key: str, value: Optional[str]) -> bool:
"""
Сохраняет настройку бота в БД
Args:
key: admin_chat_id, admin_thread_id, report_chat_id, report_thread_id
value: str или None/null
Returns:
bool: True если сохранено
"""
if value is None:
return await self.repo.delete_setting(key)
else:
return await self.repo.set_setting(key, value)
async def get_bot_setting(self, key: str) -> Optional[str]:
"""Получает ОДНУ настройку бота"""
settings = await self.get_bot_settings()
return settings.get(key)
async def init_default_bot_settings(self) -> None:
"""Инициализирует настройки по умолчанию из .env, только если они ещё не установлены"""
try:
from configs import settings
defaults = {
"admin_chat_id": getattr(settings, 'ADMIN_CHAT_ID', None),
"admin_thread_id": str(getattr(settings, 'ADMIN_THREAD_ID', None)) if getattr(settings, 'ADMIN_THREAD_ID', None) else None,
"report_chat_id": getattr(settings, 'REPORT_CHAT_ID', None),
"report_thread_id": str(getattr(settings, 'REPORT_THREAD_ID', None)) if getattr(settings, 'REPORT_THREAD_ID', None) else None,
}
for key, value in defaults.items():
if value is not None: # В .env значение задано
existing = await self.get_bot_setting(key)
if existing is None:
await self.set_bot_setting(key, str(value))
logger.debug(f"Установлена настройка {key} из .env", log_type="SETTINGS")
else:
logger.debug(f"Настройка {key} уже существует ({existing}), пропускаем", log_type="SETTINGS")
logger.info("✅ Настройки бота инициализированы из .env (существующие сохранены)", log_type="SETTINGS")
except Exception as e:
logger.warning(f"Не удалось инициализировать настройки из .env: {e}", log_type="SETTINGS")
# Глобальный экземпляр менеджера
_manager_instance: Optional[BanWordsManager] = None
def get_manager() -> BanWordsManager:
"""
Возвращает глобальный экземпляр BanWordsManager (Singleton).
Returns:
BanWordsManager: Менеджер банвордов
"""
global _manager_instance
if _manager_instance is None:
_manager_instance = BanWordsManager()
return _manager_instance