This commit is contained in:
admin
2025-08-10 22:22:38 +07:00
parent 0b3b957c0a
commit 6073b4b3c9
52 changed files with 1981 additions and 810 deletions

3
bot/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
from .handlers import *
from .utils import *
from .bots import *

203
bot/bots.py Normal file
View File

@@ -0,0 +1,203 @@
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.types import User, ChatAdministratorRights, BotDescription, BotShortDescription
from aiogram.utils.i18n import ConstI18nMiddleware, I18n
from loggers.logs import loggers
from configs.config import BotSettings, BotEdit, Webhook
from middleware.loggers import log
# Экспортируем объекты модуля
__all__ = ("dp", "bot", "BotInfo", "i18n",)
# Инициализация i18n
i18n: I18n = I18n(path="locales", default_locale="ru", domain="bot")
# Диспетчер бота, языковых настроек и его хранилища
storage: MemoryStorage = MemoryStorage()
dp: Dispatcher = Dispatcher(storage=storage)
dp.message.outer_middleware(ConstI18nMiddleware(locale='ru', i18n=i18n))
dp["is_active"]: bool = True
# Экземпляр бота с настройками по умолчанию
bot: Bot = Bot(token=BotSettings.BOT_TOKEN,
default=DefaultBotProperties(
parse_mode=BotSettings.PARSE_MODE,
disable_notification=BotSettings.DISABLE_NOTIFICATION,
protect_content=BotSettings.PROTECT_CONTENT,
allow_sending_without_reply=BotSettings.ALLOW_SENDING_WITHOUT_REPLY,
link_preview_is_disabled=BotSettings.LINK_PREVIEW_IS_DISABLED,
link_preview_prefer_small_media=BotSettings.LINK_PREVIEW_PREFER_SMALL_MEDIA,
link_preview_prefer_large_media=BotSettings.LINK_PREVIEW_PREFER_LARGE_MEDIA,
link_preview_show_above_text=BotSettings.LINK_PREVIEW_SHOW_ABOVE_TEXT,
show_caption_above_media=BotSettings.SHOW_CAPTION_ABOVE_MEDIA
)
)
class BotInfo:
"""Класс для хранения и инициализации данных бота."""
id: int = None
url: str = None
first_name: str = None
last_name: str = None
username: str = None
description: str = None
short_description: str = None
language_code: str = BotSettings.BOT_LANGUAGE
prefix: str = BotSettings.PREFIX
bot_owner: str = BotSettings.OWNER
added_to_attachment_menu: bool = False
supports_inline_queries: bool = False
can_connect_to_business: bool = False
has_main_web_app: bool = False
can_join_groups: bool = False
can_read_all_group_messages: bool = False
@classmethod
@log(level='INFO', log_type='BOT', text='Настройка вебхука бота')
async def webhook(cls, bots: Bot = bot, delete_webhook: bool = Webhook.WEBHOOK) -> None:
"""
Удаление или установка вебхука.
:param bots: Объект бота для управления.
:param delete_webhook: Статус удаления, поумолчанию (true).
"""
if delete_webhook:
await bots.delete_webhook()
@classmethod
@log(level='INFO', log_type='BOT', text='Получение информации о боте')
async def info(cls, bots: Bot = bot) -> dict:
"""
Получает и сохраняет информацию о боте.
:param bots: Объект бота для управления.
:return: Словарь с персональными данными о боте.
"""
bot_info: User = await bots.get_me()
cls.id = bot_info.id
cls.url = f'tg://user?id={cls.id}'
cls.first_name = bot_info.first_name
cls.last_name = bot_info.last_name
cls.username = bot_info.username
cls.description = getattr(bot_info, 'description', '')
cls.short_description = getattr(bot_info, 'short_description', '')
cls.language_code = bot_info.language_code
cls.is_premium = bot_info.is_premium
cls.added_to_attachment_menu = bot_info.added_to_attachment_menu
cls.supports_inline_queries = bot_info.supports_inline_queries
cls.can_connect_to_business = bot_info.can_connect_to_business
cls.has_main_web_app = bot_info.has_main_web_app
cls.can_join_groups = getattr(bot_info, 'can_join_groups', False)
cls.can_read_all_group_messages = getattr(bot_info, 'can_read_all_group_messages', False)
return {
'id': cls.id,
'url': cls.url,
'first_name': cls.first_name,
'last_name': cls.last_name,
'username': cls.username,
'description': cls.description,
'short_description': cls.short_description,
'language_code': cls.language_code,
'prefix': cls.prefix,
'bot_owner': cls.bot_owner,
'is_premium': cls.is_premium,
'added_to_attachment_menu': cls.added_to_attachment_menu,
'supports_inline_queries': cls.supports_inline_queries,
'can_connect_to_business': cls.can_connect_to_business,
'has_main_web_app': cls.has_main_web_app,
'can_join_groups': cls.can_join_groups,
'can_read_all_group_messages': cls.can_read_all_group_messages,
}
@staticmethod
@log(level='INFO', log_type='BOT', text='Установка прав администратора')
async def set_administrator_rights(bots: Bot = bot, rights: ChatAdministratorRights = BotEdit.RIGHTS) -> None:
"""
Устанавливает права администратора по умолчанию.
:param bots: Объект бота для управления.
:param rights: Заданные права администратора бота, по умолчанию словарь из конфигов.
"""
bot_rights: ChatAdministratorRights = await bots.get_my_default_administrator_rights()
if bot_rights != rights:
await bots.set_my_default_administrator_rights(rights)
@staticmethod
@log(level='INFO', log_type='BOT', text='Обновление имени бота')
async def set_name(bots: Bot = bot, new_name: str = BotEdit.NAME) -> None:
"""
Устанавливает имя бота из конфига.
:param bots: Объект бота для управления.
:param new_name: Новое имя бота, по умолчанию из конфигов.
"""
current_name: str = (await bots.get_me()).first_name
if not (1 <= len(new_name) <= 32):
raise ValueError("Имя бота должно быть от 1 до 32 символов.")
if current_name != new_name:
await bots.set_my_name(new_name)
@staticmethod
@log(level='INFO', log_type='BOT', text='Обновление описания бота')
async def set_description(bots: Bot = bot, new_description: str = BotEdit.DESCRIPTION) -> None:
"""
Устанавливает полное описание бота.
:param bots: Объект бота для управления.
:param new_description: Новое описание бота, по умолчанию из конфигов.
"""
current_description: BotDescription = await bots.get_my_description()
if not (0 < len(new_description) <= 255):
raise ValueError("Описание должно быть от 1 до 255 символов.")
if current_description != new_description:
await bots.set_my_description(description=new_description)
@staticmethod
@log(level='INFO', log_type='BOT', text='Обновление короткого описания бота')
async def set_short_description(bots: Bot = bot, new_short: str = BotEdit.SHORT_DESCRIPTION) -> None:
"""
Устанавливает короткое описание виджета.
:param bots: Объект бота для управления.
:param new_short: Новое короткое описание бота, по умолчанию из конфигов.
"""
current_short: BotShortDescription = await bots.get_my_short_description()
if not (0 < len(new_short) <= 512):
raise ValueError("Короткое описание должно быть от 1 до 512 символов.")
if current_short != new_short:
await bots.set_my_short_description(short_description=new_short)
@classmethod
@log(level='INFO', log_type='START', text=f'Процесс запуска бота!!!!!')
async def setup(cls, bots: Bot = bot):
"""
Выполняет полную настройку бота.
:param bots: Объект бота для управления.
"""
await cls.webhook(bots=bots)
await cls.info(bots=bots)
await cls.set_administrator_rights(bots=bots)
await cls.set_description(bots=bots)
await cls.set_short_description(bots=bots)
await cls.set_name(bots=bots)
loggers.info(text=f"Бот @{BotInfo.username} запущен!!!")

