Compare commits

..

10 Commits

10 changed files with 2755 additions and 0 deletions

253
bot/filters/callback.py Normal file
View File

@@ -0,0 +1,253 @@
"""
Фильтры для обработки callback-запросов
"""
import re
from typing import Union
from aiogram.filters import BaseFilter
from aiogram.types import CallbackQuery
from middleware.loggers import logger
__all__ = (
'CallbackStartsWith',
'CallbackEndsWith',
'CallbackContains',
'CallbackMatches',
'CallbackIn'
)
class CallbackStartsWith(BaseFilter):
"""
Проверяет, начинается ли callback_data с указанного префикса.
Attributes:
prefix: Префикс для проверки (строка или список строк)
ignore_case: Игнорировать регистр
Example:
```python
# Один префикс
@router.callback_query(CallbackStartsWith("menu:"))
async def menu_handler(callback: CallbackQuery):
await callback.answer("Меню")
# Несколько префиксов
@router.callback_query(CallbackStartsWith(["admin:", "mod:"]))
async def admin_handler(callback: CallbackQuery):
await callback.answer("Админ панель")
```
"""
def __init__(self, prefix: Union[str, list[str]], ignore_case: bool = True):
"""
Args:
prefix: Префикс или список префиксов
ignore_case: Игнорировать регистр букв
"""
self.prefixes = [prefix] if isinstance(prefix, str) else prefix
self.ignore_case = ignore_case
if self.ignore_case:
self.prefixes = [p.lower() for p in self.prefixes]
async def __call__(self, callback: CallbackQuery) -> Union[bool, dict]:
if not callback.data:
return False
data = callback.data.lower() if self.ignore_case else callback.data
for prefix in self.prefixes:
if data.startswith(prefix):
# Извлекаем данные после префикса
value = callback.data[len(prefix):]
logger.debug(
f"Callback с префиксом '{prefix}': {callback.data}",
log_type='CALLBACK'
)
return {
'matched': True,
'prefix': prefix,
'value': value,
'full_data': callback.data
}
return False
class CallbackEndsWith(BaseFilter):
"""
Проверяет, заканчивается ли callback_data на указанный суффикс.
Example:
```python
@router.callback_query(CallbackEndsWith(":confirm"))
async def confirm_handler(callback: CallbackQuery, matched: dict):
action = matched['value']
await callback.answer(f"Подтверждение: {action}")
```
"""
def __init__(self, suffix: Union[str, list[str]], ignore_case: bool = True):
"""
Args:
suffix: Суффикс или список суффиксов
ignore_case: Игнорировать регистр букв
"""
self.suffixes = [suffix] if isinstance(suffix, str) else suffix
self.ignore_case = ignore_case
if self.ignore_case:
self.suffixes = [s.lower() for s in self.suffixes]
async def __call__(self, callback: CallbackQuery) -> Union[bool, dict]:
if not callback.data:
return False
data = callback.data.lower() if self.ignore_case else callback.data
for suffix in self.suffixes:
if data.endswith(suffix):
# Извлекаем данные до суффикса
value = callback.data[:-len(suffix)]
return {
'matched': True,
'suffix': suffix,
'value': value,
'full_data': callback.data
}
return False
class CallbackContains(BaseFilter):
"""
Проверяет, содержит ли callback_data указанную подстроку.
Example:
```python
@router.callback_query(CallbackContains("delete"))
async def delete_handler(callback: CallbackQuery):
await callback.answer("Удаление...")
```
"""
def __init__(self, substring: Union[str, list[str]], ignore_case: bool = True):
"""
Args:
substring: Подстрока или список подстрок
ignore_case: Игнорировать регистр букв
"""
self.substrings = [substring] if isinstance(substring, str) else substring
self.ignore_case = ignore_case
if self.ignore_case:
self.substrings = [s.lower() for s in self.substrings]
async def __call__(self, callback: CallbackQuery) -> Union[bool, dict]:
if not callback.data:
return False
data = callback.data.lower() if self.ignore_case else callback.data
for substring in self.substrings:
if substring in data:
return {
'matched': True,
'substring': substring,
'full_data': callback.data
}
return False
class CallbackMatches(BaseFilter):
"""
Проверяет callback_data по regex паттерну.
Example:
```python
# Паттерн: user_123, user_456 и т.д.
@router.callback_query(CallbackMatches(r"^user_(\d+)$"))
async def user_handler(callback: CallbackQuery, matched: dict):
user_id = matched['groups']
await callback.answer(f"Пользователь {user_id}")
```
"""
def __init__(self, pattern: Union[str, re.Pattern], flags: int = 0):
"""
Args:
pattern: Regex паттерн (строка или скомпилированный Pattern)
flags: Флаги для regex (например, re.IGNORECASE)
"""
if isinstance(pattern, str):
self.pattern = re.compile(pattern, flags)
else:
self.pattern = pattern
async def __call__(self, callback: CallbackQuery) -> Union[bool, dict]:
if not callback.data:
return False
match = self.pattern.match(callback.data)
if match:
logger.debug(
f"Callback соответствует паттерну {self.pattern.pattern}: {callback.data}",
log_type='CALLBACK'
)
return {
'matched': True,
'pattern': self.pattern.pattern,
'groups': match.groups(),
'groupdict': match.groupdict(),
'full_data': callback.data
}
return False
class CallbackIn(BaseFilter):
"""
Проверяет, находится ли callback_data в списке разрешенных значений.
Example:
```python
@router.callback_query(CallbackIn(["yes", "no", "cancel"]))
async def choice_handler(callback: CallbackQuery):
choice = callback.data
await callback.answer(f"Выбрано: {choice}")
```
"""
def __init__(self, values: list[str], ignore_case: bool = True):
"""
Args:
values: Список разрешенных значений
ignore_case: Игнорировать регистр букв
"""
self.values = values
self.ignore_case = ignore_case
if self.ignore_case:
self.values = [v.lower() for v in values]
async def __call__(self, callback: CallbackQuery) -> Union[bool, dict]:
if not callback.data:
return False
data = callback.data.lower() if self.ignore_case else callback.data
if data in self.values:
return {
'matched': True,
'value': callback.data
}
return False

184
bot/filters/modes.py Normal file
View File

@@ -0,0 +1,184 @@
"""
Фильтры для проверки активных режимов бота (silence, conflict)
"""
from datetime import datetime
from typing import Optional
from aiogram.filters import BaseFilter
from aiogram.types import Message
from middleware.loggers import logger
__all__ = ('IsSilenceActive', 'IsConflictModeActive')
class IsSilenceActive(BaseFilter):
"""
Проверяет, активен ли режим тишины (silence mode).
В режиме тишины удаляются ВСЕ сообщения (кроме админов).
Attributes:
silence_until: Время до которого активен режим (None = неактивен)
Example:
```python
# В handler-файле
silence_filter = IsSilenceActive()
@router.message(silence_filter)
async def silence_mode_active(message: Message):
# Удаляем все сообщения в режиме тишины
await message.delete()
```
"""
def __init__(self, silence_until: Optional[datetime] = None):
"""
Args:
silence_until: Datetime до которого активен режим
"""
self.silence_until = silence_until
def update_silence_until(self, new_datetime: Optional[datetime]) -> None:
"""
Обновляет время окончания режима тишины.
Args:
new_datetime: Новое время окончания или None для отключения
"""
self.silence_until = new_datetime
if new_datetime:
logger.info(
f"Режим тишины активирован до {new_datetime.strftime('%H:%M:%S')}",
log_type='SILENCE'
)
else:
logger.info("Режим тишины отключен", log_type='SILENCE')
def is_active(self) -> bool:
"""
Проверяет, активен ли режим сейчас.
Returns:
bool: True если режим активен
"""
if self.silence_until is None:
return False
# Проверка истечения времени
if datetime.now() >= self.silence_until:
logger.info("Режим тишины автоматически завершен", log_type='SILENCE')
self.silence_until = None
return False
return True
async def __call__(self, event: Message) -> Optional[dict]:
"""
Проверка активности режима тишины.
Returns:
dict или None: Информация о режиме если активен, иначе None
"""
if self.is_active():
remaining = (self.silence_until - datetime.now()).total_seconds()
logger.debug(
f"Режим тишины активен (осталось {remaining:.0f}с)",
log_type='SILENCE',
message=event
)
return {
'is_active': True,
'until': self.silence_until,
'remaining_seconds': remaining
}
return None
class IsConflictModeActive(BaseFilter):
"""
Проверяет, активен ли режим антиконфликта (conflict mode).
В режиме антиконфликта удаляются сообщения с конфликтными словами.
Attributes:
conflict_until: Время до которого активен режим (None = неактивен)
Example:
```python
conflict_filter = IsConflictModeActive()
@router.message(conflict_filter)
async def conflict_mode_active(message: Message):
# Проверяем на конфликтные слова и удаляем
if has_conflict_words(message.text):
await message.delete()
```
"""
def __init__(self, conflict_until: Optional[datetime] = None):
"""
Args:
conflict_until: Datetime до которого активен режим
"""
self.conflict_until = conflict_until
def update_conflict_until(self, new_datetime: Optional[datetime]) -> None:
"""
Обновляет время окончания режима антиконфликта.
Args:
new_datetime: Новое время окончания или None для отключения
"""
self.conflict_until = new_datetime
if new_datetime:
logger.info(
f"Режим антиконфликта активирован до {new_datetime.strftime('%H:%M:%S')}",
log_type='CONFLICT'
)
else:
logger.info("Режим антиконфликта отключен", log_type='CONFLICT')
def is_active(self) -> bool:
"""
Проверяет, активен ли режим сейчас.
Returns:
bool: True если режим активен
"""
if self.conflict_until is None:
return False
# Проверка истечения времени
if datetime.now() >= self.conflict_until:
logger.info("Режим антиконфликта автоматически завершен", log_type='CONFLICT')
self.conflict_until = None
return False
return True
async def __call__(self, event: Message) -> Optional[dict]:
"""
Проверка активности режима антиконфликта.
Returns:
dict или None: Информация о режиме если активен, иначе None
"""
if self.is_active():
remaining = (self.conflict_until - datetime.now()).total_seconds()
logger.debug(
f"Режим антиконфликта активен (осталось {remaining:.0f}с)",
log_type='CONFLICT',
message=event
)
return {
'is_active': True,
'until': self.conflict_until,
'remaining_seconds': remaining
}
return None

