From cd7d6512ddc431dd6e75302215fcd0a6bbbe7e1c Mon Sep 17 00:00:00 2001 From: Verum Date: Mon, 23 Feb 2026 14:39:18 +0700 Subject: [PATCH] =?UTF-8?q?=D0=A4=D0=B8=D0=BB=D1=8C=D1=82=D1=80=20=D0=B4?= =?UTF-8?q?=D0=BB=D1=8F=20=D0=BF=D1=80=D0=B2=D0=BE=D0=B5=D1=80=D0=BA=D0=B8?= =?UTF-8?q?=20=D1=81=D0=BE=D0=B4=D0=B5=D1=80=D0=B6=D0=B8=D0=BC=D0=BE=D0=B3?= =?UTF-8?q?=D0=BE=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=D0=B8=D1=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot/filters/msg_content.py | 395 +++++++++++++++++++++++++++++++++++++ 1 file changed, 395 insertions(+) create mode 100644 bot/filters/msg_content.py diff --git a/bot/filters/msg_content.py b/bot/filters/msg_content.py new file mode 100644 index 0000000..548e8d5 --- /dev/null +++ b/bot/filters/msg_content.py @@ -0,0 +1,395 @@ +""" +Фильтры для проверки содержимого сообщений +""" +import re +from typing import Optional, Union + +from aiogram.filters import BaseFilter +from aiogram.types import Message, ContentType + +from middleware.loggers import logger + +__all__ = ( + 'IsReply', + 'IsForwarded', + 'HasMedia', + 'ContainsURL', + 'HasText', + 'HasCaption', + 'HasEntities', + 'MediaType' +) + + +class IsReply(BaseFilter): + """ + Проверяет, является ли сообщение ответом на другое сообщение. + + Example: + ```python + @router.message(IsReply()) + async def handle_reply(message: Message): + original = message.reply_to_message + await message.answer(f"Это ответ на: {original.text}") + ``` + """ + + async def __call__(self, message: Message) -> Union[bool, dict]: + is_reply = message.reply_to_message is not None + + if is_reply: + return { + 'is_reply': True, + 'reply_to_message': message.reply_to_message, + 'reply_to_user_id': message.reply_to_message.from_user.id if message.reply_to_message.from_user else None + } + + return False + + +class IsForwarded(BaseFilter): + """ + Проверяет, является ли сообщение пересланным. + + Поддерживает: + - Пересылку от пользователей (forward_from) + - Пересылку из каналов/групп (forward_from_chat) + - Скрытую пересылку (forward_sender_name) + + Example: + ```python + @router.message(IsForwarded()) + async def handle_forwarded(message: Message, forward_info: dict): + await message.answer(f"Переслано из: {forward_info['origin']}") + ``` + """ + + async def __call__(self, message: Message) -> Union[bool, dict]: + # Проверка различных типов пересылки + is_forwarded = ( + message.forward_origin is not None or # Новый API (aiogram 3.x) + message.forward_from is not None or + message.forward_from_chat is not None or + message.forward_sender_name is not None + ) + + if is_forwarded: + origin = "неизвестно" + + if message.forward_from: + origin = f"пользователь @{message.forward_from.username or message.forward_from.id}" + elif message.forward_from_chat: + origin = f"чат {message.forward_from_chat.title or message.forward_from_chat.id}" + elif message.forward_sender_name: + origin = f"скрытый пользователь ({message.forward_sender_name})" + + logger.debug( + f"Обнаружено пересланное сообщение из: {origin}", + log_type='FORWARD', + message=message + ) + + return { + 'is_forwarded': True, + 'origin': origin, + 'forward_date': message.forward_date + } + + return False + + +class HasMedia(BaseFilter): + """ + Проверяет, содержит ли сообщение медиа-контент. + + Attributes: + media_types: Список типов медиа для проверки (если None, проверяются все) + + Example: + ```python + # Любое медиа + @router.message(HasMedia()) + async def handle_media(message: Message): + await message.answer("Получено медиа!") + + # Только фото и видео + @router.message(HasMedia(['photo', 'video'])) + async def handle_visual(message: Message): + await message.answer("Фото или видео!") + ``` + """ + + def __init__(self, media_types: Optional[list[str]] = None): + """ + Args: + media_types: Список типов медиа ('photo', 'video', 'document', и т.д.) + Если None, проверяются все типы + """ + self.media_types = media_types + + async def __call__(self, message: Message) -> Union[bool, dict]: + # Все возможные типы медиа + media_checks = { + 'photo': message.photo, + 'video': message.video, + 'document': message.document, + 'audio': message.audio, + 'voice': message.voice, + 'video_note': message.video_note, + 'sticker': message.sticker, + 'animation': message.animation, + } + + # Если указаны конкретные типы, проверяем только их + if self.media_types: + has_media = any( + media_checks[media_type] + for media_type in self.media_types + if media_type in media_checks + ) + detected_type = next( + (media_type for media_type in self.media_types if media_checks.get(media_type)), + None + ) + else: + # Проверяем все типы + has_media = any(media_checks.values()) + detected_type = next( + (media_type for media_type, value in media_checks.items() if value), + None + ) + + if has_media: + return { + 'has_media': True, + 'media_type': detected_type, + 'content': media_checks[detected_type] + } + + return False + + +class ContainsURL(BaseFilter): + """ + Проверяет, содержит ли сообщение ссылки. + + Поддерживает: + - HTTP/HTTPS ссылки + - Telegram ссылки (t.me, tg://) + - Проверку через entities (более точная) + + Attributes: + strict: Использовать строгую проверку через entities + + Example: + ```python + @router.message(ContainsURL()) + async def handle_url(message: Message, url_info: dict): + urls = url_info['urls'] + await message.answer(f"Обнаружено {len(urls)} ссылок") + ``` + """ + + def __init__(self, strict: bool = False): + """ + Args: + strict: Если True, проверяет через entities (игнорирует текст в коде/pre) + """ + self.strict = strict + # Паттерн для поиска URL + self.url_pattern = re.compile( + r'https?://[^\s]+|' # http(s):// + r't\.me/[^\s]+|' # t.me/ + r'tg://[^\s]+', # tg:// + re.IGNORECASE + ) + + async def __call__(self, message: Message) -> Union[bool, dict]: + if not message.text and not message.caption: + return False + + text = message.text or message.caption + + if self.strict and message.entities: + # Строгая проверка через entities + url_entities = [ + entity for entity in message.entities + if entity.type in ('url', 'text_link') + ] + + if url_entities: + urls = [] + for entity in url_entities: + if entity.type == 'url': + url = text[entity.offset:entity.offset + entity.length] + urls.append(url) + elif entity.type == 'text_link': + urls.append(entity.url) + + return { + 'contains_url': True, + 'urls': urls, + 'url_count': len(urls) + } + else: + # Простая проверка через regex + urls = self.url_pattern.findall(text) + + if urls: + return { + 'contains_url': True, + 'urls': urls, + 'url_count': len(urls) + } + + return False + + +class HasText(BaseFilter): + """ + Проверяет, содержит ли сообщение текст. + + Attributes: + min_length: Минимальная длина текста (по умолчанию 1) + max_length: Максимальная длина текста (по умолчанию None) + + Example: + ```python + # Любой текст + @router.message(HasText()) + async def handle_text(message: Message): + await message.answer("Получен текст!") + + # Текст от 10 до 100 символов + @router.message(HasText(min_length=10, max_length=100)) + async def handle_medium_text(message: Message): + await message.answer("Текст подходящей длины!") + ``` + """ + + def __init__(self, min_length: int = 1, max_length: Optional[int] = None): + self.min_length = min_length + self.max_length = max_length + + async def __call__(self, message: Message) -> Union[bool, dict]: + if not message.text: + return False + + text_length = len(message.text) + + # Проверка длины + if text_length < self.min_length: + return False + + if self.max_length and text_length > self.max_length: + return False + + return { + 'has_text': True, + 'text_length': text_length, + 'text': message.text + } + + +class HasCaption(BaseFilter): + """ + Проверяет, есть ли у медиа подпись. + + Example: + ```python + @router.message(HasCaption()) + async def handle_caption(message: Message): + await message.answer(f"Подпись: {message.caption}") + ``` + """ + + async def __call__(self, message: Message) -> Union[bool, dict]: + if message.caption: + return { + 'has_caption': True, + 'caption': message.caption, + 'caption_length': len(message.caption) + } + return False + + +class HasEntities(BaseFilter): + """ + Проверяет наличие entities (упоминания, хештеги, команды и т.д.). + + Attributes: + entity_types: Список типов entities для проверки + + Example: + ```python + # Любые entities + @router.message(HasEntities()) + async def handle_entities(message: Message): + pass + + # Только упоминания и хештеги + @router.message(HasEntities(['mention', 'hashtag'])) + async def handle_mentions(message: Message): + pass + ``` + """ + + def __init__(self, entity_types: Optional[list[str]] = None): + """ + Args: + entity_types: Список типов ('mention', 'hashtag', 'bot_command', и т.д.) + """ + self.entity_types = entity_types + + async def __call__(self, message: Message) -> Union[bool, dict]: + if not message.entities: + return False + + if self.entity_types: + # Фильтруем по типам + matching_entities = [ + entity for entity in message.entities + if entity.type in self.entity_types + ] + + if matching_entities: + return { + 'has_entities': True, + 'entities': matching_entities, + 'entity_count': len(matching_entities) + } + else: + # Любые entities + return { + 'has_entities': True, + 'entities': message.entities, + 'entity_count': len(message.entities) + } + + return False + + +class MediaType(BaseFilter): + """ + Проверяет точный тип контента сообщения. + + Attributes: + content_type: Тип контента из ContentType enum + + Example: + ```python + @router.message(MediaType(ContentType.PHOTO)) + async def handle_photo(message: Message): + await message.answer("Это фото!") + ``` + """ + + def __init__(self, content_type: Union[ContentType, str]): + """ + Args: + content_type: Тип контента (ContentType enum или строка) + """ + self.content_type = content_type if isinstance(content_type, str) else content_type.value + + async def __call__(self, message: Message) -> bool: + return message.content_type == self.content_type