1
bot/core/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .storage import *

236
bot/core/storage.py Normal file
View File

@@ -0,0 +1,236 @@
import json
from os import path, makedirs, listdir
from typing import Any, Dict, List, Optional
from configs.config import Project
from bot.loggers import logs
# Настройки экспорта
__all__ = ("storage", )
class PostStorage:
"""Класс для управления хранением постов и связанных уведомлений."""
def __init__(self, posts_dir: str = Project.POSTS_DIR):
self.posts_dir = posts_dir
self.global_posts: Dict[str, Dict[str, Any]] = {}
self.notifications: Dict[str, Dict[str, Any]] = {}
self.alert_texts: Dict[str, Dict[str, Any]] = {}
self._ensure_posts_dir()
self.load_all_posts()
def _ensure_posts_dir(self, directory: Optional[str] = None) -> None:
"""Создаёт директорию для хранения постов, если она не существует."""
dir_path = directory or self.posts_dir
if not path.isdir(dir_path):
makedirs(dir_path, exist_ok=True)
logs.info(
f"Created posts directory: {dir_path}",
log_type="STORAGE",
)
def _get_user_posts_file(self, user_id: int) -> str:
"""Возвращает путь к файлу с постами пользователя."""
return path.join(self.posts_dir, f"posts_{user_id}.json")
def _update_button_notifications(self, callback_data: str, notification_data: Dict[str, Any]) -> None:
"""Регистрирует данные уведомления кнопки во внутренних хранилищах."""
if not callback_data:
return
self.alert_texts[callback_data] = notification_data
self.notifications[callback_data] = notification_data
def _process_buttons(self, post_id: str, buttons: List[Any]) -> None:
"""
Обрабатывает кнопки поста, нормализует callback_data и регистрирует уведомления.
Поддерживает различные типы кнопок: callback, url, copy, inline.
"""
if not buttons:
return
for row_idx, row in enumerate(buttons):
btns = row if isinstance(row, list) else [row]
for col_idx, button in enumerate(btns):
if not isinstance(button, dict):
continue
if 'callback_data' in button:
cb_data = button['callback_data']
if not cb_data or not (cb_data.startswith('bt_') or cb_data.startswith('show_alert_')):
prefix = 'show_alert_' if button.get('show_alert') else 'bt_'
button['callback_data'] = f"{prefix}{post_id}_{row_idx}_{col_idx}"
cb_data = button['callback_data']
if 'notification' in button:
notification = {
'text': button['notification'],
'show_alert': button.get('show_alert', False),
'allowed_ids': button.get('allowed_ids'),
'unauthorized_message': button.get('unauthorized_message')
}
self._update_button_notifications(cb_data, notification)
logs.debug(
f"Registered notification for {cb_data}",
log_type="STORAGE",
)
def load_user_posts(self, user_id: int) -> Dict[str, Any]:
"""Загружает посты пользователя из файла."""
file_path = self._get_user_posts_file(user_id)
try:
if path.isfile(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
posts = json.load(f)
if isinstance(posts, dict):
return posts
logs.warning(
f"Invalid posts format in {file_path}",
log_type="STORAGE",
)
except json.JSONDecodeError as e:
logs.error(
f"JSON decode error in {file_path}: {str(e)}",
log_type="STORAGE",
)
except Exception as e:
logs.error(
f"Error loading posts from {file_path}: {str(e)}",
log_type="STORAGE",
)
return {}
def save_user_posts(self, user_id: int, posts: Dict[str, Any]) -> None:
"""
Сохраняет посты пользователя в файл и обновляет внутренние хранилища.
Обрабатывает кнопки и уведомления перед сохранением.
"""
if not isinstance(posts, dict):
logs.error(
"Invalid posts format, expected dict",
log_type="STORAGE",
)
return
for post_id, post in posts.items():
if isinstance(post, dict) and 'buttons' in post:
self._process_buttons(post_id, post['buttons'])
file_path = self._get_user_posts_file(user_id)
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(posts, f, ensure_ascii=False, indent=4)
logs.info(
f"Saved posts for user {user_id}",
log_type="STORAGE",
)
except Exception as e:
logs.error(
f"Error saving posts to {file_path}: {str(e)}",
log_type="STORAGE",
)
return
# Обновление кэша: перезагружаем записи этого пользователя
# Удаляем старые записи
for pid in list(self.global_posts):
if pid in posts:
self.global_posts.pop(pid, None)
# Загружаем свежие
fresh = self.load_user_posts(user_id)
for pid, post in fresh.items():
if isinstance(post, dict) and 'buttons' in post:
self._process_buttons(pid, post['buttons'])
self.global_posts[pid] = post
def delete_user_post(self, user_id: int, post_id: str) -> bool:
"""Удаляет пост пользователя и связанные уведомления. Возвращает статус операции."""
user_posts = self.load_user_posts(user_id)
if post_id not in user_posts:
logs.warning(
f"Post {post_id} not found for user {user_id}",
log_type="STORAGE",
)
return False
post = user_posts.pop(post_id)
notification_count = 0
if isinstance(post.get('buttons'), list):
for row in post['buttons']:
btns = row if isinstance(row, list) else [row]
for button in btns:
if isinstance(button, dict):
cb = button.get('callback_data')
if cb and cb in self.alert_texts:
self.alert_texts.pop(cb)
self.notifications.pop(cb, None)
notification_count += 1
logs.debug(
f"Removed {notification_count} notifications for post {post_id}",
log_type="STORAGE",
)
# Сохраняем и обновляем кэш
self.save_user_posts(user_id, user_posts)
self.global_posts.pop(post_id, None)
logs.info(
f"Deleted post {post_id} for user {user_id}",
log_type="STORAGE",
)
return True
def is_post_available(self, post_id: str) -> bool:
"""Проверяет доступность идентификатора поста."""
return post_id not in self.global_posts
def load_all_posts(self) -> None:
"""Загружает все посты из файлов в рабочей директории."""
self.global_posts.clear()
self.alert_texts.clear()
self.notifications.clear()
self._ensure_posts_dir()
loaded_files = 0
loaded_posts = 0
try:
for filename in listdir(self.posts_dir):
if filename.endswith('.json'):
user_id_str = filename[len('posts_'):-len('.json')]
try:
user_id = int(user_id_str)
except ValueError:
logs.warning(
f"Invalid filename format: {filename}",
log_type="STORAGE",
)
continue
posts = self.load_user_posts(user_id)
for pid, post in posts.items():
if isinstance(post, dict) and 'buttons' in post:
self._process_buttons(pid, post['buttons'])
self.global_posts[pid] = post
loaded_posts += 1
loaded_files += 1
except Exception as e:
logs.error(
f"Error loading all posts: {str(e)}",
log_type="STORAGE",
)
logs.info(
f"Loaded {loaded_posts} posts from {loaded_files} files",
log_type="STORAGE",
)
def get_post(self, post_id: str) -> Optional[Dict[str, Any]]:
"""Возвращает пост по идентификатору или None если не найден."""
return self.global_posts.get(post_id)
def get_notification(self, callback_data: str) -> Optional[Dict[str, Any]]:
"""Возвращает данные уведомления для указанного callback."""
return self.notifications.get(callback_data)
# Инициализация хранилища при импорте модуля
storage: PostStorage = PostStorage()

17
bot/handlers/__init__.py Normal file
View File

@@ -0,0 +1,17 @@
from aiogram import Router
from .post import router as post_routers
from .commands import router as cmd_routers
from .callback import router as callback_router
from .inline import router as inline_router
# Настройка экспорта и роутера
__all__ = ("router",)
router: Router = Router(name="handlers_router")
# Подключение роутеров
router.include_routers(
cmd_routers,
callback_router,
post_routers,
inline_router
)

46
bot/handlers/callback.py Normal file
View File

@@ -0,0 +1,46 @@
from typing import Optional
from aiogram import Router, F
from aiogram.types import CallbackQuery
from bot.core import storage
router: Router = Router(name="callback_router")
@router.callback_query(F.data.startswith("bt_"))
@router.callback_query(F.data.startswith("show_alert_"))
async def handle_button_alert(callback_query: CallbackQuery) -> None:
key: Optional[str] = callback_query.data
user_id: int = callback_query.from_user.id
# Получаем уведомление через хранилище
notif = storage.get_notification(key)
if not notif:
await callback_query.answer()
return
# Проверяем права доступа
allowed = notif.get("allowed_ids")
if allowed and user_id not in allowed:
msg = notif.get("unauthorized_message", "У вас нет доступа к этому уведомлению.")
await callback_query.answer(text=msg, show_alert=True)
return
text = notif.get("text", "")
show_alert = notif.get("show_alert", False)
try:
await callback_query.answer(text=text, show_alert=show_alert)
except Exception:
await callback_query.answer(text="Произошла ошибка при отображении уведомления.", show_alert=True)
@router.callback_query(F.data == "void")
async def handle_void_callback(callback_query: CallbackQuery) -> None:
"""
Обработка пустых callback-запросов (void).
Просто отвечает на callback без уведомления.
"""
try:
await callback_query.answer()
except Exception:
return

View File

@@ -0,0 +1,12 @@
from aiogram import Router
from .start import router as start_cmd_router
from .help import router as help_cmd_router
# Настройка экспорта и роутера
__all__ = ('router',)
router: Router = Router(name="cmd_router")
# Подготовка роутера команд
router.include_routers(start_cmd_router,
help_cmd_router,
)

View File

@@ -0,0 +1,47 @@
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, KeyboardButton
from aiogram.utils.keyboard import ReplyKeyboardBuilder
from aiogram.utils.i18n import gettext as _
from bot.templates import msg_photo
from bot.utils.interesting_facts import interesting_fact
from middleware.loggers import log
from bot.bots import BotInfo
from configs import COMMANDS, BotEdit
# Настройки экспорта и роутера
__all__ = ("router",)
CMD: str = "help".lower()
router: Router = Router(name=f"{CMD}_cmd_router")
@router.callback_query(F.data == CMD)
@router.message(Command(*COMMANDS[CMD], prefix=BotInfo.prefix, ignore_case=True))
@log(level='INFO', log_type=CMD.upper(), text=f"использовал команду /{CMD}")
async def help_cmd(message: Message | CallbackQuery, state: FSMContext) -> None:
"""
Обработчик команды /help
Args:
message (Message | CallbackQuery): Сообщение или callback-запрос от пользователя.
state (FSMContext): Состояние пользователя бота.
"""
await state.clear()
# Создаем клавиатуру с кнопками
rkb: ReplyKeyboardBuilder = ReplyKeyboardBuilder()
rkb.row(KeyboardButton(text=_("Создать пост📔")))
rkb.row(KeyboardButton(text=_("Посмотреть список📋")))
# Формируем приветственное сообщение
text: str = _(
"""Добро пожаловать, <a href="{url}">{name}</a>!"""
).format(
url=message.from_user.url if message.from_user else "",
name=message.from_user.first_name if message.from_user else "пользователь",
)
# Отправляем сообщение
await msg_photo(message=message, text=text, file='assets/start.jpg', markup=rkb)

View File

@@ -0,0 +1,57 @@
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery, KeyboardButton
from aiogram.utils.keyboard import ReplyKeyboardBuilder
from aiogram.utils.i18n import gettext as _
from bot.templates import msg_photo
from bot.utils.interesting_facts import interesting_fact
from middleware.loggers import log
from bot.bots import BotInfo
from configs import COMMANDS, BotEdit
# Настройки экспорта и роутера
__all__ = ("router",)
CMD: str = "start".lower()
router: Router = Router(name=f"{CMD}_cmd_router")
@router.callback_query(F.data == CMD)
@router.message(Command(*COMMANDS[CMD], prefix=BotInfo.prefix, ignore_case=True))
@log(level='INFO', log_type=CMD.upper(), text=f"использовал команду /{CMD}")
async def start_cmd(message: Message | CallbackQuery, state: FSMContext) -> None:
"""
Обработчик команды /start
Args:
message (Message | CallbackQuery): Сообщение или callback-запрос от пользователя.
state (FSMContext): Состояние пользователя бота.
"""
await state.clear()
# Создаем клавиатуру с кнопками
rkb: ReplyKeyboardBuilder = ReplyKeyboardBuilder()
rkb.row(KeyboardButton(text=_("Создать пост📔")))
rkb.row(KeyboardButton(text=_("Посмотреть список📋")))
# Формируем приветственное сообщение
text: str = _(
"""Добро пожаловать, <a href="{url}">{name}</a>!
Мое имя - <b>{bot_name}</b>! Я искусственный интеллект и сказитель ваших историй!
Моя цель — помочь вам сориентироваться и сделать ваши истории куда интереснее!
Надеюсь, я смогу вам помочь! Пожалуйста, выберите нужную функцию на клавиатуре!
Интересный факт:
<blockquote>{fact}</blockquote>
"""
).format(
url=message.from_user.url if message.from_user else "",
name=message.from_user.first_name if message.from_user else "пользователь",
bot_name=BotEdit.PROJECT_NAME,
fact=interesting_fact(),
)
# Отправляем сообщение
await msg_photo(message=message, text=text, file='assets/start.jpg', markup=rkb)

174
bot/handlers/inline.py Normal file
View File

@@ -0,0 +1,174 @@
# BotCode/handlers/inline.py
from aiogram import Router
from aiogram.types import (
InlineKeyboardButton,
InlineKeyboardMarkup,
InlineQuery,
InputTextMessageContent,
InlineQueryResultArticle,
SwitchInlineQueryChosenChat,
CopyTextButton,
)
from aiogram.utils.markdown import hide_link
from bot.core import storage
from bot.loggers import logs
router: Router = Router(name="inline_send")
def build_markup(buttons_def: list[list[dict]]) -> InlineKeyboardMarkup | None:
"""
Создаёт InlineKeyboardMarkup из списка описаний кнопок.
Поддерживает URL, callback, inline-моды.
Обрабатывает "void"-кнопки как callback_data="void".
Для switch_inline_query_chosen_chat устанавливает хотя бы один allow_* True.
"""
if not buttons_def:
return None
rows: list[list[InlineKeyboardButton]] = []
for row_idx, row in enumerate(buttons_def):
if not isinstance(row, list):
logs.warning(f"Некорректный формат ряда кнопок: {row}")
continue
kb_row: list[InlineKeyboardButton] = []
for col_idx, b in enumerate(row):
if not isinstance(b, dict):
logs.warning(f"Некорректный формат кнопки в ряду {row_idx}: {b}")
continue
text = b.get("text", "")
if not text:
logs.warning(f"Пустой текст кнопки в ряду {row_idx}, колонке {col_idx}")
continue
btn = None
try:
if "url" in b:
url = b["url"]
if url.lower().endswith("void"):
btn = InlineKeyboardButton(text=text, callback_data="void")
else:
btn = InlineKeyboardButton(text=text, url=url)
elif "switch_inline_query" in b:
btn = InlineKeyboardButton(
text=text,
switch_inline_query=b["switch_inline_query"]
)
elif "switch_inline_query_current_chat" in b:
btn = InlineKeyboardButton(
text=text,
switch_inline_query_current_chat=b["switch_inline_query_current_chat"]
)
elif "switch_inline_query_chosen_chat" in b:
query = b["switch_inline_query_chosen_chat"]
if isinstance(query, dict):
siqcc = SwitchInlineQueryChosenChat(
query=query.get("query", ""),
allow_user_chats=query.get("allow_user_chats", True),
allow_group_chats=query.get("allow_group_chats", True),
allow_channel_chats=query.get("allow_channel_chats", True),
allow_bot_chats=query.get("allow_bot_chats", False),
)
else:
siqcc = SwitchInlineQueryChosenChat(
query=query,
allow_user_chats=True,
allow_group_chats=True,
allow_channel_chats=True,
allow_bot_chats=False,
)
btn = InlineKeyboardButton(
text=text,
switch_inline_query_chosen_chat=siqcc
)
elif "copy_text" in b:
btn = InlineKeyboardButton(
text=text,
copy_text=CopyTextButton(text=b["copy_text"])
)
elif "callback_data" in b:
btn = InlineKeyboardButton(
text=text,
callback_data=b["callback_data"]
)
except Exception as e:
logs.error(f"Ошибка при создании кнопки в ряду {row_idx}, колонке {col_idx}: {e}")
continue
if btn:
kb_row.append(btn)
if kb_row:
rows.append(kb_row)
if not rows:
return None
return InlineKeyboardMarkup(inline_keyboard=rows)
@router.inline_query()
async def inline_query_handler(inline_query: InlineQuery):
"""
Обрабатывает инлайн-запросы для поиска и отправки постов.
Фильтрует посты по приватности и поисковому запросу.
"""
# Перезагружаем все посты из файлов на случай изменений
storage.load_all_posts()
query = inline_query.query or ""
user_id = inline_query.from_user.id
username = inline_query.from_user.username or f"user_{user_id}"
logs.debug(f"Получен инлайн-запрос от {username} (ID: {user_id}): {query}")
results = []
for post_id, post in storage.global_posts.items():
try:
# Проверка приватности
if post.get("private") and post.get("user_id") != user_id:
continue
# Проверка поискового запроса
if query and query.lower() not in post_id.lower():
continue
# Тело сообщения
text = post.get("text", "")
image = post.get("image", "")
if image and image.startswith("http"):
text = f"{hide_link(image)}{text}"
# Клавиатура
markup = build_markup(post.get("buttons", []))
results.append(
InlineQueryResultArticle(
id=post_id,
title=f"Пост {post_id}",
description=(post.get("text", "")[:100] + "...") if len(post.get("text", "")) > 100 else post.get(
"text", ""),
input_message_content=InputTextMessageContent(message_text=text),
reply_markup=markup
)
)
except Exception as e:
logs.error(f"Ошибка при обработке поста {post_id}: {e}")
continue
logs.info(f"Отправлено {len(results)} результатов для запроса '{query}' от {username} (ID: {user_id})")
try:
await inline_query.answer(results, cache_time=0, is_personal=True)
except Exception as e:
logs.error(f"Ошибка при отправке результатов инлайн-запроса: {e}")
__all__ = [
'router',
'inline_query_handler'
]

View File

@@ -0,0 +1,13 @@
from aiogram import Router
from .create_posts import router as posts_router
from .post_list import router as post_list_router
# Настройки экспорта и роутера
__all__ = ("router", )
router: Router = Router(name="post_router")
# Подключение роутеров
router.include_routers(
post_list_router,
posts_router,
)

View File

@@ -0,0 +1,420 @@
# bot/modules/create_post.py
import re
import uuid
from threading import Lock
from aiogram import Router, F
from aiogram.types import (
Message, CallbackQuery,
InlineKeyboardButton, InlineKeyboardMarkup
)
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.context import FSMContext
from bot.core import storage
router: Router = Router(name="create_post_router")
class PostState(StatesGroup):
waiting_for_text = State()
waiting_for_privacy = State()
waiting_for_id = State()
waiting_for_image = State()
waiting_for_buttons = State()
preview = State()
editing_choice = State()
post_id_lock: Lock = Lock()
# --- Utility functions ---
def make_inline_markup(rows: list[list[InlineKeyboardButton]]) -> InlineKeyboardMarkup:
return InlineKeyboardMarkup(inline_keyboard=rows)
def cancel_button() -> InlineKeyboardMarkup:
return make_inline_markup([[InlineKeyboardButton(text="Отмена", callback_data="cancel_creation")]])
def privacy_markup(is_private: bool) -> InlineKeyboardMarkup:
toggle = InlineKeyboardButton(
text="🔒 Приватный" if is_private else "🔓 Публичный",
callback_data="toggle_privacy"
)
cont = InlineKeyboardButton(text="Продолжить ➡️", callback_data="continue_creation")
return make_inline_markup([[toggle], [cont]])
def parse_buttons(text: str, post_id: str) -> list[list[dict]]:
"""
Поддерживается синтаксис:
Текст | msg:Только для боссов | 123,456 | msg:Для всех остальных
Текст | ntf:Без алерта | 789 | msg:Нет доступа
"""
rows: list[list[dict]] = []
button_index = 0
for line in text.splitlines():
line = line.strip()
if not line:
continue
# каждая строка может содержать несколько кнопок через ';'
btn_texts = [b.strip() for b in line.split(';') if b.strip()]
row: list[dict] = []
for raw in btn_texts:
parts = [p.strip() for p in raw.split('|')]
if len(parts) < 2:
raise ValueError(f"Неверный формат кнопки: '{raw}'")
btn = {"text": parts[0]}
primary_notification = None
primary_alert = False
allowed_ids = None
unauthorized_message = None
# обрабатываем параметры слева направо
for part in parts[1:]:
# URL / void
if part == "void":
btn["url"] = "http://void"
elif part.startswith("http") or part.startswith("tg://"):
btn["url"] = part
# первое уведомление (msg: — с алертом)
elif part.startswith("msg:") and primary_notification is None:
primary_notification = part.split(":", 1)[1]
primary_alert = True
# первое уведомление без алерта
elif part.startswith(("ntf:", "notification:")) and primary_notification is None:
primary_notification = part.split(":", 1)[1]
primary_alert = False
# список разрешённых ID
elif re.fullmatch(r'\d+(?:\s*,\s*\d+)*', part):
allowed_ids = [int(x.strip()) for x in part.split(",")]
# второе сообщение — для неавторизованных
elif part.startswith("msg:") and primary_notification is not None and allowed_ids is not None:
unauthorized_message = part.split(":", 1)[1]
# копирование текста
elif part.startswith("copy:"):
btn["callback_data"] = f"copy_{uuid.uuid4().hex}"
btn["copy_text"] = part.split(":", 1)[1]
# inline-параметры
elif part.startswith("inline:"):
btn["switch_inline_query"] = part.split(":", 1)[1]
elif part.startswith("inline_current:"):
btn["switch_inline_query_current_chat"] = part.split(":", 1)[1]
elif part.startswith("inline_chosen:"):
btn["switch_inline_query_chosen_chat"] = part.split(":", 1)[1]
# произвольный callback_data (если ещё не задан)
else:
if "callback_data" not in btn and "url" not in btn:
btn["callback_data"] = part
# если было уведомление — добавляем поля
if primary_notification is not None:
btn["callback_data"] = f"bt_{post_id}_{button_index}"
button_index += 1
btn["notification"] = primary_notification
btn["show_alert"] = primary_alert
if allowed_ids is not None:
btn["allowed_ids"] = allowed_ids
if unauthorized_message is not None:
btn["unauthorized_message"] = unauthorized_message
# финализируем кнопку
row.append(btn)
if row:
rows.append(row)
return rows
# --- Handlers ---
@router.message(F.text == "Создать пост📔")
async def start_creation(message: Message, state: FSMContext) -> None:
await state.set_state(PostState.waiting_for_text)
await state.update_data(private=False, buttons=[])
await message.reply(
text="Отправьте текст вашего поста:\n<i>Вы также можете использовать разметку</i>(<b>жирный</b>, <i>курсив</i> и <u>прочие</u>)!",
reply_markup=cancel_button()
)
@router.message(PostState.waiting_for_text)
async def got_text(message: Message, state: FSMContext) -> None:
html_text = message.html_text or message.text or message.caption or ""
await state.update_data(text=html_text)
await show_preview(message, state)
@router.callback_query(F.data == "toggle_privacy")
async def toggle_privacy(cq: CallbackQuery, state: FSMContext) -> None:
data = await state.get_data()
is_priv = not data.get('private', False)
await state.update_data(private=is_priv)
await cq.message.edit_reply_markup(reply_markup=privacy_markup(is_priv))
await cq.answer()
@router.callback_query(F.data == "continue_creation")
async def continue_to_id(cq: CallbackQuery, state: FSMContext) -> None:
await state.set_state(PostState.waiting_for_id)
await cq.message.edit_text(
"Введите уникальный ID поста (латиница, цифры, подчёрки):\n<i>Совет: инициалыРП_роль_тип_номер</i>\nПример: sgrp_dottore_post_4")
await cq.answer()
@router.message(PostState.waiting_for_id)
async def got_id(message: Message, state: FSMContext) -> None:
pid = message.text.strip()
if not pid.replace('_', '').isalnum():
await message.reply(text="ID должен содержать только латиницу, цифры и подчёркивания.",
reply_markup=cancel_button())
return
with post_id_lock:
if not storage.is_post_available(pid):
await message.reply(text="Этот ID уже занят, введите другой:", reply_markup=cancel_button())
return
# Создаем клавиатуру с кнопкой "Без изображения"
image_markup = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🚫 Без изображения", callback_data="no_image")],
[InlineKeyboardButton(text="❌ Отмена", callback_data="cancel_creation")]
])
await state.update_data(post_id=pid)
await state.set_state(PostState.waiting_for_image)
await message.reply(
text="Отправьте ссылку на изображение:\nПример: https://img4.teletype.in/files/f2/47/...\n\nСовет! Сохраняйте фотографии в teletype, а после копируйте ссылку на фотографию!\n\nИли нажмите 'Без изображения'.",
reply_markup=image_markup
)
@router.callback_query(F.data == "no_image", PostState.waiting_for_image)
async def no_image_callback(cq: CallbackQuery, state: FSMContext):
await state.update_data(image='')
await state.set_state(PostState.waiting_for_buttons)
await cq.message.delete()
buttons_markup = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🚫 Без кнопок", callback_data="no_buttons")],
[InlineKeyboardButton(text="❌ Отмена", callback_data="cancel_creation")]
])
await cq.message.answer(
text="""Отправьте кнопки по шаблону:
Кнопка заглушка | void
Уведомление | msg:Для вас!
Уведомление в закрепе | ntf:Сообщение
Кнопка ссылка | https://google.com
Копирование | copy:Текст для копирования
Для уведомлений с ограничением:
Уведомление | msg:Для вас! | 123,456 | msg:Для всех остальных!
Уведомление без алерта | ntf:Сообщение | 789 | msg:Нет доступа
Разделять кнопки через ;
Кнопка1 | void ; Кнопка2 | void ; ....
Или нажмите "Без кнопок".""",
reply_markup=buttons_markup,
parse_mode=None
)
await cq.answer()
@router.message(PostState.waiting_for_image)
async def got_image(message: Message, state: FSMContext) -> None:
img: str = message.text.strip()
if img.lower() in ('нет', 'no', 'none', 'без изображения'):
img: str = ''
await state.update_data(image=img)
await show_preview(message, state)
@router.callback_query(PostState.waiting_for_buttons, F.data == "no_buttons")
async def no_buttons_handler(callback: CallbackQuery, state: FSMContext) -> None:
await state.update_data(buttons=[])
await show_preview(callback.message, state)
await callback.answer()
@router.callback_query(PostState.waiting_for_buttons, F.data == "finish_buttons")
async def finish_buttons_handler(callback: CallbackQuery, state: FSMContext) -> None:
data = await state.get_data()
# Формируем финальные кнопки
final = []
for row in data.get('buttons', []):
final_row = []
for b in row:
btn = {"text": b["text"]}
if "url" in b:
btn["url"] = b["url"]
if "switch_inline_query" in b:
btn["switch_inline_query"] = b["switch_inline_query"]
if "callback_data" in b:
btn["callback_data"] = b["callback_data"]
if "notification" in b:
btn["notification"] = b["notification"]
btn["show_alert"] = b.get("show_alert", False)
final_row.append(btn)
final.append(final_row)
await state.update_data(buttons=final)
await show_preview(callback.message, state)
await callback.answer()
@router.callback_query(F.data == "cancel_creation")
async def cancel_handler(callback: CallbackQuery, state: FSMContext):
await state.clear()
await callback.message.edit_text("❌ Создание поста отменено")
await callback.answer()
# --- Preview and Edit Handlers ---
async def show_preview(message: Message, state: FSMContext) -> None:
data = await state.get_data()
text = data.get('text', '')
image = data.get('image', '')
buttons = data.get('buttons', [])
private = data.get('private', False)
post_id = data.get('post_id', '')
# Формируем текст предпросмотра
preview_text = f"<b>ПРЕДПРОСМОТР ПОСТА</b>\n\n{text}\n\n"
preview_text += f"🆔 ID: <code>{post_id}</code>\n"
preview_text += f"🔒 Приватность: {'Приватный' if private else 'Публичный'}\n"
if image:
preview_text += f"🖼 Изображение: {image}\n"
else:
preview_text += f"🖼 Изображение: отсутствует\n"
if buttons:
preview_text += "\n🔘 Кнопки:\n"
for row in buttons:
preview_text += " | ".join([btn['text'] for btn in row]) + "\n"
else:
preview_text += "\n🔘 Кнопки: отсутствуют\n"
# Клавиатура предпросмотра
preview_markup = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="Изменить", callback_data="edit_post"),
InlineKeyboardButton(text="Подтвердить", callback_data="confirm_post")
],
[
InlineKeyboardButton(text="Отменить создание", callback_data="cancel_creation")
]
])
await state.set_state(PostState.preview)
await message.answer(preview_text, reply_markup=preview_markup, disable_web_page_preview=True)
@router.callback_query(PostState.preview, F.data == "edit_post")
async def edit_post_handler(cq: CallbackQuery, state: FSMContext) -> None:
# Клавиатура выбора поля для редактирования
edit_markup = InlineKeyboardMarkup(inline_keyboard=[
[
InlineKeyboardButton(text="Текст", callback_data="edit_field:text"),
InlineKeyboardButton(text="Изображение", callback_data="edit_field:image"),
],
[
InlineKeyboardButton(text="Кнопки", callback_data="edit_field:buttons"),
InlineKeyboardButton(text="ID", callback_data="edit_field:id"),
],
[
InlineKeyboardButton(text="Приватность", callback_data="edit_field:privacy"),
InlineKeyboardButton(text="Назад", callback_data="back_to_preview"),
]
])
await cq.message.edit_text("Выберите что изменить:", reply_markup=edit_markup)
await state.set_state(PostState.editing_choice)
await cq.answer()
@router.callback_query(PostState.editing_choice, F.data == "back_to_preview")
async def back_to_preview(cq: CallbackQuery, state: FSMContext) -> None:
await show_preview(cq.message, state)
await cq.answer()
@router.callback_query(PostState.editing_choice, F.data.startswith("edit_field:"))
async def handle_field_edit(cq: CallbackQuery, state: FSMContext) -> None:
field = cq.data.split(":")[1]
if field == "text":
await state.set_state(PostState.waiting_for_text)
await cq.message.edit_text("Введите новый текст поста:", reply_markup=cancel_button())
elif field == "image":
await state.set_state(PostState.waiting_for_image)
markup = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🚫 Без изображения", callback_data="no_image")],
[InlineKeyboardButton(text="❌ Отмена", callback_data="cancel_creation")]
])
await cq.message.edit_text(
"Отправьте новую ссылку на изображение или нажмите 'Без изображения':",
reply_markup=markup
)
elif field == "buttons":
await state.set_state(PostState.waiting_for_buttons)
markup = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(text="🚫 Без кнопок", callback_data="no_buttons")],
[InlineKeyboardButton(text="❌ Отмена", callback_data="cancel_creation")]
])
await cq.message.edit_text(
"Отправьте новые кнопки по шаблону или нажмите 'Без кнопок':",
reply_markup=markup
)
elif field == "id":
await state.set_state(PostState.waiting_for_id)
await cq.message.edit_text("Введите новый ID поста:", reply_markup=cancel_button())
elif field == "privacy":
data = await state.get_data()
await state.set_state(PostState.waiting_for_privacy)
await cq.message.edit_text(
"Измените приватность поста:",
reply_markup=privacy_markup(data.get('private', False))
)
await cq.answer()
@router.callback_query(PostState.preview, F.data == "confirm_post")
async def confirm_post_handler(cq: CallbackQuery, state: FSMContext) -> None:
data = await state.get_data()
post_id = data['post_id']
# Сохранение поста в хранилище
storage.save_post(post_id, {
'text': data['text'],
'image': data.get('image', ''),
'buttons': data.get('buttons', []),
'private': data['private'],
'post_id': post_id
})
await cq.message.edit_text(f"✅ Пост успешно создан с ID: <code>{post_id}</code>")
await state.clear()
await cq.answer()