395
bot/filters/msg_content.py Normal file
View File

@@ -0,0 +1,395 @@
"""
Фильтры для проверки содержимого сообщений
"""
import re
from typing import Optional, Union
from aiogram.filters import BaseFilter
from aiogram.types import Message, ContentType
from middleware.loggers import logger
__all__ = (
'IsReply',
'IsForwarded',
'HasMedia',
'ContainsURL',
'HasText',
'HasCaption',
'HasEntities',
'MediaType'
)
class IsReply(BaseFilter):
"""
Проверяет, является ли сообщение ответом на другое сообщение.
Example:
```python
@router.message(IsReply())
async def handle_reply(message: Message):
original = message.reply_to_message
await message.answer(f"Это ответ на: {original.text}")
```
"""
async def __call__(self, message: Message) -> Union[bool, dict]:
is_reply = message.reply_to_message is not None
if is_reply:
return {
'is_reply': True,
'reply_to_message': message.reply_to_message,
'reply_to_user_id': message.reply_to_message.from_user.id if message.reply_to_message.from_user else None
}
return False
class IsForwarded(BaseFilter):
"""
Проверяет, является ли сообщение пересланным.
Поддерживает:
- Пересылку от пользователей (forward_from)
- Пересылку из каналов/групп (forward_from_chat)
- Скрытую пересылку (forward_sender_name)
Example:
```python
@router.message(IsForwarded())
async def handle_forwarded(message: Message, forward_info: dict):
await message.answer(f"Переслано из: {forward_info['origin']}")
```
"""
async def __call__(self, message: Message) -> Union[bool, dict]:
# Проверка различных типов пересылки
is_forwarded = (
message.forward_origin is not None or # Новый API (aiogram 3.x)
message.forward_from is not None or
message.forward_from_chat is not None or
message.forward_sender_name is not None
)
if is_forwarded:
origin = "неизвестно"
if message.forward_from:
origin = f"пользователь @{message.forward_from.username or message.forward_from.id}"
elif message.forward_from_chat:
origin = f"чат {message.forward_from_chat.title or message.forward_from_chat.id}"
elif message.forward_sender_name:
origin = f"скрытый пользователь ({message.forward_sender_name})"
logger.debug(
f"Обнаружено пересланное сообщение из: {origin}",
log_type='FORWARD',
message=message
)
return {
'is_forwarded': True,
'origin': origin,
'forward_date': message.forward_date
}
return False
class HasMedia(BaseFilter):
"""
Проверяет, содержит ли сообщение медиа-контент.
Attributes:
media_types: Список типов медиа для проверки (если None, проверяются все)
Example:
```python
# Любое медиа
@router.message(HasMedia())
async def handle_media(message: Message):
await message.answer("Получено медиа!")
# Только фото и видео
@router.message(HasMedia(['photo', 'video']))
async def handle_visual(message: Message):
await message.answer("Фото или видео!")
```
"""
def __init__(self, media_types: Optional[list[str]] = None):
"""
Args:
media_types: Список типов медиа ('photo', 'video', 'document', и т.д.)
Если None, проверяются все типы
"""
self.media_types = media_types
async def __call__(self, message: Message) -> Union[bool, dict]:
# Все возможные типы медиа
media_checks = {
'photo': message.photo,
'video': message.video,
'document': message.document,
'audio': message.audio,
'voice': message.voice,
'video_note': message.video_note,
'sticker': message.sticker,
'animation': message.animation,
}
# Если указаны конкретные типы, проверяем только их
if self.media_types:
has_media = any(
media_checks[media_type]
for media_type in self.media_types
if media_type in media_checks
)
detected_type = next(
(media_type for media_type in self.media_types if media_checks.get(media_type)),
None
)
else:
# Проверяем все типы
has_media = any(media_checks.values())
detected_type = next(
(media_type for media_type, value in media_checks.items() if value),
None
)
if has_media:
return {
'has_media': True,
'media_type': detected_type,
'content': media_checks[detected_type]
}
return False
class ContainsURL(BaseFilter):
"""
Проверяет, содержит ли сообщение ссылки.
Поддерживает:
- HTTP/HTTPS ссылки
- Telegram ссылки (t.me, tg://)
- Проверку через entities (более точная)
Attributes:
strict: Использовать строгую проверку через entities
Example:
```python
@router.message(ContainsURL())
async def handle_url(message: Message, url_info: dict):
urls = url_info['urls']
await message.answer(f"Обнаружено {len(urls)} ссылок")
```
"""
def __init__(self, strict: bool = False):
"""
Args:
strict: Если True, проверяет через entities (игнорирует текст в коде/pre)
"""
self.strict = strict
# Паттерн для поиска URL
self.url_pattern = re.compile(
r'https?://[^\s]+|' # http(s)://
r't\.me/[^\s]+|' # t.me/
r'tg://[^\s]+', # tg://
re.IGNORECASE
)
async def __call__(self, message: Message) -> Union[bool, dict]:
if not message.text and not message.caption:
return False
text = message.text or message.caption
if self.strict and message.entities:
# Строгая проверка через entities
url_entities = [
entity for entity in message.entities
if entity.type in ('url', 'text_link')
]
if url_entities:
urls = []
for entity in url_entities:
if entity.type == 'url':
url = text[entity.offset:entity.offset + entity.length]
urls.append(url)
elif entity.type == 'text_link':
urls.append(entity.url)
return {
'contains_url': True,
'urls': urls,
'url_count': len(urls)
}
else:
# Простая проверка через regex
urls = self.url_pattern.findall(text)
if urls:
return {
'contains_url': True,
'urls': urls,
'url_count': len(urls)
}
return False
class HasText(BaseFilter):
"""
Проверяет, содержит ли сообщение текст.
Attributes:
min_length: Минимальная длина текста (по умолчанию 1)
max_length: Максимальная длина текста (по умолчанию None)
Example:
```python
# Любой текст
@router.message(HasText())
async def handle_text(message: Message):
await message.answer("Получен текст!")
# Текст от 10 до 100 символов
@router.message(HasText(min_length=10, max_length=100))
async def handle_medium_text(message: Message):
await message.answer("Текст подходящей длины!")
```
"""
def __init__(self, min_length: int = 1, max_length: Optional[int] = None):
self.min_length = min_length
self.max_length = max_length
async def __call__(self, message: Message) -> Union[bool, dict]:
if not message.text:
return False
text_length = len(message.text)
# Проверка длины
if text_length < self.min_length:
return False
if self.max_length and text_length > self.max_length:
return False
return {
'has_text': True,
'text_length': text_length,
'text': message.text
}
class HasCaption(BaseFilter):
"""
Проверяет, есть ли у медиа подпись.
Example:
```python
@router.message(HasCaption())
async def handle_caption(message: Message):
await message.answer(f"Подпись: {message.caption}")
```
"""
async def __call__(self, message: Message) -> Union[bool, dict]:
if message.caption:
return {
'has_caption': True,
'caption': message.caption,
'caption_length': len(message.caption)
}
return False
class HasEntities(BaseFilter):
"""
Проверяет наличие entities (упоминания, хештеги, команды и т.д.).
Attributes:
entity_types: Список типов entities для проверки
Example:
```python
# Любые entities
@router.message(HasEntities())
async def handle_entities(message: Message):
pass
# Только упоминания и хештеги
@router.message(HasEntities(['mention', 'hashtag']))
async def handle_mentions(message: Message):
pass
```
"""
def __init__(self, entity_types: Optional[list[str]] = None):
"""
Args:
entity_types: Список типов ('mention', 'hashtag', 'bot_command', и т.д.)
"""
self.entity_types = entity_types
async def __call__(self, message: Message) -> Union[bool, dict]:
if not message.entities:
return False
if self.entity_types:
# Фильтруем по типам
matching_entities = [
entity for entity in message.entities
if entity.type in self.entity_types
]
if matching_entities:
return {
'has_entities': True,
'entities': matching_entities,
'entity_count': len(matching_entities)
}
else:
# Любые entities
return {
'has_entities': True,
'entities': message.entities,
'entity_count': len(message.entities)
}
return False
class MediaType(BaseFilter):
"""
Проверяет точный тип контента сообщения.
Attributes:
content_type: Тип контента из ContentType enum
Example:
```python
@router.message(MediaType(ContentType.PHOTO))
async def handle_photo(message: Message):
await message.answer("Это фото!")
```
"""
def __init__(self, content_type: Union[ContentType, str]):
"""
Args:
content_type: Тип контента (ContentType enum или строка)
"""
self.content_type = content_type if isinstance(content_type, str) else content_type.value
async def __call__(self, message: Message) -> bool:
return message.content_type == self.content_type

