Версия 1.0

This commit is contained in:
Whyverum
2025-05-20 09:12:05 +07:00
commit 0b3b957c0a
34 changed files with 1964 additions and 0 deletions

4
BotCode/__init__.py Normal file
View File

@@ -0,0 +1,4 @@
from .config import *
from .handlers import *
from .utils import *
from .config import *

19
BotCode/config.py Normal file
View File

@@ -0,0 +1,19 @@
# BotCode/config.py
from os import getenv
from ast import literal_eval
from dotenv import load_dotenv
# Загружаем переменные из файла .env
load_dotenv()
BOT_TOKEN: str|None = getenv('BOT_TOKEN', None)
BOT_DEBUG_TOKEN: str|None = getenv('BOT_DEBUG_TOKEN', None)
ADMIN_ID: tuple[int] = literal_eval(getenv('ADMIN_ID', '[6751720805]'))
PARSE_MODE: str = getenv('PARSE_MODE', "HTML")
LOGGING_TO_CONSOLE: bool = getenv('LOGGING_TO_CONSOLE', "False").lower() == 'true'
DEBUG_MODE: bool = getenv('DEBUG_MODE', "False").lower() == 'true'
POSTS_DIR: str = getenv('POSTS_DIR', "posts")

0
BotCode/core/__init__.py Normal file
View File

234
BotCode/core/storage.py Normal file
View File