View File

@@ -0,0 +1,209 @@
from math import ceil
from typing import Final
from aiogram import Router, F
from aiogram.fsm.context import FSMContext
from aiogram.types import (
Message, CallbackQuery,
InlineKeyboardButton, InlineKeyboardMarkup,
SwitchInlineQueryChosenChat, CopyTextButton
)
from aiogram.exceptions import TelegramBadRequest
from aiogram.utils.markdown import hide_link
from bot.core import storage
from bot.utils import pagination_btn
router: Router = Router(name="posts_manager_router")
PAGE_SIZE: Final[int] = 5
async def send_posts_list(
message: Message = None,
callback_query: CallbackQuery = None,
page: int = 0
) -> None:
"""Отправляет список постов пользователя с пагинацией."""
user_id = message.from_user.id if message else callback_query.from_user.id
posts = storage.load_user_posts(user_id)
if not posts:
msg = "Нет сохранённых постов."
if message:
await message.answer(msg)
else:
await callback_query.answer(msg, show_alert=True)
return
post_ids = list(posts.keys())
total = len(post_ids)
pages = ceil(total / PAGE_SIZE)
page = max(0, min(page, pages - 1))
start = page * PAGE_SIZE
end = start + PAGE_SIZE
current_ids = post_ids[start:end]
rows: list[list[InlineKeyboardButton]] = []
for pid in current_ids:
post = posts[pid]
priv = "🔒" if post.get("private") else "🔓"
btn = InlineKeyboardButton(
text=f"{priv} Пост {pid}",
callback_data=f"view_post_{pid}"
)
rows.append([btn])
# Пагинация
nav_buttons = pagination_btn(
action="open_post_list",
page=page,
total_posts=total,
bt_page=PAGE_SIZE
)
if nav_buttons:
rows.append(nav_buttons)
# Кнопка закрытия
rows.append([InlineKeyboardButton(text="Закрыть❌", callback_data="cancel_list")])
keyboard = InlineKeyboardMarkup(inline_keyboard=rows)
header: str = "Список ваших постов:"
try:
if callback_query:
await callback_query.message.edit_text(header, reply_markup=keyboard)
else:
await message.answer(header, reply_markup=keyboard)
except TelegramBadRequest:
if callback_query:
await callback_query.message.delete()
await callback_query.message.answer(header, reply_markup=keyboard)
else:
await message.answer(header, reply_markup=keyboard)
# --- Хендлеры списка ---
@router.message(F.text.lower() == "посмотреть список📋")
async def cmd_list(message: Message, state: FSMContext):
# Сбрасываем состояние перед показом списка
await state.clear()
await send_posts_list(message=message)
@router.callback_query(F.data == "open_post_list")
async def cb_open_list(cq: CallbackQuery, state: FSMContext):
await state.clear()
await send_posts_list(callback_query=cq)
await cq.answer()
@router.callback_query(F.data.startswith("open_post_list_page_"))
async def cb_paginate(cq: CallbackQuery, state: FSMContext):
try:
page = int(cq.data.rsplit("_", 1)[-1])
except ValueError:
await cq.answer("Некорректная страница", show_alert=True)
return
await state.clear()
await send_posts_list(callback_query=cq, page=page)
await cq.answer()
@router.callback_query(F.data == "cancel_list")
async def cb_cancel(cq: CallbackQuery):
await cq.message.delete()
await cq.answer()
@router.callback_query(F.data.startswith("view_post_"))
async def view_post_callback(cq: CallbackQuery):
"""Просмотр отдельного поста"""
pid = cq.data.replace("view_post_", "")
uid = cq.from_user.id
posts = storage.load_user_posts(uid)
if pid not in posts:
await cq.answer("Пост не найден", show_alert=True)
return
post = posts[pid]
text = post.get("text", "")
img = post.get("image", "")
if img.startswith("http"):
text = f"{hide_link(img)}{text}"
rows: list[list[InlineKeyboardButton]] = []
for row in post.get("buttons", []):
btns: list[InlineKeyboardButton] = []
for b in row:
if "copy_text" in b:
btns.append(
InlineKeyboardButton(
text=b["text"],
copy_text=CopyTextButton(text=b["copy_text"])
)
)
elif "switch_inline_query" in b:
btns.append(
InlineKeyboardButton(
text=b["text"],
switch_inline_query=b["switch_inline_query"]
)
)
elif "switch_inline_query_current_chat" in b:
btns.append(
InlineKeyboardButton(
text=b["text"],
switch_inline_query_current_chat=b["switch_inline_query_current_chat"]
)
)
elif "switch_inline_query_chosen_chat" in b:
raw = b["switch_inline_query_chosen_chat"]
cfg = raw if isinstance(raw, dict) else {
"query": raw,
"allow_user_chats": True
}
btns.append(
InlineKeyboardButton(
text=b["text"],
switch_inline_query_chosen_chat=SwitchInlineQueryChosenChat(**cfg)
)
)
elif "url" in b:
url = b["url"]
if url.lower().endswith("void"):
btns.append(
InlineKeyboardButton(text=b["text"], callback_data="void")
)
else:
btns.append(
InlineKeyboardButton(text=b["text"], url=url)
)
elif "callback_data" in b:
btns.append(
InlineKeyboardButton(text=b["text"], callback_data=b["callback_data"])
)
if btns:
rows.append(btns)
# Удалить / назад
rows.append([
InlineKeyboardButton(text="Удалить❌", callback_data=f"delete_post_{pid}"),
InlineKeyboardButton(text="Назад◀️", callback_data="open_post_list")
])
rows.append(
[InlineKeyboardButton(text="Отправить↪️", switch_inline_query=f"{pid}")])
keyboard = InlineKeyboardMarkup(inline_keyboard=rows)
await cq.message.answer(text=text, reply_markup=keyboard)
await cq.message.delete()
await cq.answer()
@router.callback_query(lambda c: c.data and c.data.startswith("delete_post_"))
async def delete_post_callback(cq: CallbackQuery, state: FSMContext):
"""Удаление поста."""
pid = cq.data.replace("delete_post_", "")
uid = cq.from_user.id
if storage.delete_user_post(uid, pid):
await cq.answer(f"Пост {pid} удалён")
await state.clear()
await send_posts_list(callback_query=cq)
else:
await cq.answer(text="Не удалось удалить пост", show_alert=True)

