Files
PrimoGuardBot/database/repository.py

1392 lines
47 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Repository для работы с банвордами через SQLAlchemy ORM.
"""
from typing import Set, List, Optional
from datetime import datetime, timedelta, timezone
from sqlalchemy import select, delete, func, and_
from middleware.loggers import logger
from .database import Database
from .models import (
BanWord,
TempBanWord,
WhitelistWord,
Admin,
Setting,
SpamStat,
BanWordType,
AutoComment
)
__all__ = ("BanWordsRepository",)
class BanWordsRepository:
"""
Repository для CRUD операций с банвордами.
Все методы работают через SQLAlchemy ORM.
"""
def __init__(self, db: Database):
"""
Args:
db: Экземпляр Database
"""
self.db = db
# === BANWORDS ===
async def add_banword(
self,
word: str,
word_type: BanWordType,
added_by: Optional[int] = None,
reason: Optional[str] = None
) -> bool:
"""
Добавляет постоянный банворд.
Args:
word: Слово для блокировки
word_type: Тип банворда
added_by: ID админа, который добавил
reason: Причина добавления
Returns:
bool: True если добавлен, False если уже существует
"""
try:
async with self.db.get_session() as session:
# Проверяем, существует ли уже
existing = await session.execute(
select(BanWord).where(
and_(
BanWord.word == word.lower(),
BanWord.type == word_type
)
)
)
if existing.scalar_one_or_none():
return False
# Добавляем новый
banword = BanWord(
word=word.lower(),
type=word_type,
added_by=added_by,
reason=reason
)
session.add(banword)
await session.commit()
logger.info(
f"Добавлен банворд: '{word}' ({word_type.value})",
log_type="DATABASE"
)
return True
except Exception as e:
logger.error(
f"Ошибка добавления банворда: {e}",
log_type="DATABASE"
)
return False
async def remove_banword(self, word: str, word_type: BanWordType) -> bool:
"""
Удаляет банворд.
Args:
word: Слово
word_type: Тип
Returns:
bool: True если удалён
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
delete(BanWord).where(
and_(
BanWord.word == word.lower(),
BanWord.type == word_type
)
)
)
await session.commit()
deleted = result.rowcount > 0
if deleted:
logger.info(
f"Удалён банворд: '{word}' ({word_type.value})",
log_type="DATABASE"
)
return deleted
except Exception as e:
logger.error(
f"Ошибка удаления банворда: {e}",
log_type="DATABASE"
)
return False
async def get_banwords(self, word_type: BanWordType) -> Set[str]:
"""
Получает все банворды определённого типа.
Args:
word_type: Тип банвордов
Returns:
Set[str]: Набор слов
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(BanWord.word).where(BanWord.type == word_type)
)
return set(result.scalars().all())
except Exception as e:
logger.error(
f"Ошибка получения банвордов: {e}",
log_type="DATABASE"
)
return set()
async def get_all_banwords(self) -> dict[BanWordType, Set[str]]:
result = {
BanWordType.WORD: set(),
BanWordType.LEMMA: set(),
BanWordType.PART: set(),
BanWordType.CONFLICT_WORD: set(),
BanWordType.CONFLICT_LEMMA: set(),
BanWordType.CONFLICT_PART: set(),
}
try:
async with self.db.get_session() as session:
banwords = await session.execute(select(BanWord))
loaded = 0
for banword in banwords.scalars():
try:
word_type = (
banword.type
if isinstance(banword.type, BanWordType)
else BanWordType(banword.type.casefold())
)
if word_type in result:
result[word_type].add(banword.word)
loaded += 1
except ValueError:
logger.warning(
f"Неизвестный тип: '{banword.type}' для '{banword.word}'",
log_type="DATABASE"
)
logger.info(f"✅ Кэш загружен: {loaded} банвордов", log_type="DATABASE")
except Exception as e:
logger.error(f"❌ get_all_banwords: {e}", log_type="DATABASE")
return result
async def search_banwords(self, query: str, limit: int = 50) -> List[BanWord]:
"""
Поиск банвордов по частичному совпадению.
Args:
query: Поисковый запрос
limit: Максимум результатов
Returns:
List[BanWord]: Найденные банворды
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(BanWord)
.where(BanWord.word.contains(query.lower()))
.limit(limit)
)
return list(result.scalars().all())
except Exception as e:
logger.error(
f"Ошибка поиска банвордов: {e}",
log_type="DATABASE"
)
return []
# === TEMPORARY BANWORDS ===
async def add_temp_banword(
self,
word: str,
word_type: BanWordType,
minutes: int,
added_by: Optional[int] = None
) -> bool:
"""
Добавляет временный банворд.
Args:
word: Слово
word_type: Тип
minutes: Длительность в минутах
added_by: ID админа
Returns:
bool: True если добавлен
"""
try:
async with self.db.get_session() as session:
# Проверяем существование
existing = await session.execute(
select(TempBanWord).where(
and_(
TempBanWord.word == word.lower(),
TempBanWord.type == word_type
)
)
)
if existing.scalar_one_or_none():
return False
# Добавляем
expires_at = datetime.now() + timedelta(minutes=minutes)
temp_banword = TempBanWord(
word=word.lower(),
type=word_type,
added_by=added_by,
expires_at=expires_at
)
session.add(temp_banword)
await session.commit()
logger.info(
f"Добавлен временный банворд: '{word}' на {minutes} мин",
log_type="DATABASE"
)
return True
except Exception as e:
logger.error(
f"Ошибка добавления временного банворда: {e}",
log_type="DATABASE"
)
return False
async def remove_temp_banword(self, word: str, word_type: BanWordType) -> bool:
"""Удаляет временный банворд досрочно"""
try:
async with self.db.get_session() as session:
result = await session.execute(
delete(TempBanWord).where(
and_(
TempBanWord.word == word.lower(),
TempBanWord.type == word_type
)
)
)
await session.commit()
deleted = result.rowcount > 0
if deleted:
logger.info(
f"Удалён временный банворд: '{word}'",
log_type="DATABASE"
)
return deleted
except Exception as e:
logger.error(
f"Ошибка удаления временного банворда: {e}",
log_type="DATABASE"
)
return False
async def get_temp_banwords(self, word_type: BanWordType) -> Set[str]:
"""
Получает активные (не истёкшие) временные банворды.
Args:
word_type: Тип банвордов
Returns:
Set[str]: Набор активных временных слов
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(TempBanWord.word).where(
and_(
TempBanWord.type == word_type,
TempBanWord.expires_at > datetime.now()
)
)
)
return set(result.scalars().all())
except Exception as e:
logger.error(
f"Ошибка получения временных банвордов: {e}",
log_type="DATABASE"
)
return set()
async def get_all_temp_banwords(self) -> dict[BanWordType, Set[str]]:
"""Получает все активные временные банворды по типам"""
result = {
BanWordType.WORD: set(),
BanWordType.LEMMA: set(),
}
try:
async with self.db.get_session() as session:
temp_banwords = await session.execute(
select(TempBanWord).where(
TempBanWord.expires_at > datetime.now()
)
)
for temp_banword in temp_banwords.scalars():
word_type = (
temp_banword.type
if isinstance(temp_banword.type, BanWordType)
else BanWordType(temp_banword.type.casefold())
)
if word_type in result:
result[word_type].add(temp_banword.word)
except Exception as e:
logger.error(
f"Ошибка получения всех временных банвордов: {e}",
log_type="DATABASE"
)
return result
async def cleanup_expired_temp_banwords(self) -> int:
"""
Удаляет истёкшие временные банворды.
Returns:
int: Количество удалённых записей
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
delete(TempBanWord).where(
TempBanWord.expires_at <= datetime.now()
)
)
await session.commit()
deleted = result.rowcount
if deleted > 0:
logger.info(
f"Удалено {deleted} истёкших временных банвордов",
log_type="DATABASE"
)
return deleted
except Exception as e:
logger.error(
f"Ошибка очистки временных банвордов: {e}",
log_type="DATABASE"
)
return 0
# === WHITELIST ===
async def add_whitelist(
self,
word: str,
added_by: Optional[int] = None,
reason: Optional[str] = None
) -> bool:
"""Добавляет слово в белый список (исключение)"""
try:
async with self.db.get_session() as session:
# Проверяем существование
existing = await session.execute(
select(WhitelistWord).where(
WhitelistWord.word == word.lower()
)
)
if existing.scalar_one_or_none():
return False
# Добавляем
whitelist_word = WhitelistWord(
word=word.lower(),
added_by=added_by,
reason=reason
)
session.add(whitelist_word)
await session.commit()
logger.info(
f"Добавлено исключение: '{word}'",
log_type="DATABASE"
)
return True
except Exception as e:
logger.error(
f"Ошибка добавления исключения: {e}",
log_type="DATABASE"
)
return False
async def remove_whitelist(self, word: str) -> bool:
"""Удаляет слово из белого списка"""
try:
async with self.db.get_session() as session:
result = await session.execute(
delete(WhitelistWord).where(
WhitelistWord.word == word.lower()
)
)
await session.commit()
deleted = result.rowcount > 0
if deleted:
logger.info(
f"Удалено исключение: '{word}'",
log_type="DATABASE"
)
return deleted
except Exception as e:
logger.error(
f"Ошибка удаления исключения: {e}",
log_type="DATABASE"
)
return False
async def get_whitelist(self) -> Set[str]:
"""Получает все слова из белого списка"""
try:
async with self.db.get_session() as session:
result = await session.execute(select(WhitelistWord.word))
return set(result.scalars().all())
except Exception as e:
logger.error(
f"Ошибка получения whitelist: {e}",
log_type="DATABASE"
)
return set()
# === ADMINS ===
async def add_admin(
self,
user_id: int,
added_by: Optional[int] = None
) -> bool:
"""Добавляет администратора"""
try:
async with self.db.get_session() as session:
# Проверяем существование
existing = await session.execute(
select(Admin).where(Admin.user_id == user_id)
)
if existing.scalar_one_or_none():
return False
# Добавляем
admin = Admin(user_id=user_id, added_by=added_by)
session.add(admin)
await session.commit()
logger.info(
f"Добавлен админ: {user_id}",
log_type="DATABASE"
)
return True
except Exception as e:
logger.error(
f"Ошибка добавления админа: {e}",
log_type="DATABASE"
)
return False
async def remove_admin(self, user_id: int) -> bool:
"""Удаляет администратора"""
try:
async with self.db.get_session() as session:
result = await session.execute(
delete(Admin).where(Admin.user_id == user_id)
)
await session.commit()
deleted = result.rowcount > 0
if deleted:
logger.info(
f"Удалён админ: {user_id}",
log_type="DATABASE"
)
return deleted
except Exception as e:
logger.error(
f"Ошибка удаления админа: {e}",
log_type="DATABASE"
)
return False
async def get_admins(self) -> Set[int]:
"""Получает всех администраторов"""
try:
async with self.db.get_session() as session:
result = await session.execute(select(Admin.user_id))
return set(result.scalars().all())
except Exception as e:
logger.error(
f"Ошибка получения админов: {e}",
log_type="DATABASE"
)
return set()
async def is_admin(self, user_id: int) -> bool:
"""Проверяет, является ли пользователь админом"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(Admin).where(Admin.user_id == user_id)
)
return result.scalar_one_or_none() is not None
except Exception as e:
logger.error(
f"Ошибка проверки админа: {e}",
log_type="DATABASE"
)
return False
# === SETTINGS ===
async def set_setting(self, key: str, value: str) -> None:
"""
Сохраняет настройку (или обновляет существующую).
Args:
key: Ключ настройки
value: Значение (строка или JSON)
"""
try:
async with self.db.get_session() as session:
# Проверяем существование
existing = await session.execute(
select(Setting).where(Setting.key == key)
)
setting = existing.scalar_one_or_none()
if setting:
# Обновляем существующую
setting.value = value
setting.updated_at = datetime.now()
else:
# Создаём новую
setting = Setting(key=key, value=value)
session.add(setting)
await session.commit()
except Exception as e:
logger.error(
f"Ошибка сохранения настройки: {e}",
log_type="DATABASE"
)
async def get_setting(
self,
key: str,
default: Optional[str] = None
) -> Optional[str]:
"""
Получает значение настройки.
Args:
key: Ключ настройки
default: Значение по умолчанию
Returns:
Optional[str]: Значение или default
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(Setting.value).where(Setting.key == key)
)
value = result.scalar_one_or_none()
return value if value is not None else default
except Exception as e:
logger.error(
f"Ошибка получения настройки: {e}",
log_type="DATABASE"
)
return default
async def delete_setting(self, key: str) -> bool:
"""Удаляет настройку"""
try:
async with self.db.get_session() as session:
result = await session.execute(
delete(Setting).where(Setting.key == key)
)
await session.commit()
return result.rowcount > 0
except Exception as e:
logger.error(
f"Ошибка удаления настройки: {e}",
log_type="DATABASE"
)
return False
# === STATISTICS ===
async def log_spam_deletion(
self,
user_id: int,
username: str,
chat_id: int,
message_text: str,
matched_word: str,
match_type: str
) -> None:
"""
Записывает статистику удалённого спам-сообщения.
Args:
user_id: Telegram ID отправителя
username: Username отправителя
chat_id: ID чата
message_text: Текст сообщения (обрезается до 500 символов)
matched_word: Слово, по которому сработал фильтр
match_type: Тип проверки (substring/lemma/part/silence/conflict)
"""
try:
async with self.db.get_session() as session:
spam_stat = SpamStat(
user_id=user_id,
username=username,
chat_id=chat_id,
message_text=message_text[:500],
matched_word=matched_word,
match_type=match_type
)
session.add(spam_stat)
await session.commit()
except Exception as e:
logger.error(
f"Ошибка логирования статистики: {e}",
log_type="DATABASE"
)
async def get_spam_stats(
self,
limit: int = 100,
user_id: Optional[int] = None
) -> List[SpamStat]:
"""
Получает последнюю статистику удалений.
Args:
limit: Максимум записей
user_id: Фильтр по пользователю (опционально)
Returns:
List[SpamStat]: Список записей статистики
"""
try:
async with self.db.get_session() as session:
query = select(SpamStat).order_by(SpamStat.deleted_at.desc())
if user_id:
query = query.where(SpamStat.user_id == user_id)
query = query.limit(limit)
result = await session.execute(query)
return list(result.scalars().all())
except Exception as e:
logger.error(
f"Ошибка получения статистики: {e}",
log_type="DATABASE"
)
return []
async def get_user_spam_count(self, user_id: int) -> int:
"""Получает количество удалённых сообщений пользователя"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(func.count(SpamStat.id)).where(
SpamStat.user_id == user_id
)
)
return result.scalar_one()
except Exception as e:
logger.error(
f"Ошибка подсчёта спама: {e}",
log_type="DATABASE"
)
return 0
async def get_top_spammers(self, limit: int = 10) -> List[tuple[int, int]]:
"""
Получает топ спамеров.
Args:
limit: Количество записей
Returns:
List[tuple[int, int]]: [(user_id, count), ...]
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(
SpamStat.user_id,
func.count(SpamStat.id).label('count')
)
.group_by(SpamStat.user_id)
.order_by(func.count(SpamStat.id).desc())
.limit(limit)
)
return [(row.user_id, row.count) for row in result]
except Exception as e:
logger.error(
f"Ошибка получения топ спамеров: {e}",
log_type="DATABASE"
)
return []
# === GENERAL ===
async def get_stats(self) -> dict:
"""Получает общую статистику БД"""
try:
async with self.db.get_session() as session:
banwords_count = await session.execute(
select(func.count(BanWord.id))
)
temp_banwords_count = await session.execute(
select(func.count(TempBanWord.id))
)
whitelist_count = await session.execute(
select(func.count(WhitelistWord.id))
)
admins_count = await session.execute(
select(func.count(Admin.id))
)
spam_stats_count = await session.execute(
select(func.count(SpamStat.id))
)
return {
'banwords': banwords_count.scalar_one(),
'temp_banwords': temp_banwords_count.scalar_one(),
'whitelist': whitelist_count.scalar_one(),
'admins': admins_count.scalar_one(),
'spam_deletions': spam_stats_count.scalar_one(),
}
except Exception as e:
logger.error(
f"Ошибка получения статистики: {e}",
log_type="DATABASE"
)
return {}
# === AUTO COMMENTS ===
async def get_auto_comment(self, channel_id: int) -> Optional['AutoComment']:
"""
Получает настройки автокомментариев для канала.
Args:
channel_id: ID канала
Returns:
AutoComment или None
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(AutoComment).where(AutoComment.channel_id == channel_id)
)
return result.scalar_one_or_none()
except Exception as e:
logger.error(
f"Ошибка получения автокомментария: {e}",
log_type="DATABASE"
)
return None
async def set_auto_comment(
self,
channel_id: int,
text: str,
button_text: str,
button_url: str,
photo_url: str,
updated_by: Optional[int] = None,
is_enabled: bool = True
) -> bool:
"""
Сохраняет или обновляет настройки автокомментариев.
Args:
channel_id: ID канала
text: Текст комментария
button_text: Текст кнопки
button_url: URL кнопки
photo_url: URL фото
updated_by: ID админа
is_enabled: Включены ли комментарии
Returns:
bool: True если успешно
"""
try:
async with self.db.get_session() as session:
# Проверяем существование
result = await session.execute(
select(AutoComment).where(AutoComment.channel_id == channel_id)
)
auto_comment = result.scalar_one_or_none()
if auto_comment:
# Обновляем существующую
auto_comment.text = text
auto_comment.button_text = button_text
auto_comment.button_url = button_url
auto_comment.photo_url = photo_url
auto_comment.is_enabled = is_enabled
auto_comment.updated_by = updated_by
auto_comment.updated_at = datetime.now(timezone.utc)
else:
# Создаём новую
auto_comment = AutoComment(
channel_id=channel_id,
text=text,
button_text=button_text,
button_url=button_url,
photo_url=photo_url,
is_enabled=is_enabled,
updated_by=updated_by
)
session.add(auto_comment)
await session.commit()
logger.info(
f"Автокомментарий для канала {channel_id} обновлён",
log_type="DATABASE"
)
return True
except Exception as e:
logger.error(
f"Ошибка сохранения автокомментария: {e}",
log_type="DATABASE"
)
return False
async def update_auto_comment_field(
self,
channel_id: int,
field: str,
value: str,
updated_by: Optional[int] = None
) -> bool:
"""
Обновляет одно поле автокомментария.
Args:
channel_id: ID канала
field: Имя поля (text, button_text, button_url, photo_url)
value: Новое значение
updated_by: ID админа
Returns:
bool: True если успешно
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(AutoComment).where(AutoComment.channel_id == channel_id)
)
auto_comment = result.scalar_one_or_none()
if not auto_comment:
return False
# Обновляем поле
if hasattr(auto_comment, field):
setattr(auto_comment, field, value)
auto_comment.updated_by = updated_by
auto_comment.updated_at = datetime.now(timezone.utc)
await session.commit()
logger.info(
f"Поле '{field}' автокомментария для канала {channel_id} обновлено",
log_type="DATABASE"
)
return True
else:
logger.error(
f"Поле '{field}' не существует в AutoComment",
log_type="DATABASE"
)
return False
except Exception as e:
logger.error(
f"Ошибка обновления поля автокомментария: {e}",
log_type="DATABASE"
)
return False
async def toggle_auto_comment(
self,
channel_id: int,
is_enabled: bool,
updated_by: Optional[int] = None
) -> bool:
"""
Включает/выключает автокомментарии для канала.
Args:
channel_id: ID канала
is_enabled: True - включить, False - выключить
updated_by: ID админа
Returns:
bool: True если успешно
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
select(AutoComment).where(AutoComment.channel_id == channel_id)
)
auto_comment = result.scalar_one_or_none()
if not auto_comment:
return False
auto_comment.is_enabled = is_enabled
auto_comment.updated_by = updated_by
auto_comment.updated_at = datetime.now(timezone.utc)
await session.commit()
logger.info(
f"Автокомментарии для канала {channel_id} {'включены' if is_enabled else 'выключены'}",
log_type="DATABASE"
)
return True
except Exception as e:
logger.error(
f"Ошибка переключения автокомментария: {e}",
log_type="DATABASE"
)
return False
async def get_all_auto_comments(self) -> list['AutoComment']:
"""
Получает все настройки автокомментариев.
Returns:
List[AutoComment]: Список всех автокомментариев
"""
try:
async with self.db.get_session() as session:
result = await session.execute(select(AutoComment))
return list(result.scalars().all())
except Exception as e:
logger.error(
f"Ошибка получения всех автокомментариев: {e}",
log_type="DATABASE"
)
return []
async def delete_auto_comment(self, channel_id: int) -> bool:
"""
Удаляет настройки автокомментариев для канала.
Args:
channel_id: ID канала
Returns:
bool: True если удалено
"""
try:
async with self.db.get_session() as session:
result = await session.execute(
delete(AutoComment).where(AutoComment.channel_id == channel_id)
)
await session.commit()
deleted = result.rowcount > 0
if deleted:
logger.info(
f"Автокомментарий для канала {channel_id} удалён",
log_type="DATABASE"
)
return deleted
except Exception as e:
logger.error(
f"Ошибка удаления автокомментария: {e}",
log_type="DATABASE"
)
return False
# === REPORTS ===
async def log_report(
self,
report_id: str,
reporter_id: int,
reporter_username: Optional[str],
reported_user_id: int,
reported_username: Optional[str],
chat_id: int,
chat_title: Optional[str],
message_id: int,
message_thread_id: Optional[int],
message_text: Optional[str],
reason: str
) -> bool:
"""
Сохраняет репорт в БД.
Args:
report_id: Уникальный ID репорта
reporter_id: ID жалобщика
reporter_username: Username жалобщика
reported_user_id: ID нарушителя
reported_username: Username нарушителя
chat_id: ID чата
chat_title: Название чата
message_id: ID сообщения
message_thread_id: ID топика
message_text: Текст сообщения
reason: Причина жалобы
Returns:
bool: True если успешно
"""
try:
from .models import Report # Импорт здесь, чтобы избежать циклических импортов
async with self.db.get_session() as session:
report = Report(
report_id=report_id,
reporter_id=reporter_id,
reporter_username=reporter_username,
reported_user_id=reported_user_id,
reported_username=reported_username,
chat_id=chat_id,
chat_title=chat_title,
message_id=message_id,
message_thread_id=message_thread_id,
message_text=message_text[:500] if message_text else None,
reason=reason
)
session.add(report)
await session.commit()
logger.info(
f"Репорт #{report_id} сохранён в БД",
log_type="DATABASE"
)
return True
except Exception as e:
logger.error(
f"Ошибка сохранения репорта: {e}",
log_type="DATABASE"
)
return False
async def update_report_status(
self,
report_id: str,
status: str,
processed_by: int
) -> bool:
"""
Обновляет статус репорта.
Args:
report_id: ID репорта
status: Новый статус (closed, banned, deleted)
processed_by: ID админа
Returns:
bool: True если успешно
"""
try:
from .models import Report
async with self.db.get_session() as session:
result = await session.execute(
select(Report).where(Report.report_id == report_id)
)
report = result.scalar_one_or_none()
if not report:
return False
report.status = status
report.processed_by = processed_by
report.processed_at = datetime.now(timezone.utc)
await session.commit()
logger.info(
f"Репорт #{report_id} обновлён: статус={status}",
log_type="DATABASE"
)
return True
except Exception as e:
logger.error(
f"Ошибка обновления статуса репорта: {e}",
log_type="DATABASE"
)
return False
async def get_report_stats(self) -> dict:
"""
Получает общую статистику по репортам.
Returns:
dict: Статистика
"""
try:
from .models import Report
async with self.db.get_session() as session:
# Всего репортов
total_reports = await session.execute(
select(func.count(Report.id))
)
# По статусам
pending_reports = await session.execute(
select(func.count(Report.id)).where(Report.status == "pending")
)
closed_reports = await session.execute(
select(func.count(Report.id)).where(Report.status == "closed")
)
banned_reports = await session.execute(
select(func.count(Report.id)).where(Report.status == "banned")
)
deleted_reports = await session.execute(
select(func.count(Report.id)).where(Report.status == "deleted")
)
return {
'total': total_reports.scalar_one(),
'pending': pending_reports.scalar_one(),
'closed': closed_reports.scalar_one(),
'banned': banned_reports.scalar_one(),
'deleted': deleted_reports.scalar_one(),
}
except Exception as e:
logger.error(
f"Ошибка получения статистики репортов: {e}",
log_type="DATABASE"
)
return {}
async def get_top_reporters(self, limit: int = 10) -> List[tuple[int, str, int]]:
"""
Получает топ жалобщиков.
Args:
limit: Количество записей
Returns:
List[tuple[int, str, int]]: [(user_id, username, count), ...]
"""
try:
from .models import Report
async with self.db.get_session() as session:
result = await session.execute(
select(
Report.reporter_id,
Report.reporter_username,
func.count(Report.id).label('count')
)
.group_by(Report.reporter_id, Report.reporter_username)
.order_by(func.count(Report.id).desc())
.limit(limit)
)
return [
(row.reporter_id, row.reporter_username or f"id{row.reporter_id}", row.count)
for row in result
]
except Exception as e:
logger.error(
f"Ошибка получения топ жалобщиков: {e}",
log_type="DATABASE"
)
return []
async def get_top_reported_users(self, limit: int = 10) -> List[tuple[int, str, int]]:
"""
Получает топ нарушителей.
Args:
limit: Количество записей
Returns:
List[tuple[int, str, int]]: [(user_id, username, count), ...]
"""
try:
from .models import Report
async with self.db.get_session() as session:
result = await session.execute(
select(
Report.reported_user_id,
Report.reported_username,
func.count(Report.id).label('count')
)
.group_by(Report.reported_user_id, Report.reported_username)
.order_by(func.count(Report.id).desc())
.limit(limit)
)
return [
(row.reported_user_id, row.reported_username or f"id{row.reported_user_id}", row.count)
for row in result
]
except Exception as e:
logger.error(
f"Ошибка получения топ нарушителей: {e}",
log_type="DATABASE"
)
return []
async def get_recent_reports(self, limit: int = 20) -> List:
"""
Получает последние репорты.
Args:
limit: Количество записей
Returns:
List[Report]: Список репортов
"""
try:
from .models import Report
async with self.db.get_session() as session:
result = await session.execute(
select(Report)
.order_by(Report.created_at.desc())
.limit(limit)
)
return list(result.scalars().all())
except Exception as e:
logger.error(
f"Ошибка получения последних репортов: {e}",
log_type="DATABASE"
)
return []
async def get_user_report_count(self, user_id: int, as_reporter: bool = True) -> int:
"""
Получает количество репортов пользователя.
Args:
user_id: ID пользователя
as_reporter: True - как жалобщик, False - как нарушитель
Returns:
int: Количество репортов
"""
try:
from .models import Report
async with self.db.get_session() as session:
if as_reporter:
result = await session.execute(
select(func.count(Report.id)).where(
Report.reporter_id == user_id
)
)
else:
result = await session.execute(
select(func.count(Report.id)).where(
Report.reported_user_id == user_id
)
)
return result.scalar_one()
except Exception as e:
logger.error(
f"Ошибка подсчёта репортов пользователя: {e}",
log_type="DATABASE"
)
return 0
async def get_setting(self, key: str) -> Optional[str]:
"""Получает значение настройки"""
async with self.db.get_session() as session:
result = await session.get(Setting, key)
return result.value if result else None
async def set_setting(self, key: str, value: str) -> bool:
"""Устанавливает значение настройки"""
async with self.db.get_session() as session:
try:
setting = await session.get(Setting, key)
if setting:
setting.value = value
setting.updated_at = datetime.now()
else:
setting = Setting(key=key, value=value)
session.add(setting)
await session.commit()
return True
except Exception as e:
await session.rollback()
logger.error(f"set_setting {key} failed: {e}", log_type="DATABASE")
return False
async def delete_setting(self, key: str) -> bool:
"""Удаляет настройку"""
async with self.db.get_session() as session:
try:
result = await session.execute(delete(Setting).where(Setting.key == key))
await session.commit()
return result.rowcount > 0
except Exception as e:
await session.rollback()
logger.error(f"delete_setting {key} failed: {e}", log_type="DATABASE")
return False