@@ -0,0 +1,234 @@
import json
from os import path, makedirs, listdir
from typing import Any, Dict, List, Optional
from BotCode.config import POSTS_DIR
from BotCode.loggers import logs
class PostStorage:
"""Класс для управления хранением постов и связанных уведомлений."""
def __init__(self, posts_dir: str = POSTS_DIR):
self.posts_dir = posts_dir
self.global_posts: Dict[str, Dict[str, Any]] = {}
self.notifications: Dict[str, Dict[str, Any]] = {}
self.alert_texts: Dict[str, Dict[str, Any]] = {}
self._ensure_posts_dir()
self.load_all_posts()
def _ensure_posts_dir(self, directory: Optional[str] = None) -> None:
"""Создаёт директорию для хранения постов, если она не существует."""
dir_path = directory or self.posts_dir
if not path.isdir(dir_path):
makedirs(dir_path, exist_ok=True)
logs.info(
f"Created posts directory: {dir_path}",
log_type="STORAGE",
)
def _get_user_posts_file(self, user_id: int) -> str:
"""Возвращает путь к файлу с постами пользователя."""
return path.join(self.posts_dir, f"posts_{user_id}.json")
def _update_button_notifications(self, callback_data: str, notification_data: Dict[str, Any]) -> None:
"""Регистрирует данные уведомления кнопки во внутренних хранилищах."""
if not callback_data:
return
self.alert_texts[callback_data] = notification_data
self.notifications[callback_data] = notification_data
def _process_buttons(self, post_id: str, buttons: List[Any]) -> None:
"""
Обрабатывает кнопки поста, нормализует callback_data и регистрирует уведомления.
Поддерживает различные типы кнопок: callback, url, copy, inline.
"""
if not buttons:
return
for row_idx, row in enumerate(buttons):
btns = row if isinstance(row, list) else [row]
for col_idx, button in enumerate(btns):
if not isinstance(button, dict):
continue
if 'callback_data' in button:
cb_data = button['callback_data']
if not cb_data or not (cb_data.startswith('bt_') or cb_data.startswith('show_alert_')):
prefix = 'show_alert_' if button.get('show_alert') else 'bt_'
button['callback_data'] = f"{prefix}{post_id}_{row_idx}_{col_idx}"
cb_data = button['callback_data']
if 'notification' in button:
notification = {
'text': button['notification'],
'show_alert': button.get('show_alert', False),
'allowed_ids': button.get('allowed_ids'),
'unauthorized_message': button.get('unauthorized_message')
}
self._update_button_notifications(cb_data, notification)
logs.debug(
f"Registered notification for {cb_data}",
log_type="STORAGE",
)
def load_user_posts(self, user_id: int) -> Dict[str, Any]:
"""Загружает посты пользователя из файла."""
file_path = self._get_user_posts_file(user_id)
try:
if path.isfile(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
posts = json.load(f)
if isinstance(posts, dict):
return posts
logs.warning(
f"Invalid posts format in {file_path}",
log_type="STORAGE",
)
except json.JSONDecodeError as e:
logs.error(
f"JSON decode error in {file_path}: {str(e)}",
log_type="STORAGE",
)
except Exception as e:
logs.error(
f"Error loading posts from {file_path}: {str(e)}",
log_type="STORAGE",
)
return {}
def save_user_posts(self, user_id: int, posts: Dict[str, Any]) -> None:
"""
Сохраняет посты пользователя в файл и обновляет внутренние хранилища.
Обрабатывает кнопки и уведомления перед сохранением.
"""
if not isinstance(posts, dict):
logs.error(
"Invalid posts format, expected dict",
log_type="STORAGE",
)
return
for post_id, post in posts.items():
if isinstance(post, dict) and 'buttons' in post:
self._process_buttons(post_id, post['buttons'])
file_path = self._get_user_posts_file(user_id)
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(posts, f, ensure_ascii=False, indent=4)
logs.info(
f"Saved posts for user {user_id}",
log_type="STORAGE",
)
except Exception as e:
logs.error(
f"Error saving posts to {file_path}: {str(e)}",
log_type="STORAGE",
)
return
# Обновление кэша: перезагружаем записи этого пользователя
# Удаляем старые записи
for pid in list(self.global_posts):
if pid in posts:
self.global_posts.pop(pid, None)
# Загружаем свежие
fresh = self.load_user_posts(user_id)
for pid, post in fresh.items():
if isinstance(post, dict) and 'buttons' in post:
self._process_buttons(pid, post['buttons'])
self.global_posts[pid] = post
def delete_user_post(self, user_id: int, post_id: str) -> bool:
"""Удаляет пост пользователя и связанные уведомления. Возвращает статус операции."""
user_posts = self.load_user_posts(user_id)
if post_id not in user_posts:
logs.warning(
f"Post {post_id} not found for user {user_id}",
log_type="STORAGE",
)
return False
post = user_posts.pop(post_id)
notification_count = 0
if isinstance(post.get('buttons'), list):
for row in post['buttons']:
btns = row if isinstance(row, list) else [row]
for button in btns:
if isinstance(button, dict):
cb = button.get('callback_data')
if cb and cb in self.alert_texts:
self.alert_texts.pop(cb)
self.notifications.pop(cb, None)
notification_count += 1
logs.debug(
f"Removed {notification_count} notifications for post {post_id}",
log_type="STORAGE",
)
# Сохраняем и обновляем кэш
self.save_user_posts(user_id, user_posts)
self.global_posts.pop(post_id, None)
logs.info(
f"Deleted post {post_id} for user {user_id}",
log_type="STORAGE",
)
return True
def is_post_available(self, post_id: str) -> bool:
"""Проверяет доступность идентификатора поста."""
return post_id not in self.global_posts
def load_all_posts(self) -> None:
"""Загружает все посты из файлов в рабочей директории."""
self.global_posts.clear()
self.alert_texts.clear()
self.notifications.clear()
self._ensure_posts_dir()
loaded_files = 0
loaded_posts = 0
try:
for filename in listdir(self.posts_dir):
if filename.endswith('.json'):
user_id_str = filename[len('posts_'):-len('.json')]
try:
user_id = int(user_id_str)
except ValueError:
logs.warning(
f"Invalid filename format: {filename}",
log_type="STORAGE",
)
continue
posts = self.load_user_posts(user_id)
for pid, post in posts.items():
if isinstance(post, dict) and 'buttons' in post:
self._process_buttons(pid, post['buttons'])
self.global_posts[pid] = post
loaded_posts += 1
loaded_files += 1
except Exception as e:
logs.error(
f"Error loading all posts: {str(e)}",
log_type="STORAGE",
)
logs.info(
f"Loaded {loaded_posts} posts from {loaded_files} files",
log_type="STORAGE",
)
def get_post(self, post_id: str) -> Optional[Dict[str, Any]]:
"""Возвращает пост по идентификатору или None если не найден."""
return self.global_posts.get(post_id)
def get_notification(self, callback_data: str) -> Optional[Dict[str, Any]]:
"""Возвращает данные уведомления для указанного callback."""
return self.notifications.get(callback_data)
# Инициализация хранилища при импорте модуля
storage = PostStorage()

View File

@@ -0,0 +1,16 @@
from aiogram import Router
from .post import router as post_routers
from .commands import router as cmd_routers
from .callback import router as callback_router
from .inline import router as inline_router
router = Router(name=__name__)
# Include routers with different priorities
router.include_routers(
cmd_routers,
callback_router,
post_routers,
inline_router
)

View File

@@ -0,0 +1,48 @@
# BotCode/handlers/callback.py
from aiogram import Router, F
from aiogram.types import CallbackQuery
from BotCode.core.storage import storage
router = Router(name="callback_router")
@router.callback_query(F.data.startswith("bt_"))
@router.callback_query(F.data.startswith("show_alert_"))
async def handle_button_alert(callback_query: CallbackQuery) -> None:
key = callback_query.data
user_id = callback_query.from_user.id
# Получаем уведомление через хранилище
notif = storage.get_notification(key)
if not notif:
await callback_query.answer()
return
# Проверяем права доступа
allowed = notif.get("allowed_ids")
if allowed and user_id not in allowed:
msg = notif.get("unauthorized_message", "У вас нет доступа к этому уведомлению.")
await callback_query.answer(text=msg, show_alert=True)
return
text = notif.get("text", "")
show_alert = notif.get("show_alert", False)
try:
await callback_query.answer(text=text, show_alert=show_alert)
except Exception as e:
try:
await callback_query.answer(text="Произошла ошибка при отображении уведомления.", show_alert=True)
except:
pass
@router.callback_query(F.data == "void")
async def handle_void_callback(callback_query: CallbackQuery) -> None:
"""
Обработка пустых callback-запросов (void).
Просто отвечает на callback без уведомления.
"""
try:
await callback_query.answer()
except Exception as e:
return

View File

@@ -0,0 +1,7 @@
from aiogram import Router
from .start_cmd import router as start_cmd_router
__all__ = ('router',)
router = Router(name="post_router")
router.include_routers(start_cmd_router,)

View File

@@ -0,0 +1,36 @@
# BotCode/handlers/commands/start_cmd.py
from aiogram import Router, types
from aiogram.filters import CommandStart
router = Router(name=__name__)
__all__ = ("router",)
@router.message(CommandStart())
async def start_cmd(message: types.Message) -> None:
"""
Обработчик команды /start.
:param message: Объект сообщения и информации о нем.
:return: Вывод сообщения для администратора, о выборе режимов работы.
"""
from BotCode.loggers import logs
from BotCode.utils import textmd2
logs.info(text="использовал(а) команду /start", log_type="Start", message=message)
if message.from_user.id:
# Создаем клавиатурный билдер
from aiogram.utils.keyboard import ReplyKeyboardBuilder
rkb: ReplyKeyboardBuilder = ReplyKeyboardBuilder()
rkb.row(types.KeyboardButton(text="Создать пост📔"))
rkb.row(types.KeyboardButton(text="Посмотреть список📋"))
# Отправка фотографии с текстом и клавиатурой
from aiogram.types.input_file import FSInputFile
await message.reply_photo(
photo=FSInputFile('assets/start.jpg'),
caption=textmd2("Добро пожаловать в систему, Босс!"),
reply_markup=rkb.as_markup(resize_keyboard=True)
)
else:
await message.reply(text=textmd2("Простите, вы не мой Босс!❌\nОбратитесь к @verdise!"))

179
BotCode/handlers/inline.py Normal file
View File

@@ -0,0 +1,179 @@
# BotCode/handlers/inline.py
from aiogram import Router
from aiogram.types import (
InlineKeyboardButton,
InlineKeyboardMarkup,
InlineQuery,
InputTextMessageContent,
InlineQueryResultArticle,
SwitchInlineQueryChosenChat,
CopyTextButton,
)
from aiogram.utils.markdown import hide_link
from BotCode.core.storage import storage
from BotCode.utils import textmd2
from BotCode.config import PARSE_MODE
from BotCode.loggers import logs
router = Router(name="inline_send")
def build_markup(buttons_def: list[list[dict]]) -> InlineKeyboardMarkup | None:
"""
Создаёт InlineKeyboardMarkup из списка описаний кнопок.
Поддерживает URL, callback, inline-моды.
Обрабатывает "void"-кнопки как callback_data="void".
Для switch_inline_query_chosen_chat устанавливает хотя бы один allow_* True.
"""
if not buttons_def:
return None
rows: list[list[InlineKeyboardButton]] = []
for row_idx, row in enumerate(buttons_def):
if not isinstance(row, list):
logs.warning(f"Некорректный формат ряда кнопок: {row}")
continue
kb_row: list[InlineKeyboardButton] = []
for col_idx, b in enumerate(row):
if not isinstance(b, dict):
logs.warning(f"Некорректный формат кнопки в ряду {row_idx}: {b}")
continue
text = b.get("text", "")
if not text:
logs.warning(f"Пустой текст кнопки в ряду {row_idx}, колонке {col_idx}")
continue
btn = None
try:
if "url" in b:
url = b["url"]
if url.lower().endswith("void"):
btn = InlineKeyboardButton(text=text, callback_data="void")
else:
btn = InlineKeyboardButton(text=text, url=url)
elif "switch_inline_query" in b:
btn = InlineKeyboardButton(
text=text,
switch_inline_query=b["switch_inline_query"]
)
elif "switch_inline_query_current_chat" in b:
btn = InlineKeyboardButton(
text=text,
switch_inline_query_current_chat=b["switch_inline_query_current_chat"]
)
elif "switch_inline_query_chosen_chat" in b:
query = b["switch_inline_query_chosen_chat"]
if isinstance(query, dict):
siqcc = SwitchInlineQueryChosenChat(
query=query.get("query", ""),
allow_user_chats=query.get("allow_user_chats", True),
allow_group_chats=query.get("allow_group_chats", True),
allow_channel_chats=query.get("allow_channel_chats", True),
allow_bot_chats=query.get("allow_bot_chats", False),
)
else:
siqcc = SwitchInlineQueryChosenChat(
query=query,
allow_user_chats=True,
allow_group_chats=True,
allow_channel_chats=True,
allow_bot_chats=False,
)
btn = InlineKeyboardButton(
text=text,
switch_inline_query_chosen_chat=siqcc
)
elif "copy_text" in b:
btn = InlineKeyboardButton(
text=text,
copy_text=CopyTextButton(text=b["copy_text"])
)
elif "callback_data" in b:
btn = InlineKeyboardButton(
text=text,
callback_data=b["callback_data"]
)
except Exception as e:
logs.error(f"Ошибка при создании кнопки в ряду {row_idx}, колонке {col_idx}: {e}")
continue
if btn:
kb_row.append(btn)
if kb_row:
rows.append(kb_row)
if not rows:
return None
return InlineKeyboardMarkup(inline_keyboard=rows)
@router.inline_query()
async def inline_query_handler(inline_query: InlineQuery):
"""
Обрабатывает инлайн-запросы для поиска и отправки постов.
Фильтрует посты по приватности и поисковому запросу.
"""
# Перезагружаем все посты из файлов на случай изменений
storage.load_all_posts()
query = inline_query.query or ""
user_id = inline_query.from_user.id
username = inline_query.from_user.username or f"user_{user_id}"
logs.debug(f"Получен инлайн-запрос от {username} (ID: {user_id}): {query}")
results = []
for post_id, post in storage.global_posts.items():
try:
# Проверка приватности
if post.get("private") and post.get("user_id") != user_id:
continue
# Проверка поискового запроса
if query and query.lower() not in post_id.lower():
continue
# Тело сообщения
text = textmd2(post.get("text", ""))
image = post.get("image", "")
if image and image.startswith("http"):
text = f"{hide_link(image)}{text}"
# Клавиатура
markup = build_markup(post.get("buttons", []))
results.append(
InlineQueryResultArticle(
id=post_id,
title=f"Пост {post_id}",
description=(post.get("text", "")[:100] + "...") if len(post.get("text", "")) > 100 else post.get(
"text", ""),
input_message_content=InputTextMessageContent(
message_text=text,
parse_mode=PARSE_MODE
),
reply_markup=markup
)
)
except Exception as e:
logs.error(f"Ошибка при обработке поста {post_id}: {e}")
continue
logs.info(f"Отправлено {len(results)} результатов для запроса '{query}' от {username} (ID: {user_id})")
try:
await inline_query.answer(results, cache_time=0, is_personal=True)
except Exception as e:
logs.error(f"Ошибка при отправке результатов инлайн-запроса: {e}")
__all__ = [
'router',
'inline_query_handler'
]

View File

@@ -0,0 +1,10 @@
from aiogram import Router
from .create_posts import router as posts_router
from .post_list import router as post_list_router
router = Router(name="post_router")
router.include_routers(
posts_router,
post_list_router,
)

View File

@@ -0,0 +1,210 @@
import uuid
from threading import Lock
from aiogram import Router, F
from aiogram.types import (
Message, CallbackQuery,
InlineKeyboardButton, InlineKeyboardMarkup
)
from aiogram.fsm.state import State, StatesGroup
from aiogram.fsm.context import FSMContext
from BotCode.core.storage import storage
from BotCode.utils import textmd2
router = Router()
class PostState(StatesGroup):
waiting_for_text = State()
waiting_for_privacy = State()
waiting_for_id = State()
waiting_for_image = State()
waiting_for_buttons = State()
post_id_lock = Lock()
# --- Utility functions ---
def make_inline_markup(rows: list[list[InlineKeyboardButton]]) -> InlineKeyboardMarkup:
return InlineKeyboardMarkup(inline_keyboard=rows)
def cancel_button() -> InlineKeyboardMarkup:
return make_inline_markup([[InlineKeyboardButton(text="Отмена", callback_data="cancel_creation")]])
def privacy_markup(is_private: bool) -> InlineKeyboardMarkup:
toggle = InlineKeyboardButton(
text="🔒 Приватный" if is_private else "🔓 Публичный",
callback_data="toggle_privacy"
)
cont = InlineKeyboardButton(text="Продолжить ➡️", callback_data="continue_creation")
return make_inline_markup([[toggle], [cont]])
def parse_buttons(text: str) -> list[list[dict]]:
rows: list[list[dict]] = []
current: list[dict] = []
for raw in text.splitlines():
line = raw.strip()
if not line:
if current:
rows.append(current)
current = []
continue
if '|' not in line:
raise ValueError(f"Неверный формат кнопки: '{line}'")
label, action = map(str.strip, line.split('|', 1))
btn: dict = {"text": label}
if action.startswith('notification:'):
btn['notification'] = action.split(':', 1)[1]
btn['show_alert'] = True
elif action.startswith('copy:'):
btn['callback_data'] = f"copy_{uuid.uuid4().hex}"
btn['copy_text'] = action.split(':', 1)[1]
elif action.startswith('switch_inline:'):
btn['switch_inline_query'] = action.split(':', 1)[1]
elif action.startswith('switch_inline_current:'):
btn['switch_inline_query_current_chat'] = action.split(':', 1)[1]
elif action.startswith('switch_inline_chosen:'):
btn['switch_inline_query_chosen_chat'] = action.split(':', 1)[1]
elif action.startswith(('http://', 'https://')):
btn['url'] = action
else:
btn['callback_data'] = action
current.append(btn)
if current:
rows.append(current)
return rows
# --- Handlers ---
@router.message(F.text == "Создать пост📔")
async def start_creation(message: Message, state: FSMContext):
await state.set_state(PostState.waiting_for_text)
await state.update_data(private=False, buttons=[])
await message.reply(
textmd2(
"""Отправьте текст вашего поста:
Тест для проверки @userbotname
<b>Жирный</b>
<i>Курсив</i>
<u>Подчёркнутый</u>
<s>Зачёркнутый</s>
<code>Моноширинный</code>
<pre>Предварительно отформатированный</pre>
<a href=\"https://example.com\">Ссылка</a>
"""
),
reply_markup=cancel_button(), parse_mode=None
)
@router.message(PostState.waiting_for_text)
async def got_text(message: Message, state: FSMContext):
await state.update_data(text=message.text or message.caption or "")
await state.set_state(PostState.waiting_for_privacy)
data = await state.get_data()
await message.reply(
"Выберите приватность поста:",
reply_markup=privacy_markup(data.get('private', False))
)
@router.callback_query(lambda c: c.data == "toggle_privacy")
async def toggle_privacy(cq: CallbackQuery, state: FSMContext):
data = await state.get_data()
is_priv = not data.get('private', False)
await state.update_data(private=is_priv)
await cq.message.edit_reply_markup(
reply_markup=privacy_markup(is_priv)
)
await cq.answer()
@router.callback_query(lambda c: c.data == "continue_creation")
async def continue_to_id(cq: CallbackQuery, state: FSMContext):
await state.set_state(PostState.waiting_for_id)
await cq.message.edit_text("Введите уникальный ID поста (латиница, цифры, подчёрки):")
await cq.answer()
@router.message(PostState.waiting_for_id)
async def got_id(message: Message, state: FSMContext):
pid = message.text.strip()
if not pid.replace('_', '').isalnum():
await message.reply(
"ID должен содержать только латиницу, цифры и подчёркивания.",
reply_markup=cancel_button()
)
return
with post_id_lock:
if not storage.is_post_available(pid):
await message.reply(
text="Этот ID уже занят, введите другой:",
reply_markup=cancel_button()
)
return
await state.update_data(post_id=pid)
await state.set_state(PostState.waiting_for_image)
await message.reply(
text="Отправьте ссылку на изображение или 'нет':\n"
"Пример: https://img4.teletype.in/files/f2/47/...",
reply_markup=cancel_button()
)
@router.message(PostState.waiting_for_image)
async def got_image(message: Message, state: FSMContext):
img = message.text.strip()
if img.lower() in ('нет', 'no', 'none'):
img = ''
await state.update_data(image=img)
await state.set_state(PostState.waiting_for_buttons)
await message.reply(
textmd2(
"""Отправьте кнопки по шаблону:
Кнопка заглушка | void
Уведомление | notification:Для вас!
Кнопка ссылка | https://google.com
Копирование | copy:Копирование текста!
Для одного | callback_data | allowed_ids=123 | unauthorized_message=Нет доступа
Пустая строка — новый ряд. /done — закончить."""
),
reply_markup=cancel_button(), parse_mode=None
)
@router.message(PostState.waiting_for_buttons)
async def got_buttons(message: Message, state: FSMContext):
text = message.text.strip()
data = await state.get_data()
uid = message.from_user.id
pid = data['post_id']
try:
if text.lower() in ('/done', 'none'):
btns = data.get('buttons', []) if text == '/done' else []
posts = storage.load_user_posts(uid)
posts[pid] = {
'user_id': uid,
'text': data['text'],
'image': data['image'],
'buttons': btns,
'private': data.get('private', False)
}
storage.save_user_posts(uid, posts)
await message.reply(
f"✅ Пост создан! ID: {pid}\n"
f"{'🔒 Приватный' if data.get('private') else '🔓 Публичный'}\n"
f"Используйте: <code>@{(await message.bot.me()).username} {pid}</code>"
)
await state.clear()
return
rows = parse_buttons(text)
existing = data.get('buttons', [])
await state.update_data(buttons=existing + rows)
await message.reply(
text="✅ Кнопки добавлены. Добавьте ещё или /done для окончания.",
reply_markup=cancel_button()
)
except ValueError as err:
await message.reply(f"{err}")
@router.callback_query(lambda c: c.data == "cancel_creation")
async def cancel(cq: CallbackQuery, state: FSMContext):
await state.clear()
await cq.message.reply(textmd2("Процесс создания поста отменён."))
await cq.answer()

View File

@@ -0,0 +1,200 @@
from math import ceil
from aiogram import Router, F
from aiogram.types import (
Message, CallbackQuery,
InlineKeyboardButton, InlineKeyboardMarkup,
SwitchInlineQueryChosenChat, CopyTextButton
)
from aiogram.exceptions import TelegramBadRequest
from aiogram.utils.markdown import hide_link
from BotCode.core.storage import storage
from BotCode.utils.pagination import create_pagination_buttons
from BotCode.utils import textmd2
from BotCode.config import PARSE_MODE
router = Router(name="posts_manager")
PAGE_SIZE = 5
async def send_posts_list(
message: Message = None,
callback_query: CallbackQuery = None,
page: int = 0
) -> None:
"""Отправляет список постов пользователя с пагинацией."""
user_id = message.from_user.id if message else callback_query.from_user.id
posts = storage.load_user_posts(user_id)
if not posts:
msg = "Нет сохранённых постов."
if message:
await message.answer(msg)
else:
await callback_query.answer(msg, show_alert=True)
return
post_ids = list(posts.keys())
total = len(post_ids)
pages = ceil(total / PAGE_SIZE)
page = max(0, min(page, pages - 1))
start = page * PAGE_SIZE
end = start + PAGE_SIZE
current_ids = post_ids[start:end]
rows: list[list[InlineKeyboardButton]] = []
for pid in current_ids:
post = posts[pid]
priv = "🔒" if post.get("private") else "🔓"
btn = InlineKeyboardButton(
text=f"{priv} Пост {pid}",
callback_data=f"view_post_{pid}"
)
rows.append([btn])
# Пагинация
nav_buttons = create_pagination_buttons(
action="open_post_list",
page=page,
total_posts=total,
bt_page=PAGE_SIZE
)
if nav_buttons:
rows.append(nav_buttons)
# Кнопка закрытия
rows.append([InlineKeyboardButton(text="Закрыть❌", callback_data="cancel_list")])
keyboard = InlineKeyboardMarkup(inline_keyboard=rows)
header = "Список ваших постов:"
try:
if callback_query:
await callback_query.message.edit_text(header, reply_markup=keyboard)
else:
await message.answer(header, reply_markup=keyboard)
except TelegramBadRequest:
if callback_query:
await callback_query.message.delete()
await callback_query.message.answer(header, reply_markup=keyboard)
else:
await message.answer(header, reply_markup=keyboard)
# --- Хендлеры списка ---
@router.message(F.text.lower() == "посмотреть список📋")
async def cmd_list(message: Message):
await send_posts_list(message=message)
@router.callback_query(F.data == "open_post_list")
async def cb_open_list(cq: CallbackQuery):
await send_posts_list(callback_query=cq)
await cq.answer()
@router.callback_query(lambda c: c.data and c.data.startswith("open_post_list_page_"))
async def cb_paginate(cq: CallbackQuery):
try:
page = int(cq.data.rsplit("_", 1)[-1])
except ValueError:
await cq.answer("Некорректная страница", show_alert=True)
return
await send_posts_list(callback_query=cq, page=page)
await cq.answer()
@router.callback_query(F.data == "cancel_list")
async def cb_cancel(cq: CallbackQuery):
await cq.message.delete()
await cq.answer()
# --- Просмотр отдельного поста ---
@router.callback_query(lambda c: c.data and c.data.startswith("view_post_"))
async def view_post_callback(cq: CallbackQuery):
pid = cq.data.replace("view_post_", "")
uid = cq.from_user.id
posts = storage.load_user_posts(uid)
if pid not in posts:
await cq.answer("Пост не найден", show_alert=True)
return
post = posts[pid]
text = textmd2(post.get("text", ""))
img = post.get("image", "")
if img.startswith("http"):
text = f"{hide_link(img)}{text}"
rows: list[list[InlineKeyboardButton]] = []
for row in post.get("buttons", []):
btns: list[InlineKeyboardButton] = []
for b in row:
if "copy_text" in b:
btns.append(
InlineKeyboardButton(
text=b["text"],
copy_text=CopyTextButton(text=b["copy_text"])
)
)
elif "switch_inline_query" in b:
btns.append(
InlineKeyboardButton(
text=b["text"],
switch_inline_query=b["switch_inline_query"]
)
)
elif "switch_inline_query_current_chat" in b:
btns.append(
InlineKeyboardButton(
text=b["text"],
switch_inline_query_current_chat=b["switch_inline_query_current_chat"]
)
)
elif "switch_inline_query_chosen_chat" in b:
raw = b["switch_inline_query_chosen_chat"]
cfg = raw if isinstance(raw, dict) else {
"query": raw,
"allow_user_chats": True
}
btns.append(
InlineKeyboardButton(
text=b["text"],
switch_inline_query_chosen_chat=SwitchInlineQueryChosenChat(**cfg)
)
)
elif "url" in b:
url = b["url"]
if url.lower().endswith("void"):
btns.append(
InlineKeyboardButton(text=b["text"], callback_data="void")
)
else:
btns.append(
InlineKeyboardButton(text=b["text"], url=url)
)
elif "callback_data" in b:
btns.append(
InlineKeyboardButton(text=b["text"], callback_data=b["callback_data"])
)
if btns:
rows.append(btns)
# Удалить / назад
rows.append([
InlineKeyboardButton(text="Удалить❌", callback_data=f"delete_post_{pid}"),
InlineKeyboardButton(text="Назад◀️", callback_data="open_post_list")
])
keyboard = InlineKeyboardMarkup(inline_keyboard=rows)
await cq.message.answer(text=text, reply_markup=keyboard, parse_mode=PARSE_MODE)
await cq.message.delete()
await cq.answer()
# --- Удаление поста ---
@router.callback_query(lambda c: c.data and c.data.startswith("delete_post_"))
async def delete_post_callback(cq: CallbackQuery):
pid = cq.data.replace("delete_post_", "")
uid = cq.from_user.id
if storage.delete_user_post(uid, pid):
await cq.answer(f"Пост {pid} удалён")
await send_posts_list(callback_query=cq)
else:
await cq.answer("Не удалось удалить пост", show_alert=True)

View File

@@ -0,0 +1,5 @@
# BotLibrary/loggers/__init__.py
# Инициализация модуля loggers, для настройки логеров
# Экспортирование модулей во внешние слои проекта
from .logs import *

147
BotCode/loggers/logs.py Normal file
View File

@@ -0,0 +1,147 @@
"""
Модуль логирования для Telegram-бота.
Особенности:
* Вывод логов в консоль и/или файл
* Автоматическая ротация и удержание
* Форматирование с информацией о системе, типе события и пользователе
* Удобные методы для разных уровней логирования
"""
from sys import stderr
from pathlib import Path
from typing import Optional, Final, Union
from loguru import logger
from aiogram.types import Message, User
try:
from config import LogConfig
except ImportError:
class LogConfig:
"""Запасные настройки логирования, если config недоступен."""
CONSOLE: Final[bool] = True
FILE: Final[bool] = True
DIR: Final[Path] = Path('Logs')
ROTATION: Final[str] = '100 MB'
RETENTION: Final[str] = '7 days'
# Настройка экспорта в модули
__all__ = ['Logs', 'logs']
class Logs:
"""
Класс для работы с логированием через loguru.
"""
_SYSTEM_NAME: Final[str] = 'PRIMO' # Исправлено: убран обратный слэш
_LOG_FORMAT: Final[str] = (
'<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> <red>|</red> ' # Исправлено форматирование времени
'<blue>{extra[system]}-{extra[log_type]}</blue> <red>| '
'{extra[user]} |</red> <level>{message}</level>'
)
@staticmethod
def _format_user(message: Optional[Message]) -> str:
"""
Форматирует информацию о пользователе из сообщения.
"""
if not message or not message.from_user:
return '@System'
user: User = message.from_user
return f"@{user.username}" if user.username else f"id{user.id}"
@classmethod
def _log(cls,
level: Union[str, int],
text: str,
log_type: str,
message: Optional[Message] = None) -> None:
"""Внутренний метод логирования."""
user_ctx = cls._format_user(message)
logger.bind(
system=cls._SYSTEM_NAME,
user=user_ctx,
log_type=log_type,
).log(level, text)
@classmethod
def setup(cls, start: bool = True) -> None:
"""Инициализация логирования: консоль и/или файл."""
logger.remove()
# Консольный вывод
if getattr(LogConfig, 'CONSOLE', False):
logger.add(
stderr,
format=cls._LOG_FORMAT,
colorize=True,
level='DEBUG',
filter=lambda rec: rec['extra'].get('log_type') != 'DEBUG'
)
# Файловый вывод с ротацией
if getattr(LogConfig, 'FILE', False):
log_dir = getattr(LogConfig, 'DIR', Path('logs'))
log_dir.mkdir(parents=True, exist_ok=True)
logger.add(
log_dir / 'bot.log',
rotation=getattr(LogConfig, 'ROTATION', '100 MB'),
retention=getattr(LogConfig, 'RETENTION', '7 days'),
format=cls._LOG_FORMAT,
level='DEBUG',
enqueue=True,
backtrace=True,
diagnose=True
)
# Добавляем вызов start() если нужно
if start:
cls.start()
@classmethod
def start(cls, text: str = 'Запуск бота...', log_type: str = 'START') -> None:
"""Логирование старта приложения."""
cls._log(level='INFO', text=text, log_type=log_type)
@classmethod
def debug(cls,
text: str,
log_type: str = 'DEBUG',
message: Optional[Message] = None) -> None:
cls._log(level='DEBUG', text=text, log_type=log_type, message=message)
@classmethod
def info(cls,
text: str,
log_type: str = 'INFO',
message: Optional[Message] = None) -> None:
cls._log(level='INFO', text=text, log_type=log_type, message=message)
@classmethod
def warning(cls,
text: str,
log_type: str = 'WARNING',
message: Optional[Message] = None) -> None:
cls._log(level='WARNING', text=text, log_type=log_type, message=message)
@classmethod
def error(cls,
text: str,
log_type: str = 'ERROR',
message: Optional[Message] = None) -> None:
cls._log(level='ERROR', text=text, log_type=log_type, message=message)
@classmethod
def exception(cls,
text: str,
exception: Exception,
log_type: str = 'EXCEPTION',
message: Optional[Message] = None) -> None:
full_text = f"{text}\nException: {exception!r}"
cls._log(level='ERROR', text=full_text, log_type=log_type, message=message)
# Инициализация экземпляра логгера
logs = Logs()
logs.setup()

View File

@@ -0,0 +1,3 @@
from .md2_escape import *
from .usernames import *
from .pagination import *

View File

@@ -0,0 +1,36 @@
# BotCode/utils/md2_escape.py
from BotCode.config import PARSE_MODE
# Настройка экспорта в модули
__all__ = ("textmd2",)
def textmd2(msg: str,
parse_mode: str = PARSE_MODE,
special_chars: str = r"_*[]()~`>#+-=|{}.!") -> str:
"""
Экранирует специальные символы MarkdownV2 в переданном тексте.
:param msg: Входной текст в виде строки.
:param parse_mode: Формат форматирования ('MarkdownV2' или 'HTML').
:param special_chars: Символы, которые необходимо экранировать.
:return: Экранированный текст или исходный текст, если формат HTML.
:raises TypeError: Если передан не строковый тип данных.
:raises ValueError: Если parse_mode задан некорректно.
"""
from re import sub, escape
if not isinstance(msg, str):
raise TypeError(f"Ожидается строка, но получено {type(msg).__name__}")
if not isinstance(parse_mode, str):
raise TypeError(f"parse_mode должен быть строкой, но получено {type(parse_mode).__name__}")
if parse_mode.strip().lower() == "html":
return msg
elif parse_mode in {"markdownv2", "markdown"}:
return sub(rf"([{escape(special_chars)}])", r"\\\1", msg)
else:
raise ValueError(f"Недопустимое значение parse_mode: '{parse_mode}'. Ожидалось 'HTML' или 'MarkdownV2'")

View File

@@ -0,0 +1,22 @@
# BotCode/utils/pagination.py
from typing import List
from aiogram.types import InlineKeyboardButton
# Настройка экспорта в модули
__all__ = ('create_pagination_buttons',)
def create_pagination_buttons(action: str,
page: int = 0,
total_posts: int = 0,
bt_page: int = 5) -> List[InlineKeyboardButton]:
"""Создает кнопки для пагинации."""
navigation_buttons = []
if page > 0:
navigation_buttons.append(InlineKeyboardButton(
text="", callback_data=f"{action}_page_{page - 1}"
))
if (page + 1) * bt_page < total_posts:
navigation_buttons.append(InlineKeyboardButton(
text="", callback_data=f"{action}_page_{page + 1}"
))
return navigation_buttons

View File

@@ -0,0 +1,22 @@
# BotCode/utils/username.py
from aiogram.types import Message
# Настройка экспорта в модули
__all__ = ('username', )
# Функция получения юзера или ID пользователя
def username(message: Message) -> str:
"""
Возвращает юзернейм пользователя из сообщения, или ID, если юзернейм не указан.
:param message: Объект сообщения из aiogram.
:return: Строка с юзернеймом пользователя или его ID.
:raises ValueError: Если в сообщении отсутствует информация о пользователе.
"""
try:
if message.from_user:
return f"@{message.from_user.username}" if message.from_user.username else f"@{message.from_user.id}"
raise ValueError("Информация о пользователе отсутствует в сообщении.")
except ValueError as e:
raise e # Перебрасываем ошибку выше для дальнейшей обработки