""" Repository для работы с банвордами через SQLAlchemy ORM. """ from typing import Set, List, Optional from datetime import datetime, timedelta, timezone from sqlalchemy import select, delete, func, and_ from middleware.loggers import logger from .database import Database from .models import ( BanWord, TempBanWord, WhitelistWord, Admin, Setting, SpamStat, BanWordType, AutoComment ) __all__ = ("BanWordsRepository",) class BanWordsRepository: """ Repository для CRUD операций с банвордами. Все методы работают через SQLAlchemy ORM. """ def __init__(self, db: Database): """ Args: db: Экземпляр Database """ self.db = db # === BANWORDS === async def add_banword( self, word: str, word_type: BanWordType, added_by: Optional[int] = None, reason: Optional[str] = None ) -> bool: """ Добавляет постоянный банворд. Args: word: Слово для блокировки word_type: Тип банворда added_by: ID админа, который добавил reason: Причина добавления Returns: bool: True если добавлен, False если уже существует """ try: async with self.db.get_session() as session: # Проверяем, существует ли уже existing = await session.execute( select(BanWord).where( and_( BanWord.word == word.lower(), BanWord.type == word_type ) ) ) if existing.scalar_one_or_none(): return False # Добавляем новый banword = BanWord( word=word.lower(), type=word_type, added_by=added_by, reason=reason ) session.add(banword) await session.commit() logger.info( f"Добавлен банворд: '{word}' ({word_type.value})", log_type="DATABASE" ) return True except Exception as e: logger.error( f"Ошибка добавления банворда: {e}", log_type="DATABASE" ) return False async def remove_banword(self, word: str, word_type: BanWordType) -> bool: """ Удаляет банворд. Args: word: Слово word_type: Тип Returns: bool: True если удалён """ try: async with self.db.get_session() as session: result = await session.execute( delete(BanWord).where( and_( BanWord.word == word.lower(), BanWord.type == word_type ) ) ) await session.commit() deleted = result.rowcount > 0 if deleted: logger.info( f"Удалён банворд: '{word}' ({word_type.value})", log_type="DATABASE" ) return deleted except Exception as e: logger.error( f"Ошибка удаления банворда: {e}", log_type="DATABASE" ) return False async def get_banwords(self, word_type: BanWordType) -> Set[str]: """ Получает все банворды определённого типа. Args: word_type: Тип банвордов Returns: Set[str]: Набор слов """ try: async with self.db.get_session() as session: result = await session.execute( select(BanWord.word).where(BanWord.type == word_type) ) return set(result.scalars().all()) except Exception as e: logger.error( f"Ошибка получения банвордов: {e}", log_type="DATABASE" ) return set() async def get_all_banwords(self) -> dict[BanWordType, Set[str]]: result = { BanWordType.WORD: set(), BanWordType.LEMMA: set(), BanWordType.PART: set(), BanWordType.CONFLICT_WORD: set(), BanWordType.CONFLICT_LEMMA: set(), BanWordType.CONFLICT_PART: set(), } try: async with self.db.get_session() as session: banwords = await session.execute(select(BanWord)) loaded = 0 for banword in banwords.scalars(): try: word_type = ( banword.type if isinstance(banword.type, BanWordType) else BanWordType(banword.type.casefold()) ) if word_type in result: result[word_type].add(banword.word) loaded += 1 except ValueError: logger.warning( f"Неизвестный тип: '{banword.type}' для '{banword.word}'", log_type="DATABASE" ) logger.info(f"✅ Кэш загружен: {loaded} банвордов", log_type="DATABASE") except Exception as e: logger.error(f"❌ get_all_banwords: {e}", log_type="DATABASE") return result async def search_banwords(self, query: str, limit: int = 50) -> List[BanWord]: """ Поиск банвордов по частичному совпадению. Args: query: Поисковый запрос limit: Максимум результатов Returns: List[BanWord]: Найденные банворды """ try: async with self.db.get_session() as session: result = await session.execute( select(BanWord) .where(BanWord.word.contains(query.lower())) .limit(limit) ) return list(result.scalars().all()) except Exception as e: logger.error( f"Ошибка поиска банвордов: {e}", log_type="DATABASE" ) return [] # === TEMPORARY BANWORDS === async def add_temp_banword( self, word: str, word_type: BanWordType, minutes: int, added_by: Optional[int] = None ) -> bool: """ Добавляет временный банворд. Args: word: Слово word_type: Тип minutes: Длительность в минутах added_by: ID админа Returns: bool: True если добавлен """ try: async with self.db.get_session() as session: # Проверяем существование existing = await session.execute( select(TempBanWord).where( and_( TempBanWord.word == word.lower(), TempBanWord.type == word_type ) ) ) if existing.scalar_one_or_none(): return False # Добавляем expires_at = datetime.now() + timedelta(minutes=minutes) temp_banword = TempBanWord( word=word.lower(), type=word_type, added_by=added_by, expires_at=expires_at ) session.add(temp_banword) await session.commit() logger.info( f"Добавлен временный банворд: '{word}' на {minutes} мин", log_type="DATABASE" ) return True except Exception as e: logger.error( f"Ошибка добавления временного банворда: {e}", log_type="DATABASE" ) return False async def remove_temp_banword(self, word: str, word_type: BanWordType) -> bool: """Удаляет временный банворд досрочно""" try: async with self.db.get_session() as session: result = await session.execute( delete(TempBanWord).where( and_( TempBanWord.word == word.lower(), TempBanWord.type == word_type ) ) ) await session.commit() deleted = result.rowcount > 0 if deleted: logger.info( f"Удалён временный банворд: '{word}'", log_type="DATABASE" ) return deleted except Exception as e: logger.error( f"Ошибка удаления временного банворда: {e}", log_type="DATABASE" ) return False async def get_temp_banwords(self, word_type: BanWordType) -> Set[str]: """ Получает активные (не истёкшие) временные банворды. Args: word_type: Тип банвордов Returns: Set[str]: Набор активных временных слов """ try: async with self.db.get_session() as session: result = await session.execute( select(TempBanWord.word).where( and_( TempBanWord.type == word_type, TempBanWord.expires_at > datetime.now() ) ) ) return set(result.scalars().all()) except Exception as e: logger.error( f"Ошибка получения временных банвордов: {e}", log_type="DATABASE" ) return set() async def get_all_temp_banwords(self) -> dict[BanWordType, Set[str]]: """Получает все активные временные банворды по типам""" result = { BanWordType.WORD: set(), BanWordType.LEMMA: set(), } try: async with self.db.get_session() as session: temp_banwords = await session.execute( select(TempBanWord).where( TempBanWord.expires_at > datetime.now() ) ) for temp_banword in temp_banwords.scalars(): word_type = ( temp_banword.type if isinstance(temp_banword.type, BanWordType) else BanWordType(temp_banword.type.casefold()) ) if word_type in result: result[word_type].add(temp_banword.word) except Exception as e: logger.error( f"Ошибка получения всех временных банвордов: {e}", log_type="DATABASE" ) return result async def cleanup_expired_temp_banwords(self) -> int: """ Удаляет истёкшие временные банворды. Returns: int: Количество удалённых записей """ try: async with self.db.get_session() as session: result = await session.execute( delete(TempBanWord).where( TempBanWord.expires_at <= datetime.now() ) ) await session.commit() deleted = result.rowcount if deleted > 0: logger.info( f"Удалено {deleted} истёкших временных банвордов", log_type="DATABASE" ) return deleted except Exception as e: logger.error( f"Ошибка очистки временных банвордов: {e}", log_type="DATABASE" ) return 0 # === WHITELIST === async def add_whitelist( self, word: str, added_by: Optional[int] = None, reason: Optional[str] = None ) -> bool: """Добавляет слово в белый список (исключение)""" try: async with self.db.get_session() as session: # Проверяем существование existing = await session.execute( select(WhitelistWord).where( WhitelistWord.word == word.lower() ) ) if existing.scalar_one_or_none(): return False # Добавляем whitelist_word = WhitelistWord( word=word.lower(), added_by=added_by, reason=reason ) session.add(whitelist_word) await session.commit() logger.info( f"Добавлено исключение: '{word}'", log_type="DATABASE" ) return True except Exception as e: logger.error( f"Ошибка добавления исключения: {e}", log_type="DATABASE" ) return False async def remove_whitelist(self, word: str) -> bool: """Удаляет слово из белого списка""" try: async with self.db.get_session() as session: result = await session.execute( delete(WhitelistWord).where( WhitelistWord.word == word.lower() ) ) await session.commit() deleted = result.rowcount > 0 if deleted: logger.info( f"Удалено исключение: '{word}'", log_type="DATABASE" ) return deleted except Exception as e: logger.error( f"Ошибка удаления исключения: {e}", log_type="DATABASE" ) return False async def get_whitelist(self) -> Set[str]: """Получает все слова из белого списка""" try: async with self.db.get_session() as session: result = await session.execute(select(WhitelistWord.word)) return set(result.scalars().all()) except Exception as e: logger.error( f"Ошибка получения whitelist: {e}", log_type="DATABASE" ) return set() # === ADMINS === async def add_admin( self, user_id: int, added_by: Optional[int] = None ) -> bool: """Добавляет администратора""" try: async with self.db.get_session() as session: # Проверяем существование existing = await session.execute( select(Admin).where(Admin.user_id == user_id) ) if existing.scalar_one_or_none(): return False # Добавляем admin = Admin(user_id=user_id, added_by=added_by) session.add(admin) await session.commit() logger.info( f"Добавлен админ: {user_id}", log_type="DATABASE" ) return True except Exception as e: logger.error( f"Ошибка добавления админа: {e}", log_type="DATABASE" ) return False async def remove_admin(self, user_id: int) -> bool: """Удаляет администратора""" try: async with self.db.get_session() as session: result = await session.execute( delete(Admin).where(Admin.user_id == user_id) ) await session.commit() deleted = result.rowcount > 0 if deleted: logger.info( f"Удалён админ: {user_id}", log_type="DATABASE" ) return deleted except Exception as e: logger.error( f"Ошибка удаления админа: {e}", log_type="DATABASE" ) return False async def get_admins(self) -> Set[int]: """Получает всех администраторов""" try: async with self.db.get_session() as session: result = await session.execute(select(Admin.user_id)) return set(result.scalars().all()) except Exception as e: logger.error( f"Ошибка получения админов: {e}", log_type="DATABASE" ) return set() async def is_admin(self, user_id: int) -> bool: """Проверяет, является ли пользователь админом""" try: async with self.db.get_session() as session: result = await session.execute( select(Admin).where(Admin.user_id == user_id) ) return result.scalar_one_or_none() is not None except Exception as e: logger.error( f"Ошибка проверки админа: {e}", log_type="DATABASE" ) return False # === SETTINGS === async def set_setting(self, key: str, value: str) -> None: """ Сохраняет настройку (или обновляет существующую). Args: key: Ключ настройки value: Значение (строка или JSON) """ try: async with self.db.get_session() as session: # Проверяем существование existing = await session.execute( select(Setting).where(Setting.key == key) ) setting = existing.scalar_one_or_none() if setting: # Обновляем существующую setting.value = value setting.updated_at = datetime.now() else: # Создаём новую setting = Setting(key=key, value=value) session.add(setting) await session.commit() except Exception as e: logger.error( f"Ошибка сохранения настройки: {e}", log_type="DATABASE" ) async def get_setting( self, key: str, default: Optional[str] = None ) -> Optional[str]: """ Получает значение настройки. Args: key: Ключ настройки default: Значение по умолчанию Returns: Optional[str]: Значение или default """ try: async with self.db.get_session() as session: result = await session.execute( select(Setting.value).where(Setting.key == key) ) value = result.scalar_one_or_none() return value if value is not None else default except Exception as e: logger.error( f"Ошибка получения настройки: {e}", log_type="DATABASE" ) return default async def delete_setting(self, key: str) -> bool: """Удаляет настройку""" try: async with self.db.get_session() as session: result = await session.execute( delete(Setting).where(Setting.key == key) ) await session.commit() return result.rowcount > 0 except Exception as e: logger.error( f"Ошибка удаления настройки: {e}", log_type="DATABASE" ) return False # === STATISTICS === async def log_spam_deletion( self, user_id: int, username: str, chat_id: int, message_text: str, matched_word: str, match_type: str ) -> None: """ Записывает статистику удалённого спам-сообщения. Args: user_id: Telegram ID отправителя username: Username отправителя chat_id: ID чата message_text: Текст сообщения (обрезается до 500 символов) matched_word: Слово, по которому сработал фильтр match_type: Тип проверки (substring/lemma/part/silence/conflict) """ try: async with self.db.get_session() as session: spam_stat = SpamStat( user_id=user_id, username=username, chat_id=chat_id, message_text=message_text[:500], matched_word=matched_word, match_type=match_type ) session.add(spam_stat) await session.commit() except Exception as e: logger.error( f"Ошибка логирования статистики: {e}", log_type="DATABASE" ) async def get_spam_stats( self, limit: int = 100, user_id: Optional[int] = None ) -> List[SpamStat]: """ Получает последнюю статистику удалений. Args: limit: Максимум записей user_id: Фильтр по пользователю (опционально) Returns: List[SpamStat]: Список записей статистики """ try: async with self.db.get_session() as session: query = select(SpamStat).order_by(SpamStat.deleted_at.desc()) if user_id: query = query.where(SpamStat.user_id == user_id) query = query.limit(limit) result = await session.execute(query) return list(result.scalars().all()) except Exception as e: logger.error( f"Ошибка получения статистики: {e}", log_type="DATABASE" ) return [] async def get_user_spam_count(self, user_id: int) -> int: """Получает количество удалённых сообщений пользователя""" try: async with self.db.get_session() as session: result = await session.execute( select(func.count(SpamStat.id)).where( SpamStat.user_id == user_id ) ) return result.scalar_one() except Exception as e: logger.error( f"Ошибка подсчёта спама: {e}", log_type="DATABASE" ) return 0 async def get_top_spammers(self, limit: int = 10) -> List[tuple[int, int]]: """ Получает топ спамеров. Args: limit: Количество записей Returns: List[tuple[int, int]]: [(user_id, count), ...] """ try: async with self.db.get_session() as session: result = await session.execute( select( SpamStat.user_id, func.count(SpamStat.id).label('count') ) .group_by(SpamStat.user_id) .order_by(func.count(SpamStat.id).desc()) .limit(limit) ) return [(row.user_id, row.count) for row in result] except Exception as e: logger.error( f"Ошибка получения топ спамеров: {e}", log_type="DATABASE" ) return [] # === GENERAL === async def get_stats(self) -> dict: """Получает общую статистику БД""" try: async with self.db.get_session() as session: banwords_count = await session.execute( select(func.count(BanWord.id)) ) temp_banwords_count = await session.execute( select(func.count(TempBanWord.id)) ) whitelist_count = await session.execute( select(func.count(WhitelistWord.id)) ) admins_count = await session.execute( select(func.count(Admin.id)) ) spam_stats_count = await session.execute( select(func.count(SpamStat.id)) ) return { 'banwords': banwords_count.scalar_one(), 'temp_banwords': temp_banwords_count.scalar_one(), 'whitelist': whitelist_count.scalar_one(), 'admins': admins_count.scalar_one(), 'spam_deletions': spam_stats_count.scalar_one(), } except Exception as e: logger.error( f"Ошибка получения статистики: {e}", log_type="DATABASE" ) return {} # === AUTO COMMENTS === async def get_auto_comment(self, channel_id: int) -> Optional['AutoComment']: """ Получает настройки автокомментариев для канала. Args: channel_id: ID канала Returns: AutoComment или None """ try: async with self.db.get_session() as session: result = await session.execute( select(AutoComment).where(AutoComment.channel_id == channel_id) ) return result.scalar_one_or_none() except Exception as e: logger.error( f"Ошибка получения автокомментария: {e}", log_type="DATABASE" ) return None async def set_auto_comment( self, channel_id: int, text: str, button_text: str, button_url: str, photo_url: str, updated_by: Optional[int] = None, is_enabled: bool = True ) -> bool: """ Сохраняет или обновляет настройки автокомментариев. Args: channel_id: ID канала text: Текст комментария button_text: Текст кнопки button_url: URL кнопки photo_url: URL фото updated_by: ID админа is_enabled: Включены ли комментарии Returns: bool: True если успешно """ try: async with self.db.get_session() as session: # Проверяем существование result = await session.execute( select(AutoComment).where(AutoComment.channel_id == channel_id) ) auto_comment = result.scalar_one_or_none() if auto_comment: # Обновляем существующую auto_comment.text = text auto_comment.button_text = button_text auto_comment.button_url = button_url auto_comment.photo_url = photo_url auto_comment.is_enabled = is_enabled auto_comment.updated_by = updated_by auto_comment.updated_at = datetime.now(timezone.utc) else: # Создаём новую auto_comment = AutoComment( channel_id=channel_id, text=text, button_text=button_text, button_url=button_url, photo_url=photo_url, is_enabled=is_enabled, updated_by=updated_by ) session.add(auto_comment) await session.commit() logger.info( f"Автокомментарий для канала {channel_id} обновлён", log_type="DATABASE" ) return True except Exception as e: logger.error( f"Ошибка сохранения автокомментария: {e}", log_type="DATABASE" ) return False async def update_auto_comment_field( self, channel_id: int, field: str, value: str, updated_by: Optional[int] = None ) -> bool: """ Обновляет одно поле автокомментария. Args: channel_id: ID канала field: Имя поля (text, button_text, button_url, photo_url) value: Новое значение updated_by: ID админа Returns: bool: True если успешно """ try: async with self.db.get_session() as session: result = await session.execute( select(AutoComment).where(AutoComment.channel_id == channel_id) ) auto_comment = result.scalar_one_or_none() if not auto_comment: return False # Обновляем поле if hasattr(auto_comment, field): setattr(auto_comment, field, value) auto_comment.updated_by = updated_by auto_comment.updated_at = datetime.now(timezone.utc) await session.commit() logger.info( f"Поле '{field}' автокомментария для канала {channel_id} обновлено", log_type="DATABASE" ) return True else: logger.error( f"Поле '{field}' не существует в AutoComment", log_type="DATABASE" ) return False except Exception as e: logger.error( f"Ошибка обновления поля автокомментария: {e}", log_type="DATABASE" ) return False async def toggle_auto_comment( self, channel_id: int, is_enabled: bool, updated_by: Optional[int] = None ) -> bool: """ Включает/выключает автокомментарии для канала. Args: channel_id: ID канала is_enabled: True - включить, False - выключить updated_by: ID админа Returns: bool: True если успешно """ try: async with self.db.get_session() as session: result = await session.execute( select(AutoComment).where(AutoComment.channel_id == channel_id) ) auto_comment = result.scalar_one_or_none() if not auto_comment: return False auto_comment.is_enabled = is_enabled auto_comment.updated_by = updated_by auto_comment.updated_at = datetime.now(timezone.utc) await session.commit() logger.info( f"Автокомментарии для канала {channel_id} {'включены' if is_enabled else 'выключены'}", log_type="DATABASE" ) return True except Exception as e: logger.error( f"Ошибка переключения автокомментария: {e}", log_type="DATABASE" ) return False async def get_all_auto_comments(self) -> list['AutoComment']: """ Получает все настройки автокомментариев. Returns: List[AutoComment]: Список всех автокомментариев """ try: async with self.db.get_session() as session: result = await session.execute(select(AutoComment)) return list(result.scalars().all()) except Exception as e: logger.error( f"Ошибка получения всех автокомментариев: {e}", log_type="DATABASE" ) return [] async def delete_auto_comment(self, channel_id: int) -> bool: """ Удаляет настройки автокомментариев для канала. Args: channel_id: ID канала Returns: bool: True если удалено """ try: async with self.db.get_session() as session: result = await session.execute( delete(AutoComment).where(AutoComment.channel_id == channel_id) ) await session.commit() deleted = result.rowcount > 0 if deleted: logger.info( f"Автокомментарий для канала {channel_id} удалён", log_type="DATABASE" ) return deleted except Exception as e: logger.error( f"Ошибка удаления автокомментария: {e}", log_type="DATABASE" ) return False # === REPORTS === async def log_report( self, report_id: str, reporter_id: int, reporter_username: Optional[str], reported_user_id: int, reported_username: Optional[str], chat_id: int, chat_title: Optional[str], message_id: int, message_thread_id: Optional[int], message_text: Optional[str], reason: str ) -> bool: """ Сохраняет репорт в БД. Args: report_id: Уникальный ID репорта reporter_id: ID жалобщика reporter_username: Username жалобщика reported_user_id: ID нарушителя reported_username: Username нарушителя chat_id: ID чата chat_title: Название чата message_id: ID сообщения message_thread_id: ID топика message_text: Текст сообщения reason: Причина жалобы Returns: bool: True если успешно """ try: from .models import Report # Импорт здесь, чтобы избежать циклических импортов async with self.db.get_session() as session: report = Report( report_id=report_id, reporter_id=reporter_id, reporter_username=reporter_username, reported_user_id=reported_user_id, reported_username=reported_username, chat_id=chat_id, chat_title=chat_title, message_id=message_id, message_thread_id=message_thread_id, message_text=message_text[:500] if message_text else None, reason=reason ) session.add(report) await session.commit() logger.info( f"Репорт #{report_id} сохранён в БД", log_type="DATABASE" ) return True except Exception as e: logger.error( f"Ошибка сохранения репорта: {e}", log_type="DATABASE" ) return False async def update_report_status( self, report_id: str, status: str, processed_by: int ) -> bool: """ Обновляет статус репорта. Args: report_id: ID репорта status: Новый статус (closed, banned, deleted) processed_by: ID админа Returns: bool: True если успешно """ try: from .models import Report async with self.db.get_session() as session: result = await session.execute( select(Report).where(Report.report_id == report_id) ) report = result.scalar_one_or_none() if not report: return False report.status = status report.processed_by = processed_by report.processed_at = datetime.now(timezone.utc) await session.commit() logger.info( f"Репорт #{report_id} обновлён: статус={status}", log_type="DATABASE" ) return True except Exception as e: logger.error( f"Ошибка обновления статуса репорта: {e}", log_type="DATABASE" ) return False async def get_report_stats(self) -> dict: """ Получает общую статистику по репортам. Returns: dict: Статистика """ try: from .models import Report async with self.db.get_session() as session: # Всего репортов total_reports = await session.execute( select(func.count(Report.id)) ) # По статусам pending_reports = await session.execute( select(func.count(Report.id)).where(Report.status == "pending") ) closed_reports = await session.execute( select(func.count(Report.id)).where(Report.status == "closed") ) banned_reports = await session.execute( select(func.count(Report.id)).where(Report.status == "banned") ) deleted_reports = await session.execute( select(func.count(Report.id)).where(Report.status == "deleted") ) return { 'total': total_reports.scalar_one(), 'pending': pending_reports.scalar_one(), 'closed': closed_reports.scalar_one(), 'banned': banned_reports.scalar_one(), 'deleted': deleted_reports.scalar_one(), } except Exception as e: logger.error( f"Ошибка получения статистики репортов: {e}", log_type="DATABASE" ) return {} async def get_top_reporters(self, limit: int = 10) -> List[tuple[int, str, int]]: """ Получает топ жалобщиков. Args: limit: Количество записей Returns: List[tuple[int, str, int]]: [(user_id, username, count), ...] """ try: from .models import Report async with self.db.get_session() as session: result = await session.execute( select( Report.reporter_id, Report.reporter_username, func.count(Report.id).label('count') ) .group_by(Report.reporter_id, Report.reporter_username) .order_by(func.count(Report.id).desc()) .limit(limit) ) return [ (row.reporter_id, row.reporter_username or f"id{row.reporter_id}", row.count) for row in result ] except Exception as e: logger.error( f"Ошибка получения топ жалобщиков: {e}", log_type="DATABASE" ) return [] async def get_top_reported_users(self, limit: int = 10) -> List[tuple[int, str, int]]: """ Получает топ нарушителей. Args: limit: Количество записей Returns: List[tuple[int, str, int]]: [(user_id, username, count), ...] """ try: from .models import Report async with self.db.get_session() as session: result = await session.execute( select( Report.reported_user_id, Report.reported_username, func.count(Report.id).label('count') ) .group_by(Report.reported_user_id, Report.reported_username) .order_by(func.count(Report.id).desc()) .limit(limit) ) return [ (row.reported_user_id, row.reported_username or f"id{row.reported_user_id}", row.count) for row in result ] except Exception as e: logger.error( f"Ошибка получения топ нарушителей: {e}", log_type="DATABASE" ) return [] async def get_recent_reports(self, limit: int = 20) -> List: """ Получает последние репорты. Args: limit: Количество записей Returns: List[Report]: Список репортов """ try: from .models import Report async with self.db.get_session() as session: result = await session.execute( select(Report) .order_by(Report.created_at.desc()) .limit(limit) ) return list(result.scalars().all()) except Exception as e: logger.error( f"Ошибка получения последних репортов: {e}", log_type="DATABASE" ) return [] async def get_user_report_count(self, user_id: int, as_reporter: bool = True) -> int: """ Получает количество репортов пользователя. Args: user_id: ID пользователя as_reporter: True - как жалобщик, False - как нарушитель Returns: int: Количество репортов """ try: from .models import Report async with self.db.get_session() as session: if as_reporter: result = await session.execute( select(func.count(Report.id)).where( Report.reporter_id == user_id ) ) else: result = await session.execute( select(func.count(Report.id)).where( Report.reported_user_id == user_id ) ) return result.scalar_one() except Exception as e: logger.error( f"Ошибка подсчёта репортов пользователя: {e}", log_type="DATABASE" ) return 0 async def get_setting(self, key: str) -> Optional[str]: """Получает значение настройки""" async with self.db.get_session() as session: result = await session.get(Setting, key) return result.value if result else None async def set_setting(self, key: str, value: str) -> bool: """Устанавливает значение настройки""" async with self.db.get_session() as session: try: setting = await session.get(Setting, key) if setting: setting.value = value setting.updated_at = datetime.now() else: setting = Setting(key=key, value=value) session.add(setting) await session.commit() return True except Exception as e: await session.rollback() logger.error(f"set_setting {key} failed: {e}", log_type="DATABASE") return False async def delete_setting(self, key: str) -> bool: """Удаляет настройку""" async with self.db.get_session() as session: try: result = await session.execute(delete(Setting).where(Setting.key == key)) await session.commit() return result.rowcount > 0 except Exception as e: await session.rollback() logger.error(f"delete_setting {key} failed: {e}", log_type="DATABASE") return False