View File

@@ -0,0 +1,77 @@
from asyncio import create_task
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.types import Message, CallbackQuery
from bot.core.bots import BotInfo, bot
from bot.filters import IsOwner
from bot.templates import msg
from bot.utils import status_clear
from bot.utils.auto_delete import auto_delete_message
from configs import COMMANDS
__all__ = ("router",)
CMD: str = "pin".lower()
router: Router = Router(name=f"{CMD}_cmd_router")
@router.message(Command(*COMMANDS[CMD], prefix=BotInfo.prefix, ignore_case=True), IsOwner())
async def pin_cmd(message: Message, state: FSMContext) -> None:
"""
Обработчик команды /pin для закрепления последнего сообщения или ответа.
"""
# Если есть reply → закрепляем его, иначе закрепляем предыдущее сообщение
if message.reply_to_message:
target_message_id = message.reply_to_message.message_id
else:
# Закрепляем предыдущее сообщение (команда - 1)
target_message_id = message.message_id - 1
try:
await bot.pin_chat_message(
chat_id=message.chat.id,
message_id=target_message_id,
disable_notification=False
)
# Автоудаление через 7 суток (удаляем закрепленное сообщение)
create_task(
auto_delete_message(
chat_id=message.chat.id,
message_id=target_message_id,
delay=604800
)
)
await msg(update=message, text="✅ Сообщение успешно закреплено", state=state)
except Exception as e:
await msg(update=message, text=f"❌ Ошибка закрепления: {e}", state=state)
@router.callback_query(F.data.casefold().isin(COMMANDS[CMD]), IsOwner())
async def pin_callback(callback: CallbackQuery, state: FSMContext) -> None:
"""
Обработчик кнопки с callback_data="pin".
"""
await status_clear(update=callback.message, state=state)
try:
await bot.pin_chat_message(
chat_id=callback.message.chat.id,
message_id=callback.message.message_id,
disable_notification=False
)
create_task(
auto_delete_message(
chat_id=callback.message.chat.id,
message_id=callback.message.message_id,
delay=604800
)
)
await callback.answer("✅ Сообщение закреплено")
except Exception as e:
await callback.answer(f"❌ Ошибка: {e}", show_alert=True)

View File

@@ -0,0 +1,118 @@
"""
Обработчики callback-кнопок уведомлений о спаме
"""
from aiogram import Router, F
from aiogram.types import CallbackQuery
from aiogram.exceptions import TelegramBadRequest
from bot.filters.admin import IsAdmin
from database import get_manager
from middleware.loggers import logger
__all__ = ("router",)
router: Router = Router(name="spam_notifications_router")
# ================= ЗАКРЫТИЕ УВЕДОМЛЕНИЯ =================
@router.callback_query(F.data == "spam_close", IsAdmin())
async def spam_close_callback(callback: CallbackQuery) -> None:
"""
Закрывает (удаляет) уведомление о спаме.
"""
try:
await callback.message.delete()
await callback.answer("✅ Уведомление закрыто")
logger.debug(
f"Уведомление о спаме закрыто админом {callback.from_user.id}",
log_type="SPAM_NOTIFICATION"
)
except TelegramBadRequest as e:
logger.error(f"Ошибка удаления уведомления: {e}", log_type="ERROR")
await callback.answer("Не удалось удалить уведомление", show_alert=True)
# ================= БАН ПОЛЬЗОВАТЕЛЯ =================
@router.callback_query(F.data.startswith("spam_ban:"), IsAdmin())
async def spam_ban_callback(callback: CallbackQuery) -> None:
"""
Банит пользователя прямо из уведомления.
"""
try:
# Парсим данные: spam_ban:user_id:chat_id
parts = callback.data.split(":")
user_id = int(parts[1])
chat_id = int(parts[2])
# Баним пользователя
try:
await callback.bot.ban_chat_member(
chat_id=chat_id,
user_id=user_id
)
# Обновляем сообщение
updated_text = callback.message.text + f"\n\n🔨 <b>Пользователь забанен</b> (@{callback.from_user.username or callback.from_user.id})"
# Убираем кнопки
await callback.message.edit_text(
text=updated_text,
parse_mode="HTML"
)
await callback.answer("✅ Пользователь забанен", show_alert=True)
logger.info(
f"Пользователь {user_id} забанен админом {callback.from_user.id} через уведомление о спаме",
log_type="SPAM_BAN"
)
except TelegramBadRequest as e:
await callback.answer(f"❌ Ошибка бана: {str(e)}", show_alert=True)
except Exception as e:
logger.error(f"Ошибка обработки бана из уведомления: {e}", log_type="ERROR")
await callback.answer("❌ Ошибка выполнения", show_alert=True)
# ================= СТАТИСТИКА ПОЛЬЗОВАТЕЛЯ =================
@router.callback_query(F.data.startswith("spam_stats:"), IsAdmin())
async def spam_stats_callback(callback: CallbackQuery) -> None:
"""
Показывает статистику пользователя.
"""
try:
# Парсим данные: spam_stats:user_id
parts = callback.data.split(":")
user_id = int(parts[1])
manager = get_manager()
# Получаем статистику
spam_count = await manager.get_user_spam_count(user_id)
recent_spam = await manager.get_spam_stats(limit=5, user_id=user_id)
# Формируем текст
text = f"📊 <b>Статистика пользователя</b>\n\n"
text += f"🆔 ID: <code>{user_id}</code>\n"
text += f"🗑 Удалено сообщений: <code>{spam_count}</code>\n\n"
if recent_spam:
text += f"📝 <b>Последние нарушения:</b>\n"
for idx, stat in enumerate(recent_spam, 1):
matched_word = stat.matched_word or "неизвестно"
match_type = stat.match_type or "unknown"
text += f"{idx}. <code>{matched_word}</code> ({match_type})\n"
else:
text += "✅ <i>Нет нарушений</i>"
await callback.answer(text, show_alert=True)
except Exception as e:
logger.error(f"Ошибка получения статистики из уведомления: {e}", log_type="ERROR")
await callback.answer("❌ Ошибка получения статистики", show_alert=True)

View File