View File

@@ -0,0 +1 @@
# bot/keyboards/__init__.py

View File

@@ -0,0 +1,3 @@
# bot/keyboards/inline/__init__.py
from .decision import *

View File

@@ -0,0 +1,22 @@
# bot/keyboards/decision.py
from aiogram.utils.keyboard import InlineKeyboardBuilder
from aiogram.types import InlineKeyboardMarkup, InlineKeyboardButton
# Настройка экспорта
__all__ = ("get_decision_keyboard",)
def get_decision_keyboard(thread_id: int, kind: str) -> InlineKeyboardMarkup:
"""
Создание клавиатуры принять\отклонить.
:param thread_id: Айди запроса.
:param kind: Вид предполагаемого действия.
:return: Разметку клавиатуры для сообщения бота.
"""
ikb: InlineKeyboardBuilder = InlineKeyboardBuilder()
ikb.row(
InlineKeyboardButton(text="✅ Принять", callback_data=f"{kind}:accept:{thread_id}"),
InlineKeyboardButton(text="❌ Отклонить", callback_data=f"{kind}:reject:{thread_id}")
)
return ikb.as_markup()

View File

@@ -0,0 +1,2 @@
# bot/keyboards/reply/__init__.py

5
bot/loggers/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
# BotLibrary/loggers/__init__.py
# Инициализация модуля loggers, для настройки логеров
# Экспортирование модулей во внешние слои проекта
from .logs import *

