initial commit
This commit is contained in:
165
tests/test_alert_processor.py
Normal file
165
tests/test_alert_processor.py
Normal file
@@ -0,0 +1,165 @@
|
||||
import pytest
|
||||
|
||||
from glitchup_bot.services import alert_processor
|
||||
|
||||
|
||||
def make_issue_payload(project: str = "backend-production", color: str = "#e52b50") -> dict:
|
||||
return {
|
||||
"text": "GlitchTip Alert",
|
||||
"attachments": [
|
||||
{
|
||||
"title": "ValueError: boom",
|
||||
"title_link": "https://glitchtip.example.com/issues/1",
|
||||
"text": "app.views.index",
|
||||
"color": color,
|
||||
"fields": [
|
||||
{"title": "Project", "value": project, "short": True},
|
||||
{"title": "Environment", "value": "production", "short": True},
|
||||
],
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
async def _noop(*args, **kwargs):
|
||||
return None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_webhook_sends_and_records_notification(monkeypatch):
|
||||
sent_calls = []
|
||||
recorded = []
|
||||
|
||||
async def fake_send_alert(attachment, project_slug, priority, **kwargs):
|
||||
sent_calls.append((attachment.title, project_slug, priority))
|
||||
return "backend"
|
||||
|
||||
async def fake_is_duplicate(fingerprint: str) -> bool:
|
||||
assert fingerprint == "backend-production:ValueError: boom"
|
||||
return False
|
||||
|
||||
async def fake_record(issue_id: int, fingerprint: str, **kwargs) -> None:
|
||||
recorded.append((issue_id, fingerprint, kwargs["delivery_status"]))
|
||||
|
||||
monkeypatch.setattr(alert_processor, "send_alert", fake_send_alert)
|
||||
monkeypatch.setattr(alert_processor, "_is_duplicate", fake_is_duplicate)
|
||||
monkeypatch.setattr(alert_processor, "_record_notification", fake_record)
|
||||
monkeypatch.setattr(alert_processor, "mark_sync_success", _noop)
|
||||
monkeypatch.setattr(
|
||||
alert_processor, "resolve_group", lambda project_slug: _async_value("backend")
|
||||
)
|
||||
monkeypatch.setattr(alert_processor, "find_matching_rule", lambda *args: _async_value(None))
|
||||
monkeypatch.setattr(alert_processor, "_is_rate_limited", lambda *args: _async_value(False))
|
||||
|
||||
await alert_processor.process_webhook_payload(make_issue_payload())
|
||||
|
||||
assert sent_calls == [("ValueError: boom", "backend-production", "P1")]
|
||||
assert recorded == [(0, "backend-production:ValueError: boom", "sent")]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_webhook_skips_duplicate(monkeypatch):
|
||||
async def fake_send_alert(*args, **kwargs):
|
||||
raise AssertionError("duplicate alert should not be sent")
|
||||
|
||||
monkeypatch.setattr(alert_processor, "send_alert", fake_send_alert)
|
||||
monkeypatch.setattr(alert_processor, "_is_duplicate", lambda fingerprint: _async_value(True))
|
||||
monkeypatch.setattr(alert_processor, "mark_sync_success", _noop)
|
||||
|
||||
await alert_processor.process_webhook_payload(make_issue_payload())
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_webhook_skips_non_alert_environment(monkeypatch):
|
||||
called = False
|
||||
|
||||
async def fake_send_alert(*args, **kwargs):
|
||||
nonlocal called
|
||||
called = True
|
||||
|
||||
monkeypatch.setattr(alert_processor, "send_alert", fake_send_alert)
|
||||
monkeypatch.setattr(alert_processor, "mark_sync_success", _noop)
|
||||
|
||||
await alert_processor.process_webhook_payload(make_issue_payload(project="backend-staging"))
|
||||
|
||||
assert called is False
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_webhook_handles_uptime(monkeypatch):
|
||||
calls = []
|
||||
|
||||
async def fake_send_alert(attachment, project_slug, priority, is_uptime=False, **kwargs):
|
||||
calls.append((attachment.title, project_slug, priority, is_uptime))
|
||||
return "backend"
|
||||
|
||||
monkeypatch.setattr(alert_processor, "send_alert", fake_send_alert)
|
||||
monkeypatch.setattr(alert_processor, "_record_notification", _noop)
|
||||
monkeypatch.setattr(alert_processor, "mark_sync_success", _noop)
|
||||
|
||||
await alert_processor.process_webhook_payload(
|
||||
{
|
||||
"text": "GlitchTip Uptime Alert",
|
||||
"attachments": [
|
||||
{
|
||||
"title": "Example Monitor",
|
||||
"text": "The monitored site has gone down.",
|
||||
"title_link": "https://glitchtip.example.com/uptime/1",
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
assert calls == [("Example Monitor", None, "P1", True)]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_webhook_skips_muted(monkeypatch):
|
||||
recorded = []
|
||||
|
||||
class FakeRule:
|
||||
pattern = "ValueError"
|
||||
|
||||
async def fake_record(issue_id: int, fingerprint: str, **kwargs) -> None:
|
||||
recorded.append(kwargs["delivery_status"])
|
||||
|
||||
monkeypatch.setattr(alert_processor, "_is_duplicate", lambda fingerprint: _async_value(False))
|
||||
monkeypatch.setattr(
|
||||
alert_processor, "find_matching_rule", lambda *args: _async_value(FakeRule())
|
||||
)
|
||||
monkeypatch.setattr(alert_processor, "_record_notification", fake_record)
|
||||
monkeypatch.setattr(
|
||||
alert_processor, "resolve_group", lambda project_slug: _async_value("backend")
|
||||
)
|
||||
monkeypatch.setattr(alert_processor, "mark_sync_success", _noop)
|
||||
|
||||
await alert_processor.process_webhook_payload(make_issue_payload())
|
||||
|
||||
assert recorded == ["muted"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_process_webhook_skips_rate_limited(monkeypatch):
|
||||
recorded = []
|
||||
|
||||
async def fake_record(issue_id: int, fingerprint: str, **kwargs) -> None:
|
||||
recorded.append(kwargs["delivery_status"])
|
||||
|
||||
monkeypatch.setattr(alert_processor, "_is_duplicate", lambda fingerprint: _async_value(False))
|
||||
monkeypatch.setattr(alert_processor, "find_matching_rule", lambda *args: _async_value(None))
|
||||
monkeypatch.setattr(alert_processor, "_record_notification", fake_record)
|
||||
monkeypatch.setattr(
|
||||
alert_processor, "resolve_group", lambda project_slug: _async_value("backend")
|
||||
)
|
||||
monkeypatch.setattr(alert_processor, "_is_rate_limited", lambda *args: _async_value(True))
|
||||
monkeypatch.setattr(alert_processor, "mark_sync_success", _noop)
|
||||
|
||||
await alert_processor.process_webhook_payload(
|
||||
make_issue_payload(project="backend-production", color="#e9b949")
|
||||
)
|
||||
|
||||
assert recorded == ["rate_limited"]
|
||||
|
||||
|
||||
async def _async_value(value):
|
||||
return value
|
||||
Reference in New Issue
Block a user