@@ -0,0 +1,162 @@
"""
Обработчик команды /start и /help для администраторов.
Показывает список доступных команд для управления банвордами.
"""
from aiogram import Router, F
from aiogram.filters import Command
from aiogram.types import Message, CallbackQuery
from aiogram.utils.keyboard import InlineKeyboardBuilder
from bot.filters.admin import IsAdmin
from configs import settings, COMMANDS
from middleware.loggers import logger
from bot.utils import log_action, tg_emoji
__all__ = ("router",)
CMD: str = "start"
router: Router = Router(name="start_cmd_router")
def kb(text: str = "Создатель⬆️", url: str = "https://t.me/verdise"):
ikb = InlineKeyboardBuilder()
ikb.button(text=text, url=url)
return ikb.as_markup()
@router.callback_query(F.data.casefold() == CMD)
@router.message(Command(*COMMANDS[CMD], prefix=settings.PREFIX, ignore_case=True), IsAdmin())
@log_action(action_name="START_COMMAND", log_args=True)
async def start_cmd(update: Message | CallbackQuery) -> None:
"""
Обработчик команды /start и /help.
Показывает справку по командам бота для администраторов.
Доступно только администраторам (суперадмин или доп. админ из БД).
Args:
update: Message или CallbackQuery
"""
print(123)
# Определяем тип update и извлекаем данные
if isinstance(update, CallbackQuery):
message = update.message
user_id = update.from_user.id
is_callback = True
else:
message = update
user_id = update.from_user.id
is_callback = False
# Проверяем, является ли пользователь суперадмином
is_super_admin = user_id in settings.OWNER_ID
# Формируем текст помощи
help_text = (
f'{tg_emoji(4961073056677103064)} <b>PrimoGuard - Бот-модератор</b>\n\n'
'<blockquote>Автоматическое удаление сообщений с запрещёнными словами.\nПоддержка подстрок, лемм, временных блокировок и режимов модерации.</blockquote>\n\n'
)
# === Команды просмотра ===
help_text += (
f'{tg_emoji(4961141003059725568)} <b>Просмотр:</b>\n'
'<b>/list</b> — список всех правил и слов\n'
'<b>/stats</b> — статистика по удалениям\n'
'<b>/id</b> — получение айди пользователя\n'
'<b>/chatid</b> — получение айди чата\n\n'
)
# === Постоянные банворды ===
help_text += (
f'{tg_emoji(4961019408240608234)} <b>Добавить банворд (постоянно):</b>\n'
'<code>/addword</code> <code>слово</code> — подстрока (простой поиск)\n'
'<code>/addlemma</code> <code>слово</code> — лемма (все формы слова)\n'
'<code>/addpart</code> <code>комбинация</code> — часть (поиск без пробелов)\n\n'
)
# === Временные банворды ===
help_text += (
f'{tg_emoji(4960719190026618714)} <b>Добавить банворд (временно):</b>\n'
'<code>/addtempword</code> <code>слово минуты</code> — временная подстрока\n'
'<code>/addtemplemma</code> <code>слово минуты</code> — временная лемма\n'
'<i>Пример: /addtempword спам 60</i>\n\n'
)
# === Исключения (whitelist) ===
help_text += (
f'{tg_emoji(4963010134172239128)} <b>Исключения (whitelist):</b>\n'
'<code>/addexcept</code> <code>текст</code> — добавить исключение\n'
'<code>/remexcept</code> <code>текст</code> — удалить исключение\n'
'<i>Исключения не проверяются фильтром</i>\n\n'
)
# === Режимы модерации ===
help_text += (
f'{tg_emoji(4960987543878239236)} <b>Режим тишины:</b>\n'
'<code>/silence</code> <code>минуты</code> — удалять ВСЕ сообщения\n'
'<b>/unsilence</b> — отключить режим тишины\n'
'<code>/report</code> — отправить репорт\n\n'
)
help_text += (
f'{tg_emoji(4960986152308835400)} <b>Режим антиконфликта:</b>\n'
'<code>/addconflictword</code> <code>слово</code> — добавить конфликтное слово\n'
'<code>/addconflictlemma</code> <code>слово</code> — добавить конфликтную лемму\n'
'<code>/stopconflict</code> <code>минуты</code> — активировать режим\n'
'<code>/unstopconflict</code> — отключить режим\n\n'
)
# === Удаление ===
help_text += (
f'{tg_emoji(4961196485447254983)} <b>Удалить:</b>\n'
'<code>/remword</code> <code>слово</code> — удалить подстроку\n'
'<code>/remlemma</code> <code>слово</code> — удалить лемму\n'
'<code>/rempart</code> <code>комбинация</code> — удалить часть\n'
'<code>/remtempword</code> <code>слово</code> — удалить временную подстроку\n'
'<code>/remtemplemma</code> <code>слово</code> — удалить временную лемму\n'
'<code>/remconflictword</code> <code>слово</code> — удалить конфликтное слово\n'
'<code>/remconflictlemma</code> <code>слово</code> — удалить конфликтную лемму\n\n'
)
# === Управление админами (только для суперадминов) ===
if is_super_admin:
help_text += (
f'{tg_emoji(4960891456869893259)} <b>Управление админами (только для владельцев):</b>\n'
'<code>/addadmin</code> <i>ID</i> — добавить администратора\n'
'<code>/remadmin</code> <i>ID</i> — удалить администратора\n'
'<b>/redactcomment</b> — изменить комментарий под постом\n'
'<b>/listadmins</b> — список всех админов\n\n'
)
# === Типы проверок ===
help_text += (
f'{tg_emoji(4961021096162755737)} <b>Типы проверок:</b>\n'
'• <b>Подстрока</b> — простой поиск в тексте\n'
'• <b>Лемма</b> — все формы слова (купить→куплю, купил, купишь...)\n'
'• <b>Часть</b> — поиск без пробелов (обходит \"к у п и т ь\")\n'
'• <b>Временные</b> — автоматически удаляются через N минут\n'
'• <b>Конфликтные</b> — работают только в режиме /stopconflict\n\n'
)
# Отправляем ответ
try:
if is_callback:
await message.edit_text(
text=help_text,
parse_mode="HTML",
reply_markup=kb()
)
await update.answer()
else:
await message.answer(
text=help_text,
parse_mode="HTML",
reply_markup=kb()
)
except Exception as e:
logger.error(
f"Ошибка отправки help сообщения: {e}",
log_type="ERROR"
)
if is_callback:
await update.answer(f'{tg_emoji(4963277744994518278)} Ошибка отображения справки', show_alert=True)

View File

