Files
Otkritiebot/session_bot/telethon_publisher.py
Verum a13f4e378c
Some checks failed
CI / Lint (ruff + mypy) (push) Failing after 33s
CI / Run tests (push) Has been skipped
CI / Docker build test (push) Successful in 12s
Security / Dependency security scan (push) Failing after 48s
ир
2026-04-03 01:03:52 +07:00

160 lines
5.9 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from aiogram.types import MessageEntity
from session_bot.html_entities import html_to_text_entities
from session_bot.render import build_channel_text
if TYPE_CHECKING:
from session_bot.config import BotSettings
from session_bot.storage import JsonStateStorage
@dataclass(slots=True)
class TelethonPublisher:
settings: "BotSettings"
_client: Any = None
_peer: Any = None
@property
def enabled(self) -> bool:
return self.settings.telethon_enabled
async def start(self) -> None:
if not self.enabled or self._client is not None:
return
try:
from telethon import TelegramClient
from telethon.sessions import StringSession
except ImportError as exc:
raise RuntimeError(
"Telethon is not installed. Add it to the runtime environment to use channel editing via MTProto."
) from exc
self._client = TelegramClient(
StringSession(self.settings.telethon_session_string),
self.settings.telethon_api_id,
self.settings.telethon_api_hash,
)
await self._client.connect()
if not await self._client.is_user_authorized():
raise RuntimeError("Telethon session is not authorized. Generate a valid StringSession first.")
async def close(self) -> None:
if self._client is None:
return
await self._client.disconnect()
self._client = None
self._peer = None
async def edit_channel_post(self, app_config: dict, state_storage: "JsonStateStorage", message_id: int) -> None:
await self.start()
html_text = build_channel_text(app_config, state_storage.load(), include_hidden_link=True)
text, bot_entities = html_to_text_entities(html_text)
entities = self._convert_entities(bot_entities)
peer = await self._resolve_peer()
await self._client.edit_message(
peer,
message_id,
text=text,
formatting_entities=entities,
link_preview=bool(app_config.get("hidden_link_url", "").strip()),
)
async def _resolve_peer(self) -> Any:
if self._peer is not None:
return self._peer
if self._client is None:
raise RuntimeError("Telethon client is not started.")
candidates: list[Any] = []
channel_ref = self.settings.telethon_channel.strip()
if channel_ref:
candidates.extend(self._candidate_refs(channel_ref))
else:
candidates.extend(self._candidate_refs(self.settings.channel_id))
last_error: Exception | None = None
seen: set[str] = set()
for candidate in candidates:
key = repr(candidate)
if key in seen:
continue
seen.add(key)
try:
self._peer = await self._client.get_entity(candidate)
return self._peer
except Exception as exc: # pragma: no cover - depends on runtime Telegram session
last_error = exc
raise RuntimeError(
"Unable to resolve TELETHON_CHANNEL. Set TELETHON_CHANNEL to @username, t.me link, or numeric channel id."
) from last_error
def _candidate_refs(self, value: str | int) -> list[Any]:
candidates: list[Any] = [value]
raw = str(value).strip()
if not raw:
return candidates
if raw.lstrip("-").isdigit():
numeric = int(raw)
candidates.append(numeric)
if raw.startswith("-100"):
candidates.append(int(raw[4:]))
try:
from telethon import utils
resolved_id, _ = utils.resolve_id(numeric)
candidates.append(resolved_id)
except Exception: # pragma: no cover - best effort normalization
pass
return candidates
def _convert_entities(self, entities: list[MessageEntity]) -> list[Any]:
try:
from telethon.tl import types
except ImportError as exc:
raise RuntimeError(
"Telethon is not installed. Add it to the runtime environment to use channel editing via MTProto."
) from exc
converted: list[Any] = []
for entity in entities:
item = self._convert_entity(entity, types)
if item is not None:
converted.append(item)
return converted
@staticmethod
def _convert_entity(entity: MessageEntity, types: Any) -> Any | None:
kwargs = {"offset": entity.offset, "length": entity.length}
if entity.type == "bold":
return types.MessageEntityBold(**kwargs)
if entity.type == "italic":
return types.MessageEntityItalic(**kwargs)
if entity.type == "underline":
return types.MessageEntityUnderline(**kwargs)
if entity.type == "strikethrough":
return types.MessageEntityStrike(**kwargs)
if entity.type == "code":
return types.MessageEntityCode(**kwargs)
if entity.type == "pre":
return types.MessageEntityPre(language=entity.language or "", **kwargs)
if entity.type == "text_link":
return types.MessageEntityTextUrl(url=entity.url or "", **kwargs)
if entity.type == "spoiler":
return types.MessageEntitySpoiler(**kwargs)
if entity.type == "blockquote" and hasattr(types, "MessageEntityBlockquote"):
try:
return types.MessageEntityBlockquote(collapsed=False, **kwargs)
except TypeError:
return types.MessageEntityBlockquote(**kwargs)
if entity.type == "custom_emoji" and entity.custom_emoji_id:
return types.MessageEntityCustomEmoji(document_id=int(entity.custom_emoji_id), **kwargs)
return None