Files
GlitchupBot/src/glitchup_bot/services/sync_service.py
Verum ea4a6fbe38
Some checks failed
CI / Lint (ruff + mypy) (push) Failing after 35s
CI / Run tests (push) Has been skipped
CI / Docker build test (push) Successful in 18s
Упрощение для балбесов
2026-03-30 18:26:49 +07:00

259 lines
8.7 KiB
Python

import logging
from asyncio import Lock, wait_for
from collections import defaultdict
from dataclasses import dataclass
from datetime import UTC, datetime
from typing import Any
from sqlalchemy import select
from glitchup_bot.config import settings
from glitchup_bot.glitchtip_client.client import get_glitchtip_client
from glitchup_bot.models.database import get_session_factory
from glitchup_bot.models.issues import IssueCache
from glitchup_bot.models.sync import SyncState
logger = logging.getLogger(__name__)

# Serializes sync_issues() runs: startup warmup, scheduled refreshes, and
# on-demand loads must not interleave their read-modify-write pass over the
# issue cache. asyncio.Lock — single event loop only, not multi-process safe.
_sync_lock = Lock()
@dataclass(slots=True)
class IssueSnapshot:
    """Plain in-memory view of a single GlitchTip issue, decoupled from the ORM row."""

    issue_id: int  # GlitchTip's numeric issue id (cache key)
    project_slug: str  # slug of the project the issue belongs to
    title: str  # issue title; "unknown" when the API omitted it
    culprit: str | None  # offending function/location, if reported
    level: str  # lowercased severity, defaults to "error"
    status: str  # lowercased status, defaults to "unresolved"
    first_seen: datetime | None  # parsed from ISO-8601, None if missing/unparseable
    last_seen: datetime | None  # parsed from ISO-8601, None if missing/unparseable
    event_count: int  # total event count reported by the API, 0 if absent
    is_regression: bool  # GlitchTip's isRegression flag
    link: str | None  # permalink to the issue in the GlitchTip UI
    release: str | None  # release/version string extracted from fields or tags
@dataclass(slots=True)
class SyncSummary:
    """Result of one sync_issues() pass, for logging/reporting."""

    project_count: int  # number of project slugs that were synced
    issue_count: int  # number of issue snapshots fetched from the API
    resolved_count: int  # cached rows newly marked "resolved" (absent from API)
    synced_at: datetime  # UTC timestamp shared by all writes of this pass
def _configured_project_slugs() -> list[str]:
    """Return every project slug the bot tracks, backend projects first."""
    return [*settings.backend_projects, *settings.frontend_projects]
def _parse_dt(value: str | None) -> datetime | None:
if not value:
return None
return datetime.fromisoformat(value.replace("Z", "+00:00"))
def _extract_release(issue: dict[str, Any]) -> str | None:
direct = issue.get("lastRelease") or issue.get("release") or issue.get("releaseName")
if isinstance(direct, str) and direct.strip():
return direct.strip()
if isinstance(direct, dict):
for key in ("version", "shortVersion", "name"):
value = direct.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
tags = issue.get("tags")
if isinstance(tags, list):
for tag in tags:
if not isinstance(tag, dict):
continue
if tag.get("key") != "release":
continue
value = tag.get("value")
if isinstance(value, str) and value.strip():
return value.strip()
return None
def _normalize_issue(project_slug: str, issue: dict[str, Any]) -> IssueSnapshot:
    """Convert one raw GlitchTip API issue payload into an IssueSnapshot.

    Missing fields get conservative defaults ("unknown" title, "error" level,
    "unresolved" status, zero count); timestamps and the release string are
    parsed best-effort.
    """
    raw_level = issue.get("level") or "error"
    raw_status = issue.get("status") or "unresolved"
    return IssueSnapshot(
        issue_id=int(issue["id"]),
        project_slug=project_slug,
        title=issue.get("title") or "unknown",
        culprit=issue.get("culprit"),
        level=raw_level.lower(),
        status=raw_status.lower(),
        first_seen=_parse_dt(issue.get("firstSeen")),
        last_seen=_parse_dt(issue.get("lastSeen")),
        event_count=int(issue.get("count") or 0),
        is_regression=bool(issue.get("isRegression")),
        link=issue.get("permalink") or issue.get("link"),
        release=_extract_release(issue),
    )
async def mark_sync_success(source: str) -> None:
    """Upsert the SyncState row for *source*, stamping it with now (UTC)."""
    timestamp = datetime.now(UTC)
    stmt = select(SyncState).where(SyncState.source == source)
    async with get_session_factory()() as session:
        state = (await session.execute(stmt)).scalar_one_or_none()
        if state is not None:
            state.last_successful_at = timestamp
        else:
            session.add(SyncState(source=source, last_successful_at=timestamp))
        await session.commit()