@@ -0,0 +1,242 @@
"""
Триггер-хэндлер: реагирует на обращения к Лайле с именем персонажа.
Формат: "Лайла [что угодно] [имя или псевдоним]"
"""
from typing import Dict, Optional
import random
from aiogram import Router
from aiogram.types import Message
__all__ = ("router",)
router: Router = Router(name="triggers_router")
CHARACTERS: Dict[str, Dict] = {
"эвелин": {
"aliases": ["эвелин", "эва", "эви"],
"answers": [
"Эвелин умеет молчать так, что хочется говорить.",
"Эва всегда знает больше, чем говорит. Это немного пугает.",
"С ней рядом становится спокойно. Не знаю почему.",
"Интересно, о чём она думает в тишине...",
"Эвелин тихая снаружи. Но внутри — целый ураган, я уверена.",
],
},
"лео": {
"aliases": ["лео", "лёва", "лёня"],
"answers": [
"Лео громкий, яркий и немного безрассудный. Мне нравится!",
"Он смеётся первым и уходит последним. Настоящий.",
"Лео всегда найдёт повод для праздника, даже если его нет.",
"Кажется, он боится тишины. Поэтому и заполняет её собой.",
"За его смехом прячется что-то очень серьёзное...",
],
},
"маркус": {
"aliases": ["маркус", "марк"],
"answers": [
"Маркус говорит мало, но каждое слово весит.",
"Он из тех, кто держит слово даже когда это неудобно.",
"С Маркусом не поспоришь. Не потому что нельзя — просто незачем.",
"Он смотрит так, будто видит тебя насквозь. Немного жутковато.",
"Маркус — тот, на кого можно положиться в самый плохой день.",
],
},
"мари": {
"aliases": ["мари", "маришка", "мариша"],
"answers": [
"Мари — это как утренний свет. Мягко и неожиданно тепло.",
"Она помнит мелочи, которые другие не замечают. Это её суперсила.",
"С Мари любой разговор становится важным.",
"Она улыбается даже когда грустно. Не притворяется — просто верит.",
"Мари умеет прощать. Это редкость.",
],
},
"либе": {
"aliases": ["либе", "либ"],
"answers": [
"Либе... имя звучит как песня на незнакомом языке.",
"Она всегда чуть в стороне, но именно к ней тянутся люди.",
"Либе видит красоту там, где другие видят хаос.",
"Она не объясняет себя. И не должна.",
"С Либе можно молчать — и это не будет неловко.",
],
},
"мотциэль": {
"aliases": ["мотциэль", "мотц", "моц"],
"answers": [
"Мотциэль... даже имя звучит как заклинание.",
"Он существует между мирами. Буквально.",
"Спрашивать его о прошлом — плохая идея. Очень плохая.",
"Мотциэль помнит вещи, которых не было. Или были?",
"Его глаза смотрят в разные эпохи одновременно.",
],
},
"виктор": {
"aliases": ["виктор", "вик", "витя"],
"answers": [
"Виктор всегда побеждает. Это в имени.",
"Он не злой. Просто у него другая шкала ценностей.",
"Виктор говорит правду даже когда это больно. Особенно когда больно.",
"Не стоит играть с ним в слова — проиграешь.",
"За его холодностью — старая-старая усталость.",
],
},
"кситти": {
"aliases": ["кситти", "кси", "ксит"],
"answers": [
"Кситти — маленький хаос в красивой упаковке.",
"Она никогда не делает то, что от неё ожидают. Никогда.",
"С Кситти скучно не бывает. Опасно — бывает. Скучно — нет.",
"Она собирает странные вещи и странных людей.",
"Кситти смеётся над правилами. Потому что сама их придумывает.",
],
},
"кадфаль": {
"aliases": ["кадфаль", "кад", "кадф"],
"answers": [
"Кадфаль несёт что-то древнее в каждом шаге.",
"Он не торопится. У него другое ощущение времени.",
"Кадфаль знает цену словам — поэтому тратит их редко.",
"В его присутствии хочется стоять прямо.",
"Он видел многое. Слишком многое для одной жизни.",
],
},
"вайш": {
"aliases": ["вайш", "вай"],
"answers": [
"Вайш появляется неожиданно и исчезает так же.",
"Её след — это вопросы без ответов.",
"Вайш знает что-то, что тебе лучше не знать.",
"Она не объясняет своих решений. Просто делает.",
"С Вайш никогда не знаешь, друг она или нет.",
],
},
"скаф": {
"aliases": ["скаф"],
"answers": [
"Скаф — имя, которое не забывается.",
"Он работает в тени. Не потому что боится света — просто так удобнее.",
"Скаф знает цену всему. Буквально всему.",
"Его нельзя купить. Его можно только нанять. Это разница.",
"Те, кто встречал Скафа, редко рассказывают об этом дважды.",
],
},
"куарти": {
"aliases": ["куарти", "куар"],
"answers": [
"Куарти — четыре буквы и миллион вопросов.",
"Он улыбается, когда другие нервничают. Это не успокаивает.",
"Куарти коллекционирует долги. Чужие.",
"Говорят, он никогда не проигрывает. Говорят.",
"Куарти появляется именно тогда, когда тебе нужна помощь. И это не случайно.",
],
},
"саэрин": {
"aliases": ["саэрин", "саэ", "сэрин"],
"answers": [
"Саэрин — как туман. Красиво и немного опасно.",
"Она говорит загадками не потому что хочет запутать — просто иначе не умеет.",
"Саэрин помнит всё, что ей говорят. Всё.",
"Её спокойствие пугает больше, чем чужой гнев.",
"Саэрин выбирает слова как оружие — точно и без лишнего.",
],
},
"котики": {
"aliases": ["котики", "котик", "кот", "кошка"],
"answers": [
"Котики — это лучшее, что есть в этом мире. Без обсуждений.",
"Котик сел на тебя — ты избран.",
"Кот смотрит на тебя и думает что-то важное. Наверное.",
"Котики всегда правы. Это научный факт.",
"Маленький тёплый комочек счастья. Что ещё нужно?",
],
},
"нотик": {
"aliases": ["нотик", "нота", "нотка"],
"answers": [
"Нотик! Звучит как маленькая музыкальная нота! 🎵",
"Нотик — тот, кто приносит мелодию туда, где её не хватает.",
"Маленький, но важный. Как все хорошие вещи.",
"Нотик — это и ласково, и загадочно одновременно.",
"Из таких маленьких нотиков складываются большие истории.",
],
},
"илья": {
"aliases": ["илья", "илюха", "илюша"],
"answers": [
"Илья — имя с характером. Твёрдое и живое.",
"Илья всегда знает что делать. Или уверенно делает вид.",
"С Ильёй легко — он не усложняет лишнего.",
"Он из тех, кто сделает, а потом расскажет. Не наоборот.",
"Илья редко жалуется. Чаще просто решает.",
],
},
"ина": {
"aliases": ["ина", "инка", "инуля"],
"answers": [
"Ина — короткое имя, за которым много всего.",
"Она тихая, но запоминается.",
"Ина умеет слушать так, что хочется говорить.",
"В ней есть что-то очень своё, неповторимое.",
"Ина — как маленький огонёк. Незаметный, но греет.",
],
},
"абсцисс": {
"aliases": ["абсцисс", "абс"],
"answers": [
"Абсцисс! Это математика или имя? Хи-хи!",
"Ось абсцисс — горизонталь жизни. Всё движется по ней.",
"Абсцисс звучит как заклинание из учебника.",
"Кто-то мечтает о приключениях, а кто-то — об осях координат!",
"Абсцисс и ордината. Звучит как имена двух загадочных персонажей!",
],
},
}
# Имена, на которые Лайла откликается
LAYLA_NAMES = ["лайла", "лайл", "лая"]
def find_character_answer(text: str) -> Optional[str]:
"""
Проверяет:
1. Есть ли в тексте обращение к Лайле
2. Есть ли имя персонажа
Возвращает случайный ответ или None.
"""
text_lower = text.lower()
# Проверяем обращение к Лайле
if not any(name in text_lower for name in LAYLA_NAMES):
return None
# Ищем персонажа
for character, data in CHARACTERS.items():
for alias in data["aliases"]:
if alias in text_lower:
return random.choice(data["answers"])
return None
@router.message()
async def handle_triggers(message: Message) -> None:
"""
Реагирует только если:
- Сообщение от живого человека
- В тексте есть обращение к Лайле
- В тексте есть имя персонажа
На всё остальное — молчит.
"""
#if not message.text or not message.from_user or message.from_user.is_bot:
#return
#response = find_character_answer(message.text)
#if response:
#await message.reply(response)
return

0
bot/keyboards/inline.py Normal file
View File

688
bot/utils/argument.py Normal file
View File

