Фильтр для првоерки содержимого сообщения

This commit is contained in:
2026-02-23 14:39:18 +07:00
parent c74732cbd4
commit cd7d6512dd

395
bot/filters/msg_content.py Normal file
View File

@@ -0,0 +1,395 @@
"""
Фильтры для проверки содержимого сообщений
"""
import re
from typing import Optional, Union
from aiogram.filters import BaseFilter
from aiogram.types import Message, ContentType
from middleware.loggers import logger
__all__ = (
'IsReply',
'IsForwarded',
'HasMedia',
'ContainsURL',
'HasText',
'HasCaption',
'HasEntities',
'MediaType'
)
class IsReply(BaseFilter):
"""
Проверяет, является ли сообщение ответом на другое сообщение.
Example:
```python
@router.message(IsReply())
async def handle_reply(message: Message):
original = message.reply_to_message
await message.answer(f"Это ответ на: {original.text}")
```
"""
async def __call__(self, message: Message) -> Union[bool, dict]:
is_reply = message.reply_to_message is not None
if is_reply:
return {
'is_reply': True,
'reply_to_message': message.reply_to_message,
'reply_to_user_id': message.reply_to_message.from_user.id if message.reply_to_message.from_user else None
}
return False
class IsForwarded(BaseFilter):
"""
Проверяет, является ли сообщение пересланным.
Поддерживает:
- Пересылку от пользователей (forward_from)
- Пересылку из каналов/групп (forward_from_chat)
- Скрытую пересылку (forward_sender_name)
Example:
```python
@router.message(IsForwarded())
async def handle_forwarded(message: Message, forward_info: dict):
await message.answer(f"Переслано из: {forward_info['origin']}")
```
"""
async def __call__(self, message: Message) -> Union[bool, dict]:
# Проверка различных типов пересылки
is_forwarded = (
message.forward_origin is not None or # Новый API (aiogram 3.x)
message.forward_from is not None or
message.forward_from_chat is not None or
message.forward_sender_name is not None
)
if is_forwarded:
origin = "неизвестно"
if message.forward_from:
origin = f"пользователь @{message.forward_from.username or message.forward_from.id}"
elif message.forward_from_chat:
origin = f"чат {message.forward_from_chat.title or message.forward_from_chat.id}"
elif message.forward_sender_name:
origin = f"скрытый пользователь ({message.forward_sender_name})"
logger.debug(
f"Обнаружено пересланное сообщение из: {origin}",
log_type='FORWARD',
message=message
)
return {
'is_forwarded': True,
'origin': origin,
'forward_date': message.forward_date
}
return False
class HasMedia(BaseFilter):
"""
Проверяет, содержит ли сообщение медиа-контент.
Attributes:
media_types: Список типов медиа для проверки (если None, проверяются все)
Example:
```python
# Любое медиа
@router.message(HasMedia())
async def handle_media(message: Message):
await message.answer("Получено медиа!")
# Только фото и видео
@router.message(HasMedia(['photo', 'video']))
async def handle_visual(message: Message):
await message.answer("Фото или видео!")
```
"""
def __init__(self, media_types: Optional[list[str]] = None):
"""
Args:
media_types: Список типов медиа ('photo', 'video', 'document', и т.д.)
Если None, проверяются все типы
"""
self.media_types = media_types
async def __call__(self, message: Message) -> Union[bool, dict]:
# Все возможные типы медиа
media_checks = {
'photo': message.photo,
'video': message.video,
'document': message.document,
'audio': message.audio,
'voice': message.voice,
'video_note': message.video_note,
'sticker': message.sticker,
'animation': message.animation,
}
# Если указаны конкретные типы, проверяем только их
if self.media_types:
has_media = any(
media_checks[media_type]
for media_type in self.media_types
if media_type in media_checks
)
detected_type = next(
(media_type for media_type in self.media_types if media_checks.get(media_type)),
None
)
else:
# Проверяем все типы
has_media = any(media_checks.values())
detected_type = next(
(media_type for media_type, value in media_checks.items() if value),
None
)
if has_media:
return {
'has_media': True,
'media_type': detected_type,
'content': media_checks[detected_type]
}
return False
class ContainsURL(BaseFilter):
"""
Проверяет, содержит ли сообщение ссылки.
Поддерживает:
- HTTP/HTTPS ссылки
- Telegram ссылки (t.me, tg://)
- Проверку через entities (более точная)
Attributes:
strict: Использовать строгую проверку через entities
Example:
```python
@router.message(ContainsURL())
async def handle_url(message: Message, url_info: dict):
urls = url_info['urls']
await message.answer(f"Обнаружено {len(urls)} ссылок")
```
"""
def __init__(self, strict: bool = False):
"""
Args:
strict: Если True, проверяет через entities (игнорирует текст в коде/pre)
"""
self.strict = strict
# Паттерн для поиска URL
self.url_pattern = re.compile(
r'https?://[^\s]+|' # http(s)://
r't\.me/[^\s]+|' # t.me/
r'tg://[^\s]+', # tg://
re.IGNORECASE
)
async def __call__(self, message: Message) -> Union[bool, dict]:
if not message.text and not message.caption:
return False
text = message.text or message.caption
if self.strict and message.entities:
# Строгая проверка через entities
url_entities = [
entity for entity in message.entities
if entity.type in ('url', 'text_link')
]
if url_entities:
urls = []
for entity in url_entities:
if entity.type == 'url':
url = text[entity.offset:entity.offset + entity.length]
urls.append(url)
elif entity.type == 'text_link':
urls.append(entity.url)
return {
'contains_url': True,
'urls': urls,
'url_count': len(urls)
}
else:
# Простая проверка через regex
urls = self.url_pattern.findall(text)
if urls:
return {
'contains_url': True,
'urls': urls,
'url_count': len(urls)
}
return False
class HasText(BaseFilter):
"""
Проверяет, содержит ли сообщение текст.
Attributes:
min_length: Минимальная длина текста (по умолчанию 1)
max_length: Максимальная длина текста (по умолчанию None)
Example:
```python
# Любой текст
@router.message(HasText())
async def handle_text(message: Message):
await message.answer("Получен текст!")
# Текст от 10 до 100 символов
@router.message(HasText(min_length=10, max_length=100))
async def handle_medium_text(message: Message):
await message.answer("Текст подходящей длины!")
```
"""
def __init__(self, min_length: int = 1, max_length: Optional[int] = None):
self.min_length = min_length
self.max_length = max_length
async def __call__(self, message: Message) -> Union[bool, dict]:
if not message.text:
return False
text_length = len(message.text)
# Проверка длины
if text_length < self.min_length:
return False
if self.max_length and text_length > self.max_length:
return False
return {
'has_text': True,
'text_length': text_length,
'text': message.text
}
class HasCaption(BaseFilter):
"""
Проверяет, есть ли у медиа подпись.
Example:
```python
@router.message(HasCaption())
async def handle_caption(message: Message):
await message.answer(f"Подпись: {message.caption}")
```
"""
async def __call__(self, message: Message) -> Union[bool, dict]:
if message.caption:
return {
'has_caption': True,
'caption': message.caption,
'caption_length': len(message.caption)
}
return False
class HasEntities(BaseFilter):
"""
Проверяет наличие entities (упоминания, хештеги, команды и т.д.).
Attributes:
entity_types: Список типов entities для проверки
Example:
```python
# Любые entities
@router.message(HasEntities())
async def handle_entities(message: Message):
pass
# Только упоминания и хештеги
@router.message(HasEntities(['mention', 'hashtag']))
async def handle_mentions(message: Message):
pass
```
"""
def __init__(self, entity_types: Optional[list[str]] = None):
"""
Args:
entity_types: Список типов ('mention', 'hashtag', 'bot_command', и т.д.)
"""
self.entity_types = entity_types
async def __call__(self, message: Message) -> Union[bool, dict]:
if not message.entities:
return False
if self.entity_types:
# Фильтруем по типам
matching_entities = [
entity for entity in message.entities
if entity.type in self.entity_types
]
if matching_entities:
return {
'has_entities': True,
'entities': matching_entities,
'entity_count': len(matching_entities)
}
else:
# Любые entities
return {
'has_entities': True,
'entities': message.entities,
'entity_count': len(message.entities)
}
return False
class MediaType(BaseFilter):
"""
Проверяет точный тип контента сообщения.
Attributes:
content_type: Тип контента из ContentType enum
Example:
```python
@router.message(MediaType(ContentType.PHOTO))
async def handle_photo(message: Message):
await message.answer("Это фото!")
```
"""
def __init__(self, content_type: Union[ContentType, str]):
"""
Args:
content_type: Тип контента (ContentType enum или строка)
"""
self.content_type = content_type if isinstance(content_type, str) else content_type.value
async def __call__(self, message: Message) -> bool:
return message.content_type == self.content_type