async def sync_issues(project_slugs: list[str] | None = None) -> SyncSummary:
    """Fetch issues from GlitchTip for the given projects and upsert the cache.

    Runs under the module-wide lock so only one sync mutates the cache at a
    time. Cached rows absent from the API response are marked "resolved".
    The "api_sync" SyncState bookmark is upserted in the same transaction as
    the cache writes, so a partial failure leaves neither updated.

    Args:
        project_slugs: slugs to sync; defaults to the configured project list.

    Returns:
        SyncSummary with project/issue/resolved counts and the sync timestamp.
    """
    async with _sync_lock:
        slugs = project_slugs or _configured_project_slugs()
        client = get_glitchtip_client()
        snapshots: list[IssueSnapshot] = []
        for slug in slugs:
            issues = await client.list_issues(slug)
            # Payloads without an id cannot be keyed in the cache — skip them.
            snapshots.extend(
                _normalize_issue(slug, issue) for issue in issues if issue.get("id") is not None
            )
        # Index the live issue ids per project so stale cache rows can be
        # detected in the resolution pass below.
        issue_ids_by_slug: dict[str, set[int]] = defaultdict(set)
        for snapshot in snapshots:
            issue_ids_by_slug[snapshot.project_slug].add(snapshot.issue_id)
        # Single timestamp shared by every write of this pass.
        now = datetime.now(UTC)
        resolved_count = 0
        async with get_session_factory()() as session:
            existing_rows = (
                await session.execute(select(IssueCache).where(IssueCache.project_slug.in_(slugs)))
            ).scalars()
            # NOTE(review): keyed by issue id alone — presumably GlitchTip
            # issue ids are globally unique across projects; verify.
            existing_by_id = {row.glitchtip_issue_id: row for row in existing_rows}
            for snapshot in snapshots:
                row = existing_by_id.get(snapshot.issue_id)
                if row is None:
                    # First time this issue is seen: insert a fresh cache row.
                    row = IssueCache(
                        glitchtip_issue_id=snapshot.issue_id,
                        project_slug=snapshot.project_slug,
                        title=snapshot.title,
                        culprit=snapshot.culprit,
                        level=snapshot.level,
                        status=snapshot.status,
                        first_seen=snapshot.first_seen,
                        last_seen=snapshot.last_seen,
                        event_count=snapshot.event_count,
                        is_regression=snapshot.is_regression,
                        link=snapshot.link,
                        release=snapshot.release,
                    )
                    session.add(row)
                    continue
                # Known issue: overwrite every cached field with API values.
                row.project_slug = snapshot.project_slug
                row.title = snapshot.title
                row.culprit = snapshot.culprit
                row.level = snapshot.level
                row.status = snapshot.status
                row.first_seen = snapshot.first_seen
                row.last_seen = snapshot.last_seen
                row.event_count = snapshot.event_count
                row.is_regression = snapshot.is_regression
                row.link = snapshot.link
                row.release = snapshot.release
                row.updated_at = now
            # Any cached row the API did not report this pass is considered
            # resolved; only count rows that actually change status.
            for row in existing_by_id.values():
                if row.glitchtip_issue_id in issue_ids_by_slug[row.project_slug]:
                    continue
                if row.status != "resolved":
                    row.status = "resolved"
                    row.updated_at = now
                    resolved_count += 1
            # Upsert the sync bookmark inside the same transaction as the
            # cache writes (same logic as mark_sync_success, but atomic with
            # the issue updates).
            result = await session.execute(select(SyncState).where(SyncState.source == "api_sync"))
            state = result.scalar_one_or_none()
            if state is None:
                state = SyncState(source="api_sync", last_successful_at=now)
                session.add(state)
            else:
                state.last_successful_at = now
            await session.commit()
        return SyncSummary(
            project_count=len(slugs),
            issue_count=len(snapshots),
            resolved_count=resolved_count,
            synced_at=now,
        )
async def warm_issue_cache_on_startup(timeout_seconds: int = 180) -> bool:
    """Run one time-bounded cache sync at startup.

    Returns True when the sync completed within *timeout_seconds*; on timeout
    or any sync error it logs the problem and returns False so startup can
    proceed with whatever cached data already exists.
    """
    logger.info("Starting startup cache warmup")
    try:
        result = await wait_for(sync_issues(), timeout=timeout_seconds)
        logger.info(
            "Startup cache warmup finished: %s projects, %s issues, %s resolved",
            result.project_count,
            result.issue_count,
            result.resolved_count,
        )
    except TimeoutError:
        # Must precede the Exception handler: TimeoutError is its subclass.
        logger.warning(
            "Startup cache warmup timed out after %s seconds; continuing with cached data",
            timeout_seconds,
        )
        return False
    except Exception:
        logger.exception("Startup cache warmup failed; continuing with cached data")
        return False
    return True
async def load_issue_snapshots(
    project_slugs: list[str] | None = None,
    *,
    refresh: bool = True,
    unresolved_only: bool = True,
) -> list[IssueSnapshot]:
    """Return cached issue snapshots for the given projects.

    Defaults to all configured projects. When *refresh* is true, a sync is
    attempted first; if it fails, stale cache data is served instead. When
    *unresolved_only* is true, resolved issues are filtered at query level.
    """
    slugs = project_slugs or _configured_project_slugs()
    if refresh:
        try:
            await sync_issues(slugs)
        except Exception:
            logger.exception("Issue sync failed, falling back to cached data")
    query = select(IssueCache).where(IssueCache.project_slug.in_(slugs))
    if unresolved_only:
        query = query.where(IssueCache.status == "unresolved")
    async with get_session_factory()() as session:
        cached = (await session.execute(query)).scalars().all()
        snapshots: list[IssueSnapshot] = []
        for row in cached:
            snapshots.append(
                IssueSnapshot(
                    issue_id=row.glitchtip_issue_id,
                    project_slug=row.project_slug,
                    title=row.title,
                    culprit=row.culprit,
                    level=row.level,
                    status=row.status,
                    first_seen=row.first_seen,
                    last_seen=row.last_seen,
                    event_count=row.event_count,
                    is_regression=row.is_regression,
                    link=row.link,
                    release=row.release,
                )
            )
        return snapshots
async def get_last_sync_state(source: str) -> SyncState | None:
    """Fetch the SyncState row for *source*, or None if it has never synced."""
    query = select(SyncState).where(SyncState.source == source)
    async with get_session_factory()() as session:
        return (await session.execute(query)).scalar_one_or_none()