@@ -0,0 +1,688 @@
"""
Утилиты для работы с командами бота
"""
from typing import Optional, Union, Dict, List, Tuple, Set
from dataclasses import dataclass, field
import re
from aiogram.types import Message
from configs import settings
__all__ = (
'is_command',
'find_argument',
'get_command',
'parse_arguments',
'parse_flags',
'CommandParser',
'ParsedCommand',
'parse_command',
'validate_command',
'get_command_usage',
'extract_mentions',
'extract_user_ids',
'extract_hashtags'
)
@dataclass
class ParsedCommand:
"""
Распарсенная команда.
Attributes:
command: Название команды
prefix: Префикс команды
args: Список аргументов
raw_args: Исходная строка аргументов
flags: Словарь флагов (--flag value)
bot_username: Username бота (если было упоминание)
is_group_command: Команда в группе с упоминанием бота
"""
command: str
prefix: str
args: List[str] = field(default_factory=list)
raw_args: Optional[str] = None
flags: Dict[str, Union[str, bool]] = field(default_factory=dict)
bot_username: Optional[str] = None
is_group_command: bool = False
@property
def has_args(self) -> bool:
"""Есть ли аргументы"""
return len(self.args) > 0
@property
def has_flags(self) -> bool:
"""Есть ли флаги"""
return len(self.flags) > 0
def get_arg(self, index: int, default: Optional[str] = None) -> Optional[str]:
"""Получает аргумент по индексу"""
return self.args[index] if index < len(self.args) else default
def get_flag(self, name: str, default: Optional[Union[str, bool]] = None) -> Union[str, bool, None]:
"""Получает значение флага"""
return self.flags.get(name, default)
def __repr__(self) -> str:
return (
f"ParsedCommand(command='{self.command}', "
f"args={self.args}, flags={self.flags})"
)
class CommandParser:
"""
Парсер команд бота.
Возможности:
- Поддержка нескольких префиксов
- Парсинг аргументов
- Парсинг флагов (--flag value, -f value)
- Поддержка упоминаний бота (@botname)
- Парсинг quoted аргументов ("arg with spaces")
- Валидация команд
Example:
```python
parser = CommandParser()
# Парсинг команды
parsed = parser.parse("/ban @user 7d --reason спам")
print(parsed.command) # "ban"
print(parsed.args) # ["@user", "7d"]
print(parsed.flags) # {"reason": "спам"}
```
"""
def __init__(
self,
prefixes: Optional[List[str]] = None,
bot_username: Optional[str] = None
):
"""
Args:
prefixes: Список префиксов (по умолчанию из settings)
bot_username: Username бота для проверки упоминаний
"""
self.prefixes = prefixes or settings.PREFIX
self.bot_username = bot_username
def is_command(self, text: Optional[str]) -> bool:
"""
Проверяет, является ли текст командой.
Args:
text: Текст для проверки
Returns:
bool: True если это команда
Example:
>> parser.is_command("/start")
True
>> parser.is_command("hello")
False
"""
if not text:
return False
text = text.strip()
# Проверяем все префиксы
return any(text.startswith(prefix) for prefix in self.prefixes)
def get_command(
self,
text: Optional[str],
strip_mention: bool = True
) -> Optional[str]:
"""
Извлекает название команды из текста.
Args:
text: Текст сообщения
strip_mention: Убирать упоминание бота (@botname)
Returns:
Optional[str]: Название команды или None
Example:
>> parser.get_command("/start@mybot arg")
'start'
>> parser.get_command("!help")
'help'
"""
if not self.is_command(text):
return None
text = text.strip()
# Находим префикс
prefix = next(p for p in self.prefixes if text.startswith(p))
# Убираем префикс
without_prefix = text[len(prefix):]
# Берем первое слово
command = without_prefix.split()[0] if without_prefix else ""
# Убираем упоминание бота если есть
if strip_mention and '@' in command:
command = command.split('@')[0]
return command.lower() if command else None
def find_argument(self, text: Optional[str]) -> Optional[str]:
"""
Извлекает аргументы команды (все после команды).
Args:
text: Текст сообщения
Returns:
Optional[str]: Аргументы или None
Example:
>> parser.find_argument("/start referrer")
'referrer'
>> parser.find_argument("/ban @user reason text")
'@user reason text'
"""
if not self.is_command(text):
return None
parts = text.strip().split(maxsplit=1)
return parts[1] if len(parts) > 1 else None
@staticmethod
def parse_arguments(
args_text: Optional[str],
preserve_quotes: bool = False
) -> List[str]:
"""
Парсит аргументы, поддерживает кавычки.
Args:
args_text: Строка аргументов
preserve_quotes: Сохранять кавычки в результате
Returns:
List[str]: Список аргументов
Example:
>> parser.parse_arguments('user 7d "ban reason here"')
['user', '7d', 'ban reason here']
"""
if not args_text:
return []
# Regex для парсинга с кавычками
# Поддерживает: "arg with spaces" 'arg' arg
pattern = r'''(?:[^\s"']+|"[^"]*"|'[^']*')+'''
matches = re.findall(pattern, args_text)
if preserve_quotes:
return matches
# Убираем кавычки
return [m.strip('"').strip("'") for m in matches]
@staticmethod
def parse_flags(
args: List[str]
) -> Tuple[List[str], Dict[str, Union[str, bool]]]:
"""
Парсит флаги из аргументов.
Поддерживает:
- --flag value
- --flag (boolean, True)
- -f value (короткая форма)
Args:
args: Список аргументов
Returns:
Tuple: (аргументы_без_флагов, словарь_флагов)
Example:
>> args = ['user', '--reason', 'spam', '--silent']
>> clean_args, flags = parser.parse_flags(args)
>> print(clean_args) # ['user']
>> print(flags) # {'reason': 'spam', 'silent': True}
"""
clean_args = []
flags = {}
i = 0
while i < len(args):
arg = args[i]
# Длинный флаг --flag
if arg.startswith('--'):
flag_name = arg[2:]
# Проверяем, есть ли значение
if i + 1 < len(args) and not args[i + 1].startswith('-'):
flags[flag_name] = args[i + 1]
i += 2
else:
# Boolean флаг
flags[flag_name] = True
i += 1
# Короткий флаг -f
elif arg.startswith('-') and len(arg) == 2:
flag_name = arg[1]
# Проверяем значение
if i + 1 < len(args) and not args[i + 1].startswith('-'):
flags[flag_name] = args[i + 1]
i += 2
else:
flags[flag_name] = True
i += 1
# Обычный аргумент
else:
clean_args.append(arg)
i += 1
return clean_args, flags
def parse(
self,
text: str,
parse_flags: bool = True
) -> Optional[ParsedCommand]:
"""
Полный парсинг команды.
Args:
text: Текст команды
parse_flags: Парсить флаги
Returns:
Optional[ParsedCommand]: Распарсенная команда или None
Example:
>> parsed = parser.parse('/ban @user 7d --reason "spam bot"')
>> print(parsed.command) # 'ban'
>> print(parsed.args) # ['@user', '7d']
>> print(parsed.flags) # {'reason': 'spam bot'}
"""
if not self.is_command(text):
return None
text = text.strip()
# Находим префикс
prefix = next(p for p in self.prefixes if text.startswith(p))
# Убираем префикс
without_prefix = text[len(prefix):]
# Разделяем на команду и аргументы
parts = without_prefix.split(maxsplit=1)
if not parts:
return None
command_part = parts[0]
raw_args = parts[1] if len(parts) > 1 else None
# Проверяем упоминание бота
bot_username = None
is_group_command = False
if '@' in command_part:
cmd_parts = command_part.split('@')
command_name = cmd_parts[0]
bot_username = cmd_parts[1] if len(cmd_parts) > 1 else None
is_group_command = True
else:
command_name = command_part
# Парсим аргументы
args = self.parse_arguments(raw_args) if raw_args else []
# Парсим флаги
flags = {}
if parse_flags and args:
args, flags = self.parse_flags(args)
return ParsedCommand(
command=command_name.lower(),
prefix=prefix,
args=args,
raw_args=raw_args,
flags=flags,
bot_username=bot_username,
is_group_command=is_group_command
)
def parse_from_message(
self,
message: Message,
parse_flags: bool = True
) -> Optional[ParsedCommand]:
"""
Парсит команду из объекта Message.
Args:
message: Объект сообщения
parse_flags: Парсить флаги
Returns:
Optional[ParsedCommand]: Распарсенная команда
Example:
>> parsed = parser.parse_from_message(message)
>> if parsed:
... print(f"Команда: {parsed.command}")
"""
if not message.text:
return None
return self.parse(message.text, parse_flags=parse_flags)
# Глобальный парсер
_default_parser: Optional[CommandParser] = None
def get_parser() -> CommandParser:
"""Получает глобальный парсер команд"""
global _default_parser
if _default_parser is None:
_default_parser = CommandParser()
return _default_parser
# ================= УДОБНЫЕ ФУНКЦИИ =================
def is_command(text: Optional[str]) -> bool:
"""
Проверяет, является ли текст командой.
Args:
text: Текст для проверки
Returns:
bool: True если это команда
Example:
>> is_command("/start")
True
>> is_command("hello")
False
"""
return get_parser().is_command(text)
def find_argument(text: Optional[str]) -> Optional[str]:
"""
Извлекает аргументы команды.
Args:
text: Текст команды
Returns:
Optional[str]: Аргументы или None
Example:
>> find_argument("/start referrer")
'referrer'
>> find_argument("/ban @user spam")
'@user spam'
"""
return get_parser().find_argument(text)
def get_command(text: Optional[str]) -> Optional[str]:
"""
Извлекает название команды.
Args:
text: Текст сообщения
Returns:
Optional[str]: Название команды или None
Example:
>> get_command("/start@mybot")
'start'
>> get_command("!help")
'help'
"""
return get_parser().get_command(text)
def parse_arguments(args_text: Optional[str]) -> List[str]:
"""
Парсит аргументы команды.
Args:
args_text: Строка аргументов
Returns:
List[str]: Список аргументов
Example:
>> parse_arguments('user 7d "ban reason"')
['user', '7d', 'ban reason']
"""
return get_parser().parse_arguments(args_text)
def parse_flags(args: List[str]) -> Tuple[List[str], Dict[str, Union[str, bool]]]:
"""
Парсит флаги из аргументов.
Args:
args: Список аргументов
Returns:
Tuple: (аргументы, флаги)
Example:
>> args = ['user', '--reason', 'spam', '--silent']
>> clean_args, flags = parse_flags(args)
>> print(flags) # {'reason': 'spam', 'silent': True}
"""
return get_parser().parse_flags(args)
def parse_command(text: str) -> Optional[ParsedCommand]:
"""
Полный парсинг команды.
Args:
text: Текст команды
Returns:
Optional[ParsedCommand]: Распарсенная команда
Example:
>> parsed = parse_command('/ban @user --reason spam')
>> print(parsed.command) # 'ban'
>> print(parsed.args) # ['@user']
>> print(parsed.flags) # {'reason': 'spam'}
"""
return get_parser().parse(text)
# ================= ВАЛИДАЦИЯ КОМАНД =================
def validate_command(
text: str,
expected_command: str,
min_args: int = 0,
max_args: Optional[int] = None,
required_flags: Optional[Set[str]] = None
) -> Tuple[bool, Optional[str]]:
"""
Валидирует команду.
Args:
text: Текст команды
expected_command: Ожидаемая команда
min_args: Минимальное количество аргументов
max_args: Максимальное количество аргументов
required_flags: Обязательные флаги
Returns:
Tuple[bool, Optional[str]]: (валидна, сообщение_об_ошибке)
Example:
>> valid, error = validate_command(
... '/ban user',
... 'ban',
... min_args=1,
... max_args=2
... )
>> if not valid:
... print(error)
"""
parsed = parse_command(text)
if not parsed:
return False, "Невалидная команда"
# Проверка команды
if parsed.command != expected_command:
return False, f"Ожидалась команда '{expected_command}'"
# Проверка количества аргументов
arg_count = len(parsed.args)
if arg_count < min_args:
return False, f"Недостаточно аргументов (минимум {min_args})"
if max_args is not None and arg_count > max_args:
return False, f"Слишком много аргументов (максимум {max_args})"
# Проверка обязательных флагов
if required_flags:
missing_flags = required_flags - set(parsed.flags.keys())
if missing_flags:
return False, f"Отсутствуют обязательные флаги: {', '.join(missing_flags)}"
return True, None
def get_command_usage(
command: str,
args: List[str],
flags: Optional[Dict[str, str]] = None,
description: Optional[str] = None
) -> str:
"""
Формирует строку использования команды.
Args:
command: Название команды
args: Список аргументов
flags: Словарь флагов с описанием
description: Описание команды
Returns:
str: Форматированная строка использования
Example:
>> usage = get_command_usage(
... 'ban',
... ['<user>', '[duration]'],
... {'reason': 'Причина бана', 'silent': 'Тихий бан'},
... 'Банит пользователя'
... )
>> print(usage)
"""
lines = []
# Описание
if description:
lines.append(f"📝 {description}\n")
# Использование
args_str = ' '.join(args)
lines.append(f"<b>Использование:</b>")
lines.append(f"<code>/{command} {args_str}</code>\n")
# Аргументы
if args:
lines.append("<b>Аргументы:</b>")
for arg in args:
# Определяем обязательность
if arg.startswith('<') and arg.endswith('>'):
lines.append(f"{arg} - обязательный")
elif arg.startswith('[') and arg.endswith(']'):
lines.append(f"{arg} - необязательный")
lines.append("")
# Флаги
if flags:
lines.append("<b>Флаги:</b>")
for flag, desc in flags.items():
lines.append(f"• --{flag} - {desc}")
return '\n'.join(lines)
# ================= ИЗВЛЕЧЕНИЕ УПОМИНАНИЙ =================
def extract_mentions(text: str) -> List[str]:
"""
Извлекает все упоминания (@username) из текста.
Args:
text: Текст для анализа
Returns:
List[str]: Список username (без @)
Example:
>> extract_mentions("Бан @user1 и @user2")
['user1', 'user2']
"""
pattern = r'@(\w+)'
return re.findall(pattern, text)
def extract_user_ids(text: str) -> List[int]:
"""
Извлекает все ID пользователей из текста.
Args:
text: Текст для анализа
Returns:
List[int]: Список ID
Example:
>> extract_user_ids("Бан id123456789 и id987654321")
[123456789, 987654321]
"""
pattern = r'id(\d+)'
matches = re.findall(pattern, text)
return [int(m) for m in matches]
def extract_hashtags(text: str) -> List[str]:
"""
Извлекает все хештеги из текста.
Args:
text: Текст для анализа
Returns:
List[str]: Список хештегов (без #)
Example:
>> extract_hashtags("Пост #важное #новости")
['важное', 'новости']
"""
pattern = r'#(\w+)'
return re.findall(pattern, text)