147
bot/loggers/logs.py Normal file
View File

@@ -0,0 +1,147 @@
"""
Модуль логирования для Telegram-бота.
Особенности:
* Вывод логов в консоль и/или файл
* Автоматическая ротация и удержание
* Форматирование с информацией о системе, типе события и пользователе
* Удобные методы для разных уровней логирования
"""
from sys import stderr
from pathlib import Path
from typing import Optional, Final, Union
from loguru import logger
from aiogram.types import Message, User
try:
from configs.config import LogConfig
except ImportError:
class LogConfig:
"""Запасные настройки логирования, если config недоступен."""
CONSOLE: Final[bool] = True
FILE: Final[bool] = True
DIR: Final[Path] = Path('Logs')
ROTATION: Final[str] = '100 MB'
RETENTION: Final[str] = '7 days'
# Настройка экспорта в модули
__all__ = ['Logs', 'logs']
class Logs:
"""
Класс для работы с логированием через loguru.
"""
_SYSTEM_NAME: Final[str] = 'PRIMO' # Исправлено: убран обратный слэш
_LOG_FORMAT: Final[str] = (
'<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> <red>|</red> ' # Исправлено форматирование времени
'<blue>{extra[system]}-{extra[log_type]}</blue> <red>| '
'{extra[user]} |</red> <level>{message}</level>'
)
@staticmethod
def _format_user(message: Optional[Message]) -> str:
"""
Форматирует информацию о пользователе из сообщения.
"""
if not message or not message.from_user:
return '@System'
user: User = message.from_user
return f"@{user.username}" if user.username else f"id{user.id}"
@classmethod
def _log(cls,
level: Union[str, int],
text: str,
log_type: str,
message: Optional[Message] = None) -> None:
"""Внутренний метод логирования."""
user_ctx = cls._format_user(message)
logger.bind(
system=cls._SYSTEM_NAME,
user=user_ctx,
log_type=log_type,
).log(level, text)
@classmethod
def setup(cls, start: bool = True) -> None:
"""Инициализация логирования: консоль и/или файл."""
logger.remove()
# Консольный вывод
if getattr(LogConfig, 'CONSOLE', False):
logger.add(
stderr,
format=cls._LOG_FORMAT,
colorize=True,
level='DEBUG',
filter=lambda rec: rec['extra'].get('log_type') != 'DEBUG'
)
# Файловый вывод с ротацией
if getattr(LogConfig, 'FILE', False):
log_dir = getattr(LogConfig, 'DIR', Path('logs'))
log_dir.mkdir(parents=True, exist_ok=True)
logger.add(
log_dir / 'bot.log',
rotation=getattr(LogConfig, 'ROTATION', '100 MB'),
retention=getattr(LogConfig, 'RETENTION', '7 days'),
format=cls._LOG_FORMAT,
level='DEBUG',
enqueue=True,
backtrace=True,
diagnose=True
)
# Добавляем вызов start() если нужно
if start:
cls.start()
@classmethod
def start(cls, text: str = 'Запуск бота...', log_type: str = 'START') -> None:
"""Логирование старта приложения."""
cls._log(level='INFO', text=text, log_type=log_type)
@classmethod
def debug(cls,
text: str,
log_type: str = 'DEBUG',
message: Optional[Message] = None) -> None:
cls._log(level='DEBUG', text=text, log_type=log_type, message=message)
@classmethod
def info(cls,
text: str,
log_type: str = 'INFO',
message: Optional[Message] = None) -> None:
cls._log(level='INFO', text=text, log_type=log_type, message=message)
@classmethod
def warning(cls,
text: str,
log_type: str = 'WARNING',
message: Optional[Message] = None) -> None:
cls._log(level='WARNING', text=text, log_type=log_type, message=message)
@classmethod
def error(cls,
text: str,
log_type: str = 'ERROR',
message: Optional[Message] = None) -> None:
cls._log(level='ERROR', text=text, log_type=log_type, message=message)
@classmethod
def exception(cls,
text: str,
exception: Exception,
log_type: str = 'EXCEPTION',
message: Optional[Message] = None) -> None:
full_text = f"{text}\nException: {exception!r}"
cls._log(level='ERROR', text=full_text, log_type=log_type, message=message)
# Инициализация экземпляра логгера
logs = Logs()
logs.setup()

