from __future__ import annotations from dataclasses import dataclass from typing import TYPE_CHECKING, Any from aiogram.types import MessageEntity from session_bot.html_entities import html_to_text_entities from session_bot.render import build_channel_text if TYPE_CHECKING: from session_bot.config import BotSettings from session_bot.storage import JsonStateStorage @dataclass(slots=True) class TelethonPublisher: settings: "BotSettings" _client: Any = None _peer: Any = None @property def enabled(self) -> bool: return self.settings.telethon_enabled async def start(self) -> None: if not self.enabled or self._client is not None: return try: from telethon import TelegramClient from telethon.sessions import StringSession except ImportError as exc: raise RuntimeError( "Telethon is not installed. Add it to the runtime environment to use channel editing via MTProto." ) from exc self._client = TelegramClient( StringSession(self.settings.telethon_session_string), self.settings.telethon_api_id, self.settings.telethon_api_hash, ) await self._client.connect() if not await self._client.is_user_authorized(): raise RuntimeError("Telethon session is not authorized. Generate a valid StringSession first.") async def close(self) -> None: if self._client is None: return await self._client.disconnect() self._client = None self._peer = None async def edit_channel_post(self, app_config: dict, state_storage: "JsonStateStorage", message_id: int) -> None: await self.start() html_text = build_channel_text(app_config, state_storage.load(), include_hidden_link=True) text, bot_entities = html_to_text_entities(html_text) entities = self._convert_entities(bot_entities) peer = await self._resolve_peer() await self._client.edit_message( peer, message_id, text=text, formatting_entities=entities, link_preview=bool(app_config.get("hidden_link_url", "").strip()), ) async def _resolve_peer(self) -> Any: if self._peer is not None: return self._peer if self._client is None: raise RuntimeError("Telethon client is not started.") candidates: list[Any] = [] channel_ref = self.settings.telethon_channel.strip() if channel_ref: candidates.extend(self._candidate_refs(channel_ref)) else: candidates.extend(self._candidate_refs(self.settings.channel_id)) last_error: Exception | None = None seen: set[str] = set() for candidate in candidates: key = repr(candidate) if key in seen: continue seen.add(key) try: self._peer = await self._client.get_entity(candidate) return self._peer except Exception as exc: # pragma: no cover - depends on runtime Telegram session last_error = exc raise RuntimeError( "Unable to resolve TELETHON_CHANNEL. Set TELETHON_CHANNEL to @username, t.me link, or numeric channel id." ) from last_error def _candidate_refs(self, value: str | int) -> list[Any]: candidates: list[Any] = [value] raw = str(value).strip() if not raw: return candidates if raw.lstrip("-").isdigit(): numeric = int(raw) candidates.append(numeric) if raw.startswith("-100"): candidates.append(int(raw[4:])) try: from telethon import utils resolved_id, _ = utils.resolve_id(numeric) candidates.append(resolved_id) except Exception: # pragma: no cover - best effort normalization pass return candidates def _convert_entities(self, entities: list[MessageEntity]) -> list[Any]: try: from telethon.tl import types except ImportError as exc: raise RuntimeError( "Telethon is not installed. Add it to the runtime environment to use channel editing via MTProto." ) from exc converted: list[Any] = [] for entity in entities: item = self._convert_entity(entity, types) if item is not None: converted.append(item) return converted @staticmethod def _convert_entity(entity: MessageEntity, types: Any) -> Any | None: kwargs = {"offset": entity.offset, "length": entity.length} if entity.type == "bold": return types.MessageEntityBold(**kwargs) if entity.type == "italic": return types.MessageEntityItalic(**kwargs) if entity.type == "underline": return types.MessageEntityUnderline(**kwargs) if entity.type == "strikethrough": return types.MessageEntityStrike(**kwargs) if entity.type == "code": return types.MessageEntityCode(**kwargs) if entity.type == "pre": return types.MessageEntityPre(language=entity.language or "", **kwargs) if entity.type == "text_link": return types.MessageEntityTextUrl(url=entity.url or "", **kwargs) if entity.type == "spoiler": return types.MessageEntitySpoiler(**kwargs) if entity.type == "blockquote" and hasattr(types, "MessageEntityBlockquote"): try: return types.MessageEntityBlockquote(collapsed=False, **kwargs) except TypeError: return types.MessageEntityBlockquote(**kwargs) if entity.type == "custom_emoji" and entity.custom_emoji_id: return types.MessageEntityCustomEmoji(document_id=int(entity.custom_emoji_id), **kwargs) return None