42авфаыв
This commit is contained in:
@@ -16,6 +16,7 @@ from aiogram.types import CallbackQuery, LinkPreviewOptions, Message
|
||||
from aiogram.utils.keyboard import InlineKeyboardBuilder
|
||||
|
||||
from session_bot.config import load_actor_config, load_settings
|
||||
from session_bot.html_entities import html_to_text_entities
|
||||
from session_bot.render import build_channel_text
|
||||
from session_bot.storage import JsonStateStorage
|
||||
|
||||
@@ -220,13 +221,14 @@ async def update_channel_post(bot: Bot, app_config: dict, state_storage: JsonSta
|
||||
show_above_text=True,
|
||||
)
|
||||
|
||||
text = build_channel_text(app_config, state)
|
||||
html_text = build_channel_text(app_config, state)
|
||||
text, entities = html_to_text_entities(html_text)
|
||||
try:
|
||||
await bot.edit_message_text(
|
||||
chat_id=settings.channel_id,
|
||||
message_id=settings.channel_message_id,
|
||||
text=text,
|
||||
parse_mode=ParseMode.HTML,
|
||||
entities=entities,
|
||||
link_preview_options=link_preview_options,
|
||||
)
|
||||
except TelegramBadRequest as exc:
|
||||
@@ -241,12 +243,13 @@ async def update_channel_post(bot: Bot, app_config: dict, state_storage: JsonSta
|
||||
|
||||
fallback_state = dict(state)
|
||||
fallback_state["template"] = {"text": sanitize_template_html(template)}
|
||||
fallback_text = build_channel_text(app_config, fallback_state)
|
||||
fallback_html = build_channel_text(app_config, fallback_state)
|
||||
fallback_text, fallback_entities = html_to_text_entities(fallback_html)
|
||||
await bot.edit_message_text(
|
||||
chat_id=settings.channel_id,
|
||||
message_id=settings.channel_message_id,
|
||||
text=fallback_text,
|
||||
parse_mode=ParseMode.HTML,
|
||||
entities=fallback_entities,
|
||||
link_preview_options=link_preview_options,
|
||||
)
|
||||
|
||||
@@ -285,7 +288,8 @@ async def send_test_post(bot: Bot, chat_id: int, app_config: dict, state_storage
|
||||
if normalized_template != state["template"]["text"]:
|
||||
state["template"]["text"] = normalized_template
|
||||
state_storage.save(state)
|
||||
text = build_channel_text(app_config, state)
|
||||
html_text = build_channel_text(app_config, state)
|
||||
text, entities = html_to_text_entities(html_text)
|
||||
link_preview_options = None
|
||||
if app_config.get("hidden_link_url", "").strip():
|
||||
link_preview_options = LinkPreviewOptions(
|
||||
@@ -296,7 +300,7 @@ async def send_test_post(bot: Bot, chat_id: int, app_config: dict, state_storage
|
||||
await bot.send_message(
|
||||
chat_id=chat_id,
|
||||
text=text,
|
||||
parse_mode=ParseMode.HTML,
|
||||
entities=entities,
|
||||
link_preview_options=link_preview_options,
|
||||
)
|
||||
|
||||
|
||||
94
session_bot/html_entities.py
Normal file
94
session_bot/html_entities.py
Normal file
@@ -0,0 +1,94 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from html.parser import HTMLParser
|
||||
|
||||
from aiogram.types import MessageEntity
|
||||
|
||||
|
||||
def utf16_len(value: str) -> int:
|
||||
return len(value.encode("utf-16-le")) // 2
|
||||
|
||||
|
||||
class TelegramHTMLParser(HTMLParser):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(convert_charrefs=True)
|
||||
self.text_parts: list[str] = []
|
||||
self.entities: list[MessageEntity] = []
|
||||
self.stack: list[dict] = []
|
||||
|
||||
@property
|
||||
def text(self) -> str:
|
||||
return "".join(self.text_parts)
|
||||
|
||||
@property
|
||||
def entity_offset(self) -> int:
|
||||
return utf16_len(self.text)
|
||||
|
||||
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
|
||||
attr_map = {key: value for key, value in attrs if value is not None}
|
||||
entry = {"tag": tag, "offset": self.entity_offset, "attrs": attr_map}
|
||||
self.stack.append(entry)
|
||||
|
||||
def handle_endtag(self, tag: str) -> None:
|
||||
for index in range(len(self.stack) - 1, -1, -1):
|
||||
entry = self.stack[index]
|
||||
if entry["tag"] != tag:
|
||||
continue
|
||||
self.stack.pop(index)
|
||||
self._emit_entity(tag, entry["offset"], self.entity_offset - entry["offset"], entry["attrs"])
|
||||
return
|
||||
|
||||
def handle_data(self, data: str) -> None:
|
||||
self.text_parts.append(data)
|
||||
|
||||
def _emit_entity(self, tag: str, offset: int, length: int, attrs: dict[str, str]) -> None:
|
||||
if length <= 0 and tag != "tg-emoji":
|
||||
return
|
||||
|
||||
entity_type = None
|
||||
kwargs: dict[str, str] = {}
|
||||
|
||||
if tag in {"b", "strong"}:
|
||||
entity_type = "bold"
|
||||
elif tag in {"i", "em"}:
|
||||
entity_type = "italic"
|
||||
elif tag == "u":
|
||||
entity_type = "underline"
|
||||
elif tag in {"s", "strike", "del"}:
|
||||
entity_type = "strikethrough"
|
||||
elif tag == "code":
|
||||
entity_type = "code"
|
||||
elif tag == "pre":
|
||||
entity_type = "pre"
|
||||
elif tag == "a":
|
||||
entity_type = "text_link"
|
||||
kwargs["url"] = attrs["href"]
|
||||
elif tag == "tg-spoiler":
|
||||
entity_type = "spoiler"
|
||||
elif tag == "blockquote":
|
||||
entity_type = "blockquote"
|
||||
elif tag == "tg-emoji":
|
||||
entity_type = "custom_emoji"
|
||||
kwargs["custom_emoji_id"] = attrs.get("emoji-id") or attrs.get("emoji_id", "")
|
||||
if length <= 0:
|
||||
return
|
||||
|
||||
if entity_type is None:
|
||||
return
|
||||
|
||||
self.entities.append(
|
||||
MessageEntity(
|
||||
type=entity_type,
|
||||
offset=offset,
|
||||
length=length,
|
||||
**kwargs,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def html_to_text_entities(html: str) -> tuple[str, list[MessageEntity]]:
|
||||
parser = TelegramHTMLParser()
|
||||
parser.feed(html)
|
||||
parser.close()
|
||||
entities = sorted(parser.entities, key=lambda item: (item.offset, -item.length))
|
||||
return parser.text, entities
|
||||
Reference in New Issue
Block a user