View File

@@ -0,0 +1 @@
from .message_callback import *

View File

@@ -0,0 +1,77 @@
from typing import Union
from aiogram.types import FSInputFile, CallbackQuery, Message, ReplyKeyboardMarkup, InlineKeyboardMarkup
from aiogram.utils.keyboard import ReplyKeyboardBuilder, InlineKeyboardBuilder
# Настройка экспорта
__all__ = ('msg', 'msg_photo')
async def msg(
message: Message | CallbackQuery,
text: str,
markup: Union[InlineKeyboardBuilder, ReplyKeyboardBuilder, None] = None
) -> None:
"""
Шаблон для ответа на сообщение текстом.
:param message: Объект сообщения или callback-запроса.
:param text: Текст отправного сообщения от бота.
:param markup: Кнопки сообщения (инлайн или реплай).
"""
# Преобразуем клавиатуру
reply_markup: Union[InlineKeyboardMarkup, ReplyKeyboardMarkup, None] = None
if markup:
if isinstance(markup, InlineKeyboardBuilder):
reply_markup = markup.as_markup()
elif isinstance(markup, ReplyKeyboardBuilder):
reply_markup = markup.as_markup(resize_keyboard=True)
# Обработчик ответа на сообщение
if isinstance(message, Message):
await message.reply(
text=text,
reply_markup=reply_markup
)
# Обработчик ответа на callback
else:
await message.message.reply(
text=text,
reply_markup=reply_markup
)
async def msg_photo(
message: Message | CallbackQuery,
text: str,
file: str,
markup: Union[InlineKeyboardBuilder, ReplyKeyboardBuilder, None] = None
) -> None:
"""
Шаблон для ответа на сообщение фотографией.
:param message: Объект сообщения или callback-запроса.
:param file: Путь к фотографии для ответа.
:param text: Подпись к фото.
:param markup: Кнопки сообщения (инлайн или реплай).
"""
# Преобразуем клавиатуру
reply_markup: Union[InlineKeyboardMarkup, ReplyKeyboardMarkup, None] = None
if markup:
if isinstance(markup, InlineKeyboardBuilder):
reply_markup = markup.as_markup()
elif isinstance(markup, ReplyKeyboardBuilder):
reply_markup = markup.as_markup(resize_keyboard=True)
# Обработчик ответа на сообщение
if isinstance(message, Message):
await message.reply_photo(
photo=FSInputFile(file),
caption=text,
reply_markup=reply_markup
)
# Обработчик ответа на callback
else:
await message.message.reply_photo(
photo=FSInputFile(file),
caption=text,
reply_markup=reply_markup
)

