diff --git a/session_bot/bot.py b/session_bot/bot.py
index 0419bf8..a3f5045 100644
--- a/session_bot/bot.py
+++ b/session_bot/bot.py
@@ -16,6 +16,7 @@ from aiogram.types import CallbackQuery, LinkPreviewOptions, Message
from aiogram.utils.keyboard import InlineKeyboardBuilder
from session_bot.config import load_actor_config, load_settings
+from session_bot.html_entities import html_to_text_entities
from session_bot.render import build_channel_text
from session_bot.storage import JsonStateStorage
@@ -220,13 +221,14 @@ async def update_channel_post(bot: Bot, app_config: dict, state_storage: JsonSta
show_above_text=True,
)
- text = build_channel_text(app_config, state)
+ html_text = build_channel_text(app_config, state)
+ text, entities = html_to_text_entities(html_text)
try:
await bot.edit_message_text(
chat_id=settings.channel_id,
message_id=settings.channel_message_id,
text=text,
- parse_mode=ParseMode.HTML,
+ entities=entities,
link_preview_options=link_preview_options,
)
except TelegramBadRequest as exc:
@@ -241,12 +243,13 @@ async def update_channel_post(bot: Bot, app_config: dict, state_storage: JsonSta
fallback_state = dict(state)
fallback_state["template"] = {"text": sanitize_template_html(template)}
- fallback_text = build_channel_text(app_config, fallback_state)
+ fallback_html = build_channel_text(app_config, fallback_state)
+ fallback_text, fallback_entities = html_to_text_entities(fallback_html)
await bot.edit_message_text(
chat_id=settings.channel_id,
message_id=settings.channel_message_id,
text=fallback_text,
- parse_mode=ParseMode.HTML,
+ entities=fallback_entities,
link_preview_options=link_preview_options,
)
@@ -285,7 +288,8 @@ async def send_test_post(bot: Bot, chat_id: int, app_config: dict, state_storage
if normalized_template != state["template"]["text"]:
state["template"]["text"] = normalized_template
state_storage.save(state)
- text = build_channel_text(app_config, state)
+ html_text = build_channel_text(app_config, state)
+ text, entities = html_to_text_entities(html_text)
link_preview_options = None
if app_config.get("hidden_link_url", "").strip():
link_preview_options = LinkPreviewOptions(
@@ -296,7 +300,7 @@ async def send_test_post(bot: Bot, chat_id: int, app_config: dict, state_storage
await bot.send_message(
chat_id=chat_id,
text=text,
- parse_mode=ParseMode.HTML,
+ entities=entities,
link_preview_options=link_preview_options,
)
diff --git a/session_bot/html_entities.py b/session_bot/html_entities.py
new file mode 100644
index 0000000..cfb2b7f
--- /dev/null
+++ b/session_bot/html_entities.py
@@ -0,0 +1,152 @@
+"""Convert Telegram-style HTML into plain text plus message entities."""
+
+from __future__ import annotations
+
+from html.parser import HTMLParser
+
+from aiogram.types import MessageEntity
+
+# Tags that map straight onto an entity type and need no extra fields.
+_SIMPLE_ENTITY_TYPES: dict[str, str] = {
+    "b": "bold",
+    "strong": "bold",
+    "i": "italic",
+    "em": "italic",
+    "u": "underline",
+    "ins": "underline",
+    "s": "strikethrough",
+    "strike": "strikethrough",
+    "del": "strikethrough",
+    "code": "code",
+    "tg-spoiler": "spoiler",
+}
+
+
+def utf16_len(value: str) -> int:
+    """Return the length of *value* in UTF-16 code units."""
+    return len(value.encode("utf-16-le")) // 2
+
+
+class TelegramHTMLParser(HTMLParser):
+    """Collect plain text and message entities from Telegram-style HTML.
+
+    Offsets and lengths are measured in UTF-16 code units. Unknown tags keep
+    their text but yield no entity, and a tag that is never closed yields no
+    entity either.
+    """
+
+    def __init__(self) -> None:
+        super().__init__(convert_charrefs=True)
+        self.text_parts: list[str] = []
+        self.entities: list[MessageEntity] = []
+        self.stack: list[dict] = []
+        # Running UTF-16 length of the collected text, so that every tag does
+        # not have to re-join and re-encode everything seen so far.
+        self._offset = 0
+
+    @property
+    def text(self) -> str:
+        """Return the plain text collected so far."""
+        return "".join(self.text_parts)
+
+    @property
+    def entity_offset(self) -> int:
+        """Return the current text position in UTF-16 code units."""
+        return self._offset
+
+    def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
+        """Record where *tag* opened so its entity can be sized on close."""
+        # Valueless attributes (``<blockquote expandable>``) arrive as None;
+        # keep them as "" so their presence can still be tested later.
+        attr_map = {key: value if value is not None else "" for key, value in attrs}
+        self.stack.append({"tag": tag, "offset": self._offset, "attrs": attr_map})
+
+    def handle_endtag(self, tag: str) -> None:
+        """Close the innermost open *tag* and emit its entity, if any."""
+        for index in range(len(self.stack) - 1, -1, -1):
+            if self.stack[index]["tag"] != tag:
+                continue
+            entry = self.stack.pop(index)
+            if tag == "code" and self._merge_code_into_pre(entry):
+                return
+            self._emit_entity(tag, entry["offset"], self._offset - entry["offset"], entry["attrs"])
+            return
+
+    def handle_data(self, data: str) -> None:
+        """Append a text chunk and advance the UTF-16 offset."""
+        self.text_parts.append(data)
+        self._offset += utf16_len(data)
+
+    def _merge_code_into_pre(self, code_entry: dict) -> bool:
+        """Fold a ``<code>`` nested in a still-open ``<pre>`` into that block.
+
+        Telegram renders ``<pre><code class="language-x">`` as a single
+        ``pre`` entity carrying the language, so no ``code`` entity is emitted
+        for it. Return True when an enclosing ``<pre>`` was found.
+        """
+        for entry in reversed(self.stack):
+            if entry["tag"] != "pre":
+                continue
+            css_class = code_entry["attrs"].get("class", "")
+            # Only a <code> that starts the block names the block's language.
+            if entry["offset"] == code_entry["offset"] and css_class.startswith("language-"):
+                entry["attrs"]["language"] = css_class[len("language-"):]
+            return True
+        return False
+
+    def _emit_entity(self, tag: str, offset: int, length: int, attrs: dict[str, str]) -> None:
+        """Append the entity for *tag* covering *length* units from *offset*.
+
+        Nothing is appended for an empty span, an unknown tag, a link without
+        ``href`` or a custom emoji without an id.
+        """
+        if length <= 0:
+            return
+
+        entity_type = _SIMPLE_ENTITY_TYPES.get(tag)
+        kwargs: dict[str, str] = {}
+
+        if tag == "a":
+            url = attrs.get("href")
+            if not url:
+                return
+            entity_type = "text_link"
+            kwargs["url"] = url
+        elif tag == "tg-emoji":
+            emoji_id = attrs.get("emoji-id") or attrs.get("emoji_id")
+            if not emoji_id:
+                return
+            entity_type = "custom_emoji"
+            kwargs["custom_emoji_id"] = emoji_id
+        elif tag == "pre":
+            entity_type = "pre"
+            if attrs.get("language"):
+                kwargs["language"] = attrs["language"]
+        elif tag == "blockquote":
+            entity_type = "expandable_blockquote" if "expandable" in attrs else "blockquote"
+        elif tag == "span" and attrs.get("class") == "tg-spoiler":
+            entity_type = "spoiler"
+
+        if entity_type is None:
+            return
+
+        self.entities.append(
+            MessageEntity(
+                type=entity_type,
+                offset=offset,
+                length=length,
+                **kwargs,
+            )
+        )
+
+
+def html_to_text_entities(html: str) -> tuple[str, list[MessageEntity]]:
+    """Split Telegram-style *html* into plain text and its entities.
+
+    The entities are ordered by offset, longer spans first at equal offsets.
+    """
+    parser = TelegramHTMLParser()
+    parser.feed(html)
+    parser.close()
+    entities = sorted(parser.entities, key=lambda item: (item.offset, -item.length))
+    return parser.text, entities
diff --git a/tests/test_render.py b/tests/test_render.py
index 2a56f78..d060d3f 100644
--- a/tests/test_render.py
+++ b/tests/test_render.py
@@ -1,4 +1,5 @@
from session_bot.render import build_channel_text
+from session_bot.html_entities import html_to_text_entities
def test_build_channel_text_includes_phrase() -> None:
@@ -111,3 +112,13 @@ def test_plain_links_are_converted_to_html() -> None:
text = build_channel_text(config, {"actors": {}})
assert 'rules' in text
+
+
+def test_html_to_text_entities_supports_custom_emoji_and_links() -> None:
+    """Custom emoji and link markup become entities; the tags leave the text."""
+    markup = '<tg-emoji emoji-id="123">🌟</tg-emoji> <a href="https://example.com">rules</a>'
+    text, entities = html_to_text_entities(markup)
+
+    assert text == "🌟 rules"
+    assert any(entity.type == "custom_emoji" and entity.custom_emoji_id == "123" for entity in entities)
+    assert any(entity.type == "text_link" and entity.url == "https://example.com" for entity in entities)