diff --git a/database/manager.py b/database/manager.py new file mode 100644 index 0000000..f29b405 --- /dev/null +++ b/database/manager.py @@ -0,0 +1,855 @@ +""" +Высокоуровневый менеджер для работы с банвордами. +Упрощает использование repository в handlers и middleware. +""" +from typing import Set, Optional, List, Dict, Any +from datetime import datetime, timezone + +from middleware.loggers import logger +from .database import Database, get_db +from .repository import BanWordsRepository +from .models import BanWordType, SpamStat, SpamLog, TempBanWord, AutoComment + +from sqlalchemy import select, delete, func, desc + +__all__ = ("BanWordsManager", "get_manager") + + +class BanWordsManager: + """ + Менеджер для удобной работы с банвордами. + + Предоставляет упрощённый API для handlers и middleware. + + Attributes: + db: Экземпляр Database + repo: Repository для CRUD операций + """ + + def __init__(self, db: Optional[Database] = None): + """ + Args: + db: Экземпляр Database (если None, берётся глобальный) + """ + self.db = db or get_db() + self.repo = BanWordsRepository(self.db) + + # Кэш для часто используемых данных + self._cache_banwords: Optional[dict] = None + self._cache_whitelist: Optional[Set[str]] = None + self._cache_admins: Optional[Set[int]] = None + self._cache_updated_at: Optional[datetime] = None + + async def init(self) -> None: + """Инициализирует базу данных и загружает кэш""" + await self.db.init() + await self.init_default_bot_settings() # ← добавлено + await self.refresh_cache() + logger.info("BanWordsManager инициализирован", log_type="DATABASE") + + async def close(self) -> None: + """Закрывает соединение с БД""" + await self.db.close() + + # === CACHE MANAGEMENT === + + async def refresh_cache(self) -> None: + """Обновляет кэш из БД""" + try: + self._cache_banwords = await self.repo.get_all_banwords() + temp_banwords = await self.repo.get_all_temp_banwords() + + # Объединяем постоянные и временные банворды + for word_type, words in temp_banwords.items(): + if word_type in self._cache_banwords: + self._cache_banwords[word_type] |= words + + self._cache_whitelist = await self.repo.get_whitelist() + self._cache_admins = await self.repo.get_admins() + self._cache_updated_at = datetime.now() + + logger.debug("Кэш банвордов обновлён", log_type="DATABASE") + + except Exception as e: + logger.error(f"Ошибка обновления кэша: {e}", log_type="DATABASE") + + def invalidate_cache(self) -> None: + """Сбрасывает кэш (требует refresh_cache)""" + self._cache_banwords = None + self._cache_whitelist = None + self._cache_admins = None + self._cache_updated_at = None + + # === BANWORDS (с кэшем) === + + async def add_banword( + self, + word: str, + word_type: BanWordType, + added_by: Optional[int] = None, + reason: Optional[str] = None, + refresh_cache: bool = True + ) -> bool: + """ + Добавляет банворд и обновляет кэш. + + Args: + word: Слово + word_type: Тип + added_by: ID админа + reason: Причина + refresh_cache: Обновить кэш после добавления + + Returns: + bool: True если добавлен + """ + result = await self.repo.add_banword(word, word_type, added_by, reason) + + if result and refresh_cache: + await self.refresh_cache() + + return result + + async def remove_banword( + self, + word: str, + word_type: BanWordType, + refresh_cache: bool = True + ) -> bool: + """Удаляет банворд и обновляет кэш""" + result = await self.repo.remove_banword(word, word_type) + + if result and refresh_cache: + await self.refresh_cache() + + return result + + def get_banwords_cached(self, word_type: BanWordType) -> Set[str]: + """ + Получает банворды из кэша (быстро). + + Args: + word_type: Тип банвордов + + Returns: + Set[str]: Набор слов из кэша + """ + if self._cache_banwords is None: + logger.warning("Кэш не инициализирован", log_type="DATABASE") + return set() + + return self._cache_banwords.get(word_type, set()) + + async def get_banwords(self, word_type: BanWordType) -> Set[str]: + """Получает банворды напрямую из БД (без кэша)""" + return await self.repo.get_banwords(word_type) + + # === TEMPORARY BANWORDS === + + async def add_temp_banword( + self, + word: str, + word_type: BanWordType, + minutes: int, + added_by: Optional[int] = None, + refresh_cache: bool = True + ) -> bool: + """Добавляет временный банворд""" + result = await self.repo.add_temp_banword( + word, word_type, minutes, added_by + ) + + if result and refresh_cache: + await self.refresh_cache() + + return result + + async def remove_temp_banword( + self, + word: str, + word_type: BanWordType, + refresh_cache: bool = True + ) -> bool: + """Удаляет временный банворд""" + result = await self.repo.remove_temp_banword(word, word_type) + + if result and refresh_cache: + await self.refresh_cache() + + return result + + async def cleanup_expired(self) -> int: + """ + Очищает истёкшие временные банворды. + Вызывается периодически (например, раз в минуту). + + Returns: + int: Количество удалённых записей + """ + deleted = await self.repo.cleanup_expired_temp_banwords() + + if deleted > 0: + await self.refresh_cache() + + return deleted + + # === WHITELIST === + + async def add_whitelist( + self, + word: str, + added_by: Optional[int] = None, + reason: Optional[str] = None, + refresh_cache: bool = True + ) -> bool: + """Добавляет слово в белый список""" + result = await self.repo.add_whitelist(word, added_by, reason) + + if result and refresh_cache: + await self.refresh_cache() + + return result + + async def remove_whitelist( + self, + word: str, + refresh_cache: bool = True + ) -> bool: + """Удаляет слово из белого списка""" + result = await self.repo.remove_whitelist(word) + + if result and refresh_cache: + await self.refresh_cache() + + return result + + def get_whitelist_cached(self) -> Set[str]: + """Получает белый список из кэша""" + if self._cache_whitelist is None: + logger.warning("Кэш whitelist не инициализирован", log_type="DATABASE") + return set() + + return self._cache_whitelist + + def is_whitelisted(self, text: str) -> bool: + """ + Проверяет, содержит ли текст слово из белого списка. + + Args: + text: Текст для проверки (lowercase) + + Returns: + bool: True если найдено исключение + """ + whitelist = self.get_whitelist_cached() + return any(word in text for word in whitelist) + + # === ADMINS === + + async def add_admin( + self, + user_id: int, + added_by: Optional[int] = None, + refresh_cache: bool = True + ) -> bool: + """Добавляет администратора""" + result = await self.repo.add_admin(user_id, added_by) + + if result and refresh_cache: + await self.refresh_cache() + + return result + + async def remove_admin( + self, + user_id: int, + refresh_cache: bool = True + ) -> bool: + """Удаляет администратора""" + result = await self.repo.remove_admin(user_id) + + if result and refresh_cache: + await self.refresh_cache() + + return result + + def get_admins_cached(self) -> Set[int]: + """Получает список админов из кэша""" + if self._cache_admins is None: + logger.warning("Кэш админов не инициализирован", log_type="DATABASE") + return set() + + return self._cache_admins + + def is_admin_cached(self, user_id: int) -> bool: + """ + Проверяет, является ли пользователь админом (из кэша). + + Args: + user_id: Telegram ID + + Returns: + bool: True если админ + """ + return user_id in self.get_admins_cached() + + async def is_admin(self, user_id: int) -> bool: + """Проверяет админа напрямую из БД""" + return await self.repo.is_admin(user_id) + + # === SETTINGS (режимы silence/conflict) === + + async def set_silence_mode(self, minutes: int) -> datetime: + """ + Включает режим тишины на указанное время. + + Args: + minutes: Длительность в минутах + + Returns: + datetime: Время окончания режима + """ + expires_at = datetime.now().timestamp() + (minutes * 60) + await self.repo.set_setting("silence_until", str(expires_at)) + + logger.info( + f"Режим тишины активирован на {minutes} мин", + log_type="SILENCE" + ) + + return datetime.fromtimestamp(expires_at) + + async def disable_silence_mode(self) -> None: + """Отключает режим тишины""" + await self.repo.delete_setting("silence_until") + logger.info("Режим тишины отключён", log_type="SILENCE") + + async def is_silence_active(self) -> bool: + """Проверяет, активен ли режим тишины""" + silence_until_str = await self.repo.get_setting("silence_until") + + if not silence_until_str: + return False + + try: + silence_until = float(silence_until_str) + now = datetime.now().timestamp() + + if now >= silence_until: + await self.disable_silence_mode() + return False + + return True + + except (ValueError, TypeError): + return False + + async def set_conflict_mode(self, minutes: int) -> datetime: + """ + Включает режим антиконфликта на указанное время. + + Args: + minutes: Длительность в минутах + + Returns: + datetime: Время окончания режима + """ + expires_at = datetime.now().timestamp() + (minutes * 60) + await self.repo.set_setting("conflict_until", str(expires_at)) + + logger.info( + f"Режим антиконфликта активирован на {minutes} мин", + log_type="CONFLICT" + ) + + return datetime.fromtimestamp(expires_at) + + async def disable_conflict_mode(self) -> None: + """Отключает режим антиконфликта""" + await self.repo.delete_setting("conflict_until") + logger.info("Режим антиконфликта отключён", log_type="CONFLICT") + + async def is_conflict_active(self) -> bool: + """Проверяет, активен ли режим антиконфликта""" + conflict_until_str = await self.repo.get_setting("conflict_until") + + if not conflict_until_str: + return False + + try: + conflict_until = float(conflict_until_str) + now = datetime.now().timestamp() + + if now >= conflict_until: + await self.disable_conflict_mode() + return False + + return True + + except (ValueError, TypeError): + return False + + # === STATISTICS === + + async def log_spam( + self, + user_id: int, + username: str, + chat_id: int, + message_text: str, + matched_word: str, + match_type: str + ) -> None: + """Логирует удаление спам-сообщения""" + await self.repo.log_spam_deletion( + user_id=user_id, + username=username, + chat_id=chat_id, + message_text=message_text, + matched_word=matched_word, + match_type=match_type + ) + + async def get_spam_stats( + self, + limit: int = 100, + user_id: Optional[int] = None + ) -> List[SpamStat]: + """Получает статистику удалений""" + return await self.repo.get_spam_stats(limit, user_id) + + async def get_user_spam_count(self, user_id: int) -> int: + """Получает количество удалённых сообщений пользователя""" + return await self.repo.get_user_spam_count(user_id) + + async def get_top_spammers(self, limit: int = 10) -> List[tuple[int, int]]: + """Получает топ спамеров""" + return await self.repo.get_top_spammers(limit) + + # === INFO === + + async def get_stats(self) -> dict: + """Получает общую статистику""" + db_stats = await self.repo.get_stats() + + cache_info = { + 'cache_active': self._cache_banwords is not None, + 'cache_updated_at': self._cache_updated_at.isoformat() if self._cache_updated_at else None + } + + return {**db_stats, **cache_info} + + async def get_all_words_list(self) -> dict: + """ + Получает все слова для команды /listwords. + + Returns: + dict: Словарь со всеми категориями слов + """ + banwords = await self.repo.get_all_banwords() + temp_banwords = await self.repo.get_all_temp_banwords() + whitelist = await self.repo.get_whitelist() + admins = await self.repo.get_admins() + + return { + 'substring': banwords.get(BanWordType.SUBSTRING, set()), + 'lemma': banwords.get(BanWordType.LEMMA, set()), + 'part': banwords.get(BanWordType.PART, set()), + 'conflict_substring': banwords.get(BanWordType.CONFLICT_SUBSTRING, set()), + 'conflict_lemma': banwords.get(BanWordType.CONFLICT_LEMMA, set()), + 'temp_substring': temp_banwords.get(BanWordType.SUBSTRING, set()), + 'temp_lemma': temp_banwords.get(BanWordType.LEMMA, set()), + 'whitelist': whitelist, + 'admins': admins + } + + async def get_top_words(self, limit: int = 10) -> List[Dict[str, Any]]: + """ + Получает топ N самых часто срабатывающих слов. + + Args: + limit: Количество слов в топе + + Returns: + List[Dict]: Список словарей с данными: + - word: слово + - count: количество срабатываний + - type: тип проверки + """ + async with self.db.get_session() as session: + try: + query = select( + SpamLog.matched_word, + SpamLog.match_type, + func.count(SpamLog.id).label('count') + ).where( + SpamLog.matched_word.isnot(None) + ).group_by( + SpamLog.matched_word, + SpamLog.match_type + ).order_by( + desc('count') + ).limit(limit) + + result = await session.execute(query) + rows = result.all() + + top_words = [] + for row in rows: + top_words.append({ + 'word': row.matched_word, + 'type': row.match_type, + 'count': row.count + }) + + logger.debug( + f"Получен топ-{limit} слов: {len(top_words)} записей", + log_type="DATABASE" + ) + + return top_words + + except Exception as e: + logger.error( + f"Ошибка получения топ-слов: {e}", + log_type="DATABASE" + ) + return [] + + async def cleanup_expired_temp_words(self) -> int: + """ + Удаляет истёкшие временные банворды. + + Returns: + int: Количество удалённых слов + """ + async with self.db.get_session() as session: + try: + now = datetime.now(timezone.utc) + + query = select(TempBanWord).where( + TempBanWord.expires_at < now + ) + result = await session.execute(query) + expired_words = result.scalars().all() + + if not expired_words: + return 0 + + expired_info = [] + for word in expired_words: + expired_info.append({ + 'word': word.word, + 'type': word.type.value, # ← ИСПРАВЛЕНО: было word.word_type.value + 'expires_at': word.expires_at + }) + await session.delete(word) + + await session.commit() + await self.refresh_cache() + + logger.info( + f"Удалено {len(expired_words)} истёкших временных банвордов", + log_type="DATABASE" + ) + + for info in expired_info: + logger.debug( + f" └─ {info['type']}: '{info['word']}' (истёк: {info['expires_at']})", + log_type="DATABASE" + ) + + return len(expired_words) + + except Exception as e: + logger.error( + f"Ошибка удаления истёкших временных слов: {e}", + log_type="DATABASE" + ) + await session.rollback() + return 0 + + async def get_total_spam_count(self) -> int: + """ + Получает общее количество удалённых сообщений. + + Returns: + int: Количество записей в SpamLog + """ + async with self.db.get_session() as session: + try: + query = select(func.count(SpamLog.id)) + result = await session.execute(query) + count = result.scalar_one() + return count + except Exception as e: + logger.error( + f"Ошибка подсчёта спам-логов: {e}", + log_type="DATABASE" + ) + return 0 + + async def reset_spam_stats(self) -> bool: + """ + Очищает всю статистику спама. + + Returns: + bool: True если успешно + """ + async with self.db.get_session() as session: + try: + await session.execute(delete(SpamLog)) + await session.commit() + + logger.info("Статистика спама сброшена", log_type="DATABASE") + return True + + except Exception as e: + logger.error( + f"Ошибка сброса статистики: {e}", + log_type="DATABASE" + ) + await session.rollback() + return False + + # === AUTO COMMENTS === + + async def get_auto_comment_settings(self, channel_id: int) -> dict: + """ + Получает настройки автокомментариев для канала. + + ВАЖНО: возвращает сохранённые поля даже когда is_enabled=False, + чтобы UI/preview показывали реальную конфигурацию. + """ + from configs import settings + + auto_comment = await self.repo.get_auto_comment(channel_id) + + defaults = { + 'text': settings.AUTO_COMMENT_TEXT, + 'button_text': settings.AUTO_COMMENT_BUTTON_TEXT, + 'button_url': settings.AUTO_COMMENT_BUTTON_URL, + 'photo_url': settings.AUTO_COMMENT_PHOTO_URL, + 'is_enabled': False, + } + + if not auto_comment: + return defaults + + return { + 'text': auto_comment.text if auto_comment.text is not None else defaults['text'], + 'button_text': auto_comment.button_text if auto_comment.button_text is not None else defaults['button_text'], + 'button_url': auto_comment.button_url if auto_comment.button_url is not None else defaults['button_url'], + 'photo_url': auto_comment.photo_url if auto_comment.photo_url is not None else defaults['photo_url'], + 'is_enabled': bool(auto_comment.is_enabled), + } + + async def save_auto_comment_settings( + self, + channel_id: int, + text: str, + button_text: str, + button_url: str, + photo_url: str, + updated_by: Optional[int] = None + ) -> bool: + """Сохраняет настройки автокомментариев""" + return await self.repo.set_auto_comment( + channel_id=channel_id, + text=text, + button_text=button_text, + button_url=button_url, + photo_url=photo_url, + updated_by=updated_by, + is_enabled=True + ) + + async def update_auto_comment_text( + self, + channel_id: int, + text: str, + updated_by: Optional[int] = None + ) -> bool: + """Обновляет текст автокомментария""" + return await self.repo.update_auto_comment_field( + channel_id, 'text', text, updated_by + ) + + async def update_auto_comment_button( + self, + channel_id: int, + button_text: str, + button_url: str, + updated_by: Optional[int] = None + ) -> bool: + """Обновляет кнопку автокомментария""" + success_text = await self.repo.update_auto_comment_field( + channel_id, 'button_text', button_text, updated_by + ) + success_url = await self.repo.update_auto_comment_field( + channel_id, 'button_url', button_url, updated_by + ) + return success_text and success_url + + async def update_auto_comment_photo( + self, + channel_id: int, + photo_url: str, + updated_by: Optional[int] = None + ) -> bool: + """Обновляет фото автокомментария""" + return await self.repo.update_auto_comment_field( + channel_id, 'photo_url', photo_url, updated_by + ) + + async def log_report( + self, + report_id: str, + reporter_id: int, + reporter_username: Optional[str], + reported_user_id: int, + reported_username: Optional[str], + chat_id: int, + chat_title: Optional[str], + message_id: int, + message_thread_id: Optional[int], + message_text: Optional[str], + reason: str + ) -> bool: + """Логирует репорт в БД""" + return await self.repo.log_report( + report_id=report_id, + reporter_id=reporter_id, + reporter_username=reporter_username, + reported_user_id=reported_user_id, + reported_username=reported_username, + chat_id=chat_id, + chat_title=chat_title, + message_id=message_id, + message_thread_id=message_thread_id, + message_text=message_text, + reason=reason + ) + + # ==================== ✅ КАНАЛЫ АВТОКОММЕНТАРИЕВ ==================== + + async def add_auto_comment_channel(self, channel_id: int, added_by: int) -> bool: + """✅ Добавляет новый канал в БД""" + async with self.db.get_session() as session: + try: + new_channel = AutoComment( + channel_id=channel_id, + text="", + button_text="", + button_url="", + photo_url="", + is_enabled=False, + updated_by=added_by, + ) + session.add(new_channel) + await session.commit() + await session.refresh(new_channel) + logger.info(f"✅ Канал добавлен: {channel_id}", log_type="CHANNEL") + return True + except Exception as e: + await session.rollback() + logger.error(f"Ошибка добавления канала {channel_id}: {e}", log_type="CHANNEL") + return False + + async def get_auto_comment_channels(self) -> List[int]: + """✅ Возвращает все channel_id из БД""" + async with self.db.get_session() as session: + result = await session.execute(select(AutoComment.channel_id).distinct()) + return [row[0] for row in result.fetchall()] + + async def delete_auto_comment(self, channel_id: int) -> bool: + """✅ Удаляет настройки канала""" + async with self.db.get_session() as session: + try: + result = await session.execute( + delete(AutoComment).where(AutoComment.channel_id == channel_id) + ) + if result.rowcount > 0: + await session.commit() + logger.info(f"✅ Канал удален: {channel_id}", log_type="CHANNEL") + return True + await session.rollback() + return False + except Exception as e: + await session.rollback() + logger.error(f"Ошибка удаления канала {channel_id}: {e}", log_type="CHANNEL") + return False + + # ==================== BOT SETTINGS (замена .env) ==================== + async def get_bot_settings(self) -> dict: + """Получает все настройки бота из БД""" + settings = { + 'admin_chat_id': await self.repo.get_setting("admin_chat_id"), + 'admin_thread_id': await self.repo.get_setting("admin_thread_id"), + 'report_chat_id': await self.repo.get_setting("report_chat_id"), + 'report_thread_id': await self.repo.get_setting("report_thread_id"), + } + return {k: v for k, v in settings.items() if v is not None} + + async def set_bot_setting(self, key: str, value: Optional[str]) -> bool: + """ + Сохраняет настройку бота в БД + + Args: + key: admin_chat_id, admin_thread_id, report_chat_id, report_thread_id + value: str или None/null + + Returns: + bool: True если сохранено + """ + if value is None: + return await self.repo.delete_setting(key) + else: + return await self.repo.set_setting(key, value) + + async def get_bot_setting(self, key: str) -> Optional[str]: + """Получает ОДНУ настройку бота""" + settings = await self.get_bot_settings() + return settings.get(key) + + async def init_default_bot_settings(self) -> None: + """Инициализирует настройки по умолчанию из .env, только если они ещё не установлены""" + try: + from configs import settings + + defaults = { + "admin_chat_id": getattr(settings, 'ADMIN_CHAT_ID', None), + "admin_thread_id": str(getattr(settings, 'ADMIN_THREAD_ID', None)) if getattr(settings, 'ADMIN_THREAD_ID', None) else None, + "report_chat_id": getattr(settings, 'REPORT_CHAT_ID', None), + "report_thread_id": str(getattr(settings, 'REPORT_THREAD_ID', None)) if getattr(settings, 'REPORT_THREAD_ID', None) else None, + } + + for key, value in defaults.items(): + if value is not None: # В .env значение задано + existing = await self.get_bot_setting(key) + if existing is None: + await self.set_bot_setting(key, str(value)) + logger.debug(f"Установлена настройка {key} из .env", log_type="SETTINGS") + else: + logger.debug(f"Настройка {key} уже существует ({existing}), пропускаем", log_type="SETTINGS") + + logger.info("✅ Настройки бота инициализированы из .env (существующие сохранены)", log_type="SETTINGS") + except Exception as e: + logger.warning(f"Не удалось инициализировать настройки из .env: {e}", log_type="SETTINGS") + + +# Глобальный экземпляр менеджера +_manager_instance: Optional[BanWordsManager] = None + + +def get_manager() -> BanWordsManager: + """ + Возвращает глобальный экземпляр BanWordsManager (Singleton). + + Returns: + BanWordsManager: Менеджер банвордов + """ + global _manager_instance + if _manager_instance is None: + _manager_instance = BanWordsManager() + return _manager_instance