4
bot/utils/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .interesting_facts import *
from .md2_escape import *
from .usernames import *
from .pagination import *

View File

@@ -0,0 +1,29 @@
from random import choice
from configs.config import Lists
# Настройки экспорта
__all__ = ("interesting_fact",)
def interesting_fact(mode: str = "факт", lists: list[str] = None) -> str:
"""
Возвращает случайный факт, анекдот или цитату, в зависимости от режима.
:param mode: Строка, определяющая тип контента ("факт", "анекдот", "цитата").
:param lists: Необязательный список строк, из которого можно выбирать вручную.
:return: Случайный элемент из соответствующего списка.
"""
if lists is not None:
return choice(lists)
mode: str = mode.lower()
if mode == "анекдот":
source: list[str] = Lists.jokes
elif mode == "цитата":
source: list[str] = Lists.quotes
else:
source: list[str] = Lists.facts
return choice(source)

35
bot/utils/md2_escape.py Normal file
View File

@@ -0,0 +1,35 @@
from re import sub, escape
from configs.config import BotSettings
# Настройка экспорта в модули
__all__ = ("textmd2",)
def textmd2(msg: str,
parse_mode: str = BotSettings.PARSE_MODE,
special_chars: str = r"_*[]()~`>#+-=|{}.!") -> str:
"""
Экранирует специальные символы MarkdownV2 в переданном тексте.
:param msg: Входной текст в виде строки.
:param parse_mode: Формат форматирования ('MarkdownV2' или 'HTML').
:param special_chars: Символы, которые необходимо экранировать.
:return: Экранированный текст или исходный текст, если формат HTML.
:raises TypeError: Если передан не строковый тип данных.
:raises ValueError: Если parse_mode задан некорректно.
"""
if not isinstance(msg, str):
raise TypeError(f"Ожидается строка, но получено {type(msg).__name__}")
if not isinstance(parse_mode, str):
raise TypeError(f"parse_mode должен быть строкой, но получено {type(parse_mode).__name__}")
if parse_mode.strip().lower() == "html":
return msg
elif parse_mode in {"markdownv2", "markdown"}:
return sub(rf"([{escape(special_chars)}])", r"\\\1", msg)
else:
raise ValueError(f"Недопустимое значение parse_mode: '{parse_mode}'. Ожидалось 'HTML' или 'MarkdownV2'")