636
bot/utils/auto_delete.py Normal file
View File

@@ -0,0 +1,636 @@
"""
Утилиты для автоматического удаления сообщений
"""
from typing import Optional, Callable, Awaitable, Dict, Any
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from asyncio import sleep, create_task, Task, CancelledError
from aiogram import Bot
from aiogram.types import Message
from aiogram.exceptions import TelegramBadRequest, TelegramForbiddenError
from middleware.loggers import logger
from .format_time import format_duration
__all__ = (
'auto_delete_message',
'schedule_delete',
'cancel_delete',
'delete_after',
'auto_delete_manager',
'AutoDeleteManager',
'DeleteTask',
'delete_both_after',
'delete_messages_after',
)
@dataclass
class DeleteTask:
"""
Задача на удаление сообщения.
Attributes:
chat_id: ID чата
message_id: ID сообщения
delete_at: Время удаления
task: Asyncio task
created_at: Время создания задачи
reason: Причина удаления
callback: Callback функция после удаления
"""
chat_id: int
message_id: int
delete_at: datetime
task: Optional[Task] = None
created_at: datetime = field(default_factory=datetime.now)
reason: Optional[str] = None
callback: Optional[Callable[[], Awaitable[None]]] = None
@property
def delay(self) -> int:
"""Задержка до удаления в секундах"""
delta = self.delete_at - datetime.now()
return max(0, int(delta.total_seconds()))
@property
def is_expired(self) -> bool:
"""Истекло ли время удаления"""
return datetime.now() >= self.delete_at
def __repr__(self) -> str:
return (
f"DeleteTask(chat={self.chat_id}, msg={self.message_id}, "
f"delay={self.delay}s, reason={self.reason})"
)
class AutoDeleteManager:
"""
Менеджер автоматического удаления сообщений.
Возможности:
- Планирование удаления с задержкой
- Отмена запланированного удаления
- Массовое удаление
- Callback функции
- История задач
- Автоматическая очистка завершенных задач
Example:
```python
from utils.auto_delete import auto_delete_manager
# Планирование удаления
await auto_delete_manager.schedule(
bot=bot,
chat_id=message.chat.id,
message_id=message.message_id,
delay=60,
reason="Временное сообщение"
)
# Отмена удаления
auto_delete_manager.cancel(message.chat.id, message.message_id)
# Получение статистики
stats = auto_delete_manager.get_stats()
```
"""
def __init__(self):
# Активные задачи: {(chat_id, message_id): DeleteTask}
self.tasks: Dict[tuple[int, int], DeleteTask] = {}
# Завершенные задачи (последние 100)
self.completed: list[DeleteTask] = []
self.max_completed = 100
# Статистика
self.total_scheduled: int = 0
self.total_deleted: int = 0
self.total_failed: int = 0
self.total_cancelled: int = 0
async def schedule(
self,
bot: Bot,
chat_id: int,
message_id: int,
delay: int,
reason: Optional[str] = None,
callback: Optional[Callable[[], Awaitable[None]]] = None,
log: bool = True
) -> DeleteTask:
"""
Планирует удаление сообщения.
Args:
bot: Экземпляр бота
chat_id: ID чата
message_id: ID сообщения
delay: Задержка в секундах
reason: Причина удаления
callback: Callback функция после удаления
log: Логировать планирование
Returns:
DeleteTask: Созданная задача
Example:
>> task = await auto_delete_manager.schedule(
... bot=bot,
... chat_id=message.chat.id,
... message_id=message.message_id,
... delay=60,
... reason="Спам"
... )
"""
# Отменяем предыдущую задачу если есть
key = (chat_id, message_id)
if key in self.tasks:
self.cancel(chat_id, message_id)
# Создаем задачу
delete_at = datetime.now() + timedelta(seconds=delay)
task_data = DeleteTask(
chat_id=chat_id,
message_id=message_id,
delete_at=delete_at,
reason=reason,
callback=callback
)
# Создаем asyncio task
task = create_task(self._delete_task(bot, task_data, log))
task_data.task = task
# Сохраняем
self.tasks[key] = task_data
self.total_scheduled += 1
if log:
delay_str = format_duration(delay)
logger.info(
f"Запланировано удаление сообщения через {delay_str}",
log_type='AUTO_DELETE'
)
return task_data
async def _delete_task(
self,
bot: Bot,
task_data: DeleteTask,
log: bool
) -> None:
"""
Внутренняя функция для выполнения задачи удаления.
Args:
bot: Экземпляр бота
task_data: Данные задачи
log: Логировать выполнение
"""
key = (task_data.chat_id, task_data.message_id)
try:
# Ждем
await sleep(task_data.delay)
# Удаляем сообщение
await bot.delete_message(
chat_id=task_data.chat_id,
message_id=task_data.message_id
)
self.total_deleted += 1
if log:
reason_str = f" (причина: {task_data.reason})" if task_data.reason else ""
logger.info(
f"Сообщение удалено автоматически{reason_str}",
log_type='AUTO_DELETE'
)
# Вызываем callback если есть
if task_data.callback:
try:
await task_data.callback()
except Exception as e:
logger.error(
f"Ошибка в callback автоудаления: {e}",
log_type='AUTO_DELETE'
)
except CancelledError:
# Задача отменена
self.total_cancelled += 1
if log:
logger.debug(
f"Удаление сообщения отменено",
log_type='AUTO_DELETE'
)
raise
except (TelegramBadRequest, TelegramForbiddenError) as e:
# Ошибка удаления
self.total_failed += 1
if log:
logger.warning(
f"Не удалось автоматически удалить сообщение: {e}",
log_type='AUTO_DELETE'
)
finally:
# Удаляем из активных задач
if key in self.tasks:
completed_task = self.tasks.pop(key)
# Сохраняем в завершенные
self.completed.append(completed_task)
if len(self.completed) > self.max_completed:
self.completed.pop(0)
def cancel(
self,
chat_id: int,
message_id: int,
log: bool = True
) -> bool:
"""
Отменяет запланированное удаление.
Args:
chat_id: ID чата
message_id: ID сообщения
log: Логировать отмену
Returns:
bool: True если задача была отменена
Example:
>> cancelled = auto_delete_manager.cancel(
... chat_id=message.chat.id,
... message_id=message.message_id
... )
"""
key = (chat_id, message_id)
if key in self.tasks:
task_data = self.tasks[key]
# Отменяем asyncio task
if task_data.task and not task_data.task.done():
task_data.task.cancel()
# Удаляем из активных
self.tasks.pop(key)
if log:
logger.debug(
f"Автоудаление отменено для сообщения {message_id}",
log_type='AUTO_DELETE'
)
return True
return False
def cancel_all(self, chat_id: Optional[int] = None) -> int:
"""
Отменяет все запланированные удаления.
Args:
chat_id: ID чата (если None, отменяет для всех чатов)
Returns:
int: Количество отмененных задач
Example:
>> # Отменить для всех чатов
>> count = auto_delete_manager.cancel_all()
>> # Отменить для конкретного чата
>> count = auto_delete_manager.cancel_all(chat_id=message.chat.id)
"""
cancelled_count = 0
# Собираем ключи для отмены
keys_to_cancel = []
for key, task_data in self.tasks.items():
if chat_id is None or task_data.chat_id == chat_id:
keys_to_cancel.append(key)
# Отменяем
for key in keys_to_cancel:
if self.cancel(key[0], key[1], log=False):
cancelled_count += 1
if cancelled_count > 0:
logger.info(
f"Отменено {cancelled_count} задач автоудаления",
log_type='AUTO_DELETE'
)
return cancelled_count
def get_task(
self,
chat_id: int,
message_id: int
) -> Optional[DeleteTask]:
"""
Получает задачу по ID чата и сообщения.
Args:
chat_id: ID чата
message_id: ID сообщения
Returns:
Optional[DeleteTask]: Задача или None
"""
key = (chat_id, message_id)
return self.tasks.get(key)
def get_chat_tasks(self, chat_id: int) -> list[DeleteTask]:
"""
Получает все задачи для чата.
Args:
chat_id: ID чата
Returns:
list[DeleteTask]: Список задач
"""
return [
task for task in self.tasks.values()
if task.chat_id == chat_id
]
def get_stats(self) -> Dict[str, Any]:
"""
Возвращает статистику менеджера.
Returns:
Dict: Словарь со статистикой
Example:
>> stats = auto_delete_manager.get_stats()
>> print(f"Активных задач: {stats['active_tasks']}")
"""
return {
'active_tasks': len(self.tasks),
'completed_tasks': len(self.completed),
'total_scheduled': self.total_scheduled,
'total_deleted': self.total_deleted,
'total_failed': self.total_failed,
'total_cancelled': self.total_cancelled,
'success_rate': (
f"{(self.total_deleted / self.total_scheduled * 100):.1f}%"
if self.total_scheduled > 0 else "0%"
)
}
def cleanup_expired(self) -> int:
"""
Удаляет истекшие задачи (которые должны были выполниться, но не выполнились).
Returns:
int: Количество удаленных задач
"""
expired_keys = [
key for key, task in self.tasks.items()
if task.is_expired and (not task.task or task.task.done())
]
for key in expired_keys:
self.tasks.pop(key)
return len(expired_keys)
# Глобальный менеджер
auto_delete_manager = AutoDeleteManager()
# ================= УДОБНЫЕ ФУНКЦИИ =================
async def auto_delete_message(
bot: Bot,
chat_id: int,
message_id: int,
delay: int = 604800,
reason: Optional[str] = None
) -> DeleteTask:
"""
Автоматически удаляет сообщение через указанное время.
Args:
bot: Экземпляр бота
chat_id: ID чата
message_id: ID сообщения
delay: Задержка в секундах (по умолчанию 7 дней)
reason: Причина удаления
Returns:
DeleteTask: Созданная задача
Example:
>> # Удалить через 1 минуту
>> await auto_delete_message(bot, chat_id, message_id, delay=60)
>> # Удалить через 7 дней (по умолчанию)
>> await auto_delete_message(bot, chat_id, message_id)
"""
return await auto_delete_manager.schedule(
bot=bot,
chat_id=chat_id,
message_id=message_id,
delay=delay,
reason=reason
)
async def schedule_delete(
message: Message,
delay: int,
reason: Optional[str] = None
) -> DeleteTask:
"""
Планирует удаление сообщения (упрощенная версия).
Args:
message: Объект сообщения
delay: Задержка в секундах
reason: Причина удаления
Returns:
DeleteTask: Созданная задача
Example:
>> # Планируем удаление через 30 секунд
>> await schedule_delete(message, delay=30, reason="Временное")
"""
return await auto_delete_manager.schedule(
bot=message.bot,
chat_id=message.chat.id,
message_id=message.message_id,
delay=delay,
reason=reason
)
def cancel_delete(message: Message) -> bool:
"""
Отменяет запланированное удаление сообщения.
Args:
message: Объект сообщения
Returns:
bool: True если удаление было отменено
Example:
>> if cancel_delete(message):
... await message.answer("Удаление отменено")
"""
return auto_delete_manager.cancel(
chat_id=message.chat.id,
message_id=message.message_id
)
async def delete_after(
message: Message,
text: str,
delay: int = 10,
**kwargs
) -> Message:
"""
Отправляет сообщение и автоматически удаляет его через указанное время.
Args:
message: Исходное сообщение
text: Текст нового сообщения
delay: Задержка до удаления в секундах
**kwargs: Дополнительные параметры для message.answer()
Returns:
Message: Отправленное сообщение
Example:
>> # Отправить и удалить через 10 секунд
>> await delete_after(message, "Это временное сообщение")
>> # Отправить и удалить через 5 секунд
>> await delete_after(
... message,
... "⚠️ Ошибка!",
... delay=5,
... parse_mode="HTML"
... )
"""
sent_message = await message.answer(text, **kwargs)
await auto_delete_manager.schedule(
bot=message.bot,
chat_id=sent_message.chat.id,
message_id=sent_message.message_id,
delay=delay,
reason="delete_after"
)
return sent_message
async def delete_both_after(
original: Message,
reply_text: str,
delay: int = 10,
**kwargs
) -> Message:
"""
Отправляет ответ и удаляет оба сообщения через указанное время.
Args:
original: Исходное сообщение
reply_text: Текст ответа
delay: Задержка до удаления
**kwargs: Дополнительные параметры
Returns:
Message: Отправленное сообщение
Example:
>> # Удалить и команду, и ответ через 5 секунд
>> await delete_both_after(
... message,
... "✅ Команда выполнена",
... delay=5
... )
"""
# Отправляем ответ
sent = await delete_after(original, reply_text, delay, **kwargs)
# Планируем удаление оригинала
await auto_delete_manager.schedule(
bot=original.bot,
chat_id=original.chat.id,
message_id=original.message_id,
delay=delay,
reason="delete_both"
)
return sent
async def delete_messages_after(
bot: Bot,
chat_id: int,
message_ids: list[int],
delay: int
) -> int:
"""
Планирует удаление нескольких сообщений.
Args:
bot: Экземпляр бота
chat_id: ID чата
message_ids: Список ID сообщений
delay: Задержка до удаления
Returns:
int: Количество запланированных удалений
Example:
>> # Удалить все сообщения через 1 час
>> count = await delete_messages_after(
... bot,
... chat_id,
... [123, 124, 125, 126],
... delay=3600
... )
"""
count = 0
for message_id in message_ids:
await auto_delete_manager.schedule(
bot=bot,
chat_id=chat_id,
message_id=message_id,
delay=delay,
reason="mass_delete",
log=False
)
count += 1
logger.info(
f"Запланировано удаление {count} сообщений через {format_duration(delay)}",
log_type='AUTO_DELETE'
)
return count