28
bot/utils/pagination.py Normal file
View File

@@ -0,0 +1,28 @@
from aiogram.types import InlineKeyboardButton
# Настройка экспорта в модули
__all__ = ('pagination_btn',)
def pagination_btn(action: str,
page: int = 0,
total_posts: int = 0,
bt_page: int = 5) -> list[InlineKeyboardButton]:
"""
Создает кнопки для пагинации.
:param action: Действие в котором нужна пангинация.
:param page: Номер начальной страницы, по умолчанию 0.
:param total_posts: Количество постов.
:param bt_page: Количество кнопок на одной странице.
:return: Готовый лист списка инлайн-кнопок.
"""
navigation_buttons: list[InlineKeyboardButton] = []
if page > 0:
navigation_buttons.append(InlineKeyboardButton(
text="", callback_data=f"{action}_page_{page - 1}"
))
if (page + 1) * bt_page < total_posts:
navigation_buttons.append(InlineKeyboardButton(
text="", callback_data=f"{action}_page_{page + 1}"
))
return navigation_buttons

21
bot/utils/usernames.py Normal file
View File

@@ -0,0 +1,21 @@
from aiogram.types import Message
# Настройка экспорта в модули
__all__ = ('username', )
# Функция получения юзера или ID пользователя
def username(message: Message) -> str:
"""
Возвращает юзернейм пользователя из сообщения, или ID, если юзернейм не указан.
:param message: Объект сообщения из aiogram.
:return: Строка с юзернеймом пользователя или его ID.
:raises ValueError: Если в сообщении отсутствует информация о пользователе.
"""
try:
if message.from_user:
return f"@{message.from_user.username}" if message.from_user.username else f"@{message.from_user.id}"
raise ValueError("Информация о пользователе отсутствует в сообщении.")
except ValueError as e:
raise e # Перебрасываем ошибку выше для дальнейшей обработки