Release 1.7.0: Grafana catalog, ingress/IRM, tests
This commit is contained in:
@ -14,6 +14,7 @@ for key in (
|
||||
"FORGEJO_URL",
|
||||
"FORGEJO_TOKEN",
|
||||
"GRAFANA_WEBHOOK_SECRET",
|
||||
"GRAFANA_SOURCES_JSON",
|
||||
):
|
||||
os.environ.pop(key, None)
|
||||
os.environ["DATABASE_URL"] = ""
|
||||
|
||||
286
tests/irm_db_fake.py
Normal file
286
tests/irm_db_fake.py
Normal file
@ -0,0 +1,286 @@
|
||||
"""In-memory «пул» для тестов IRM без реального PostgreSQL."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any
|
||||
from uuid import UUID, uuid4
|
||||
|
||||
|
||||
def _now() -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
@dataclass
class Row:
    """Minimal stand-in for ``asyncpg.Record``: key access plus ``.get()``."""

    # Backing mapping; the class only reads from it, never mutates it.
    _data: dict[str, Any]

    def __getitem__(self, key: str) -> Any:
        return self._data[key]

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value for *key*, or *default* when the key is absent."""
        if key in self._data:
            return self._data[key]
        return default
|
||||
|
||||
class IrmFakeConn:
    """Fake asyncpg connection that pattern-matches the SQL used by IRM.

    Each method normalizes the query text and dispatches on substrings to
    the matching :class:`IrmFakeStore` operation.  Unknown queries raise
    ``AssertionError`` so a new endpoint query fails loudly in tests.
    """

    def __init__(self, store: IrmFakeStore) -> None:
        self.store = store

    def _q(self, query: str) -> str:
        """Collapse all whitespace so matching ignores query formatting."""
        return " ".join(query.split())

    async def execute(self, query: str, *args: Any) -> str:
        """Handle the alert-driven incident INSERT; reject anything else."""
        q = self._q(query)
        if "INSERT INTO incidents" in q and "ingress_event_id" in q:
            self.store.insert_incident_alert(
                args[0], args[1], args[2], args[3], args[4]
            )
            return "INSERT 0 1"
        raise AssertionError(f"execute not implemented: {q[:80]}")

    async def fetchval(self, query: str, *args: Any) -> Any:
        """Scalar queries: table counts and incident-existence probes."""
        q = self._q(query)
        if "count(*)" in q and "FROM incidents" in q and "escalation" not in q:
            return len(self.store.incidents)
        if "count(*)" in q and "FROM tasks" in q:
            return len(self.store.tasks)
        if "count(*)" in q and "escalation_policies" in q:
            return sum(1 for p in self.store.policies.values() if p["enabled"])
        if "SELECT 1 FROM incidents WHERE id" in q:
            uid = args[0]
            return 1 if uid in self.store.incidents else None
        raise AssertionError(f"fetchval not implemented: {q[:100]}")

    async def fetch(self, query: str, *args: Any) -> list[Row]:
        """Multi-row queries: incident/task listings and policy listing."""
        q = self._q(query)
        if "FROM incidents" in q and "ORDER BY created_at DESC" in q:
            rows = sorted(
                self.store.incidents.values(),
                key=lambda x: x["created_at"],
                reverse=True,
            )
            # Optional filters mirror the real endpoint's WHERE clauses;
            # the LIMIT argument position depends on how many filters bind.
            if "grafana_org_slug =" in q and "service_name =" in q:
                rows = [
                    r
                    for r in rows
                    if r.get("grafana_org_slug") == args[0]
                    and r.get("service_name") == args[1]
                ]
                lim = args[2]
            elif "grafana_org_slug =" in q:
                rows = [r for r in rows if r.get("grafana_org_slug") == args[0]]
                lim = args[1]
            elif "service_name =" in q:
                rows = [r for r in rows if r.get("service_name") == args[0]]
                lim = args[1]
            else:
                lim = args[0]
            return [Row(dict(r)) for r in rows[:lim]]
        # Tasks of one incident, newest first.  The original had two branches
        # (with and without "ORDER BY") whose bodies were identical; merged.
        if "FROM tasks" in q and "WHERE incident_id" in q:
            iid, lim = args[0], args[1]
            match = [t for t in self.store.tasks.values() if t["incident_id"] == iid]
            match.sort(key=lambda x: x["created_at"], reverse=True)
            return [Row(dict(t)) for t in match[:lim]]
        # All tasks, newest first.  The original's two branches (aliased
        # "tasks t" and plain "tasks" forms) had identical bodies; merged.
        if "FROM tasks t" in q or (
            "FROM tasks" in q
            and (
                "ORDER BY t.created_at" in q
                or ("ORDER BY created_at DESC" in q and "WHERE" not in q)
            )
        ):
            lim = args[0]
            rows = sorted(
                self.store.tasks.values(),
                key=lambda x: x["created_at"],
                reverse=True,
            )[:lim]
            return [Row(dict(r)) for r in rows]
        if "FROM escalation_policies" in q and "ORDER BY name" in q:
            rows = sorted(self.store.policies.values(), key=lambda x: x["name"])
            return [Row(dict(r)) for r in rows]
        raise AssertionError(f"fetch not implemented: {q[:120]}")

    async def fetchrow(self, query: str, *args: Any) -> Row | None:
        """Single-row queries: inserts with RETURNING, lookups, updates."""
        q = self._q(query)
        if "INSERT INTO incidents" in q and "VALUES ($1, $2, $3, 'manual'" in q:
            return Row(self.store.insert_incident_manual(args[0], args[1], args[2]))
        # "/tasks" guard keeps the incident lookup from swallowing the
        # tasks-of-incident fetch that also mentions the incidents table.
        if "FROM incidents WHERE id" in q and "UPDATE" not in q and "/tasks" not in query.lower():
            return self.store.get_incident(args[0])
        if "UPDATE incidents SET" in q:
            return self.store.update_incident(args[0], args[1], args[2], args[3])
        if "INSERT INTO tasks" in q:
            return Row(self.store.insert_task(args[0], args[1]))
        if "FROM tasks WHERE id" in q and "UPDATE" not in q:
            t = self.store.tasks.get(args[0])
            return Row(dict(t)) if t else None
        if "UPDATE tasks SET" in q:
            return self.store.update_task(args[0], args[1], args[2])
        if "INSERT INTO escalation_policies" in q:
            return Row(self.store.insert_policy(args[0], args[1], args[2]))
        if "FROM escalation_policies WHERE id" in q and "UPDATE" not in q and "DELETE" not in q:
            p = self.store.policies.get(args[0])
            return Row(dict(p)) if p else None
        if "UPDATE escalation_policies SET" in q:
            return self.store.update_policy(args[0], args[1], args[2], args[3])
        if "DELETE FROM escalation_policies" in q:
            return self.store.delete_policy(args[0])
        raise AssertionError(f"fetchrow not implemented: {q[:120]}")
||||
|
||||
|
||||
@dataclass
class IrmFakeStore:
    """In-memory tables backing the fake IRM connection.

    Rows are stored as plain dicts keyed by their UUID primary key, one
    dict-of-dicts per table.
    """

    incidents: dict[UUID, dict[str, Any]] = field(default_factory=dict)
    tasks: dict[UUID, dict[str, Any]] = field(default_factory=dict)
    policies: dict[UUID, dict[str, Any]] = field(default_factory=dict)

    def insert_incident_alert(
        self,
        title: str,
        sev: str,
        ingress_id: UUID,
        grafana_org_slug: Any,
        service_name: Any,
    ) -> None:
        """Store an incident created from a Grafana alert webhook."""
        incident_id = uuid4()
        stamp = _now()
        self.incidents[incident_id] = {
            "id": incident_id,
            "title": title,
            "status": "open",
            "severity": sev,
            "source": "grafana",
            "ingress_event_id": ingress_id,
            "created_at": stamp,
            "updated_at": stamp,
            "grafana_org_slug": grafana_org_slug,
            "service_name": service_name,
        }

    def insert_incident_manual(self, title: str, status: str, severity: str) -> dict[str, Any]:
        """Store a manually created incident and return its row dict."""
        incident_id = uuid4()
        stamp = _now()
        record = {
            "id": incident_id,
            "title": title,
            "status": status,
            "severity": severity,
            "source": "manual",
            "ingress_event_id": None,
            "created_at": stamp,
            "updated_at": stamp,
            "grafana_org_slug": None,
            "service_name": None,
        }
        self.incidents[incident_id] = record
        return record

    def get_incident(self, iid: UUID) -> Row | None:
        """Return a copy of the incident row, or None when unknown."""
        record = self.incidents.get(iid)
        return Row(dict(record)) if record else None

    def update_incident(
        self,
        iid: UUID,
        title: str | None,
        status: str | None,
        severity: str | None,
    ) -> Row | None:
        """Apply the non-None fields to an incident and bump updated_at."""
        record = self.incidents.get(iid)
        if not record:
            return None
        for column, value in (("title", title), ("status", status), ("severity", severity)):
            if value is not None:
                record[column] = value
        record["updated_at"] = _now()
        return Row(dict(record))

    def insert_task(self, title: str, incident_id: UUID | None) -> dict[str, Any]:
        """Store a task (optionally linked to an incident) and return it."""
        task_id = uuid4()
        record = {
            "id": task_id,
            "incident_id": incident_id,
            "title": title,
            "status": "open",
            "created_at": _now(),
        }
        self.tasks[task_id] = record
        return record

    def update_task(self, tid: UUID, title: str | None, status: str | None) -> Row | None:
        """Apply the non-None fields to a task."""
        record = self.tasks.get(tid)
        if not record:
            return None
        for column, value in (("title", title), ("status", status)):
            if value is not None:
                record[column] = value
        return Row(dict(record))

    def insert_policy(self, name: str, enabled: bool, steps_json: str) -> dict[str, Any]:
        """Store an escalation policy; steps arrive serialized as JSON."""
        import json

        policy_id = uuid4()
        record = {
            "id": policy_id,
            "name": name,
            "enabled": enabled,
            "steps": json.loads(steps_json),
            "created_at": _now(),
        }
        self.policies[policy_id] = record
        return record

    def update_policy(
        self,
        pid: UUID,
        name: str | None,
        enabled: bool | None,
        steps_json: str | None,
    ) -> Row | None:
        """Apply the non-None fields to a policy; steps arrive as JSON."""
        import json

        record = self.policies.get(pid)
        if not record:
            return None
        if name is not None:
            record["name"] = name
        if enabled is not None:
            record["enabled"] = enabled
        if steps_json is not None:
            record["steps"] = json.loads(steps_json)
        return Row(dict(record))

    def delete_policy(self, pid: UUID) -> Row | None:
        """Remove a policy; return a stub row with its id, or None."""
        if self.policies.pop(pid, None) is None:
            return None
        return Row({"id": pid})
|
||||
|
||||
class IrmFakeAcquire:
    """Async context manager mimicking ``pool.acquire()``."""

    def __init__(self, store: IrmFakeStore) -> None:
        self.store = store

    async def __aenter__(self) -> IrmFakeConn:
        # A fresh connection per acquisition; all share the same store.
        return IrmFakeConn(self.store)

    async def __aexit__(self, *args: Any) -> None:
        return None
|
||||
|
||||
class IrmFakePool:
    """Drop-in replacement for an asyncpg pool in tests."""

    def __init__(self, store: IrmFakeStore | None = None) -> None:
        # Allow injecting a pre-populated store; otherwise create a fresh one.
        self._store = store or IrmFakeStore()

    def acquire(self) -> IrmFakeAcquire:
        return IrmFakeAcquire(self._store)

    @property
    def store(self) -> IrmFakeStore:
        """The backing in-memory store (handy for direct test assertions)."""
        return self._store
||||
19
tests/test_grafana_catalog_api.py
Normal file
19
tests/test_grafana_catalog_api.py
Normal file
@ -0,0 +1,19 @@
|
||||
"""HTTP-обёртки каталога Grafana без реальной БД."""
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
def test_grafana_catalog_sync_requires_db(client: TestClient) -> None:
    """Without a database pool the sync endpoint answers 503."""
    response = client.post("/api/v1/modules/grafana-catalog/sync", json={})
    assert response.status_code == 503
||||
|
||||
|
||||
def test_grafana_catalog_meta_no_db(client: TestClient) -> None:
    """Meta endpoint works without a DB and reports it as disabled."""
    response = client.get("/api/v1/modules/grafana-catalog/meta")
    assert response.status_code == 200
    body = response.json()
    assert body.get("database") == "disabled"
||||
|
||||
|
||||
def test_grafana_catalog_tree_requires_db(client: TestClient) -> None:
    """The tree view also needs a database pool and returns 503 without one."""
    response = client.get("/api/v1/modules/grafana-catalog/tree?instance_slug=default")
    assert response.status_code == 503
||||
93
tests/test_grafana_topology.py
Normal file
93
tests/test_grafana_topology.py
Normal file
@ -0,0 +1,93 @@
|
||||
"""Парсинг Ruler / слияние папок и HTTP-mock синхронизации Grafana."""
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
import respx
|
||||
|
||||
from onguard24.integrations.grafana_topology import (
|
||||
merge_folder_rows,
|
||||
parse_ruler_rules,
|
||||
)
|
||||
from onguard24.modules.grafana_catalog import pull_topology
|
||||
|
||||
|
||||
def test_parse_ruler_grafana_managed() -> None:
    """A Grafana-managed rule is flattened into a single topology row."""
    rule = {
        "grafana_alert": {"uid": "uid1", "title": "Nginx Down"},
        "labels": {"service": "nginx", "severity": "critical"},
    }
    group = {"name": "prometheus (Nginx)", "interval": "60s", "rules": [rule]}

    rows = parse_ruler_rules({"nginx": [group]})

    assert len(rows) == 1
    parsed = rows[0]
    assert parsed.namespace_uid == "nginx"
    assert parsed.rule_uid == "uid1"
    assert parsed.rule_group_name == "prometheus (Nginx)"
    assert parsed.labels["service"] == "nginx"
||||
|
||||
|
||||
def test_parse_ruler_skips_non_grafana_alert() -> None:
    """Rules without a grafana_alert section are ignored entirely."""
    payload = {"x": [{"name": "g", "rules": [{"expr": "1"}]}]}
    assert parse_ruler_rules(payload) == []
||||
|
||||
|
||||
def test_merge_folder_rows_adds_namespaces() -> None:
    """Namespaces missing from the folders API get synthesized rows."""
    api_folders = [{"uid": "system", "title": "System", "parentUid": None}]

    merged = merge_folder_rows(api_folders, {"nginx", "system"})

    assert {row[0] for row in merged} == {"system", "nginx"}
    title_by_uid = {row[0]: row[1] for row in merged}
    # A synthesized namespace reuses its uid as the title.
    assert title_by_uid["nginx"] == "nginx"
||||
|
||||
|
||||
@pytest.mark.asyncio
@respx.mock
async def test_pull_topology_end_to_end() -> None:
    """Full pull: org info, folder listing and ruler rules over mocked HTTP."""
    base = "https://grafana.example.com"

    respx.get(f"{base}/api/org").mock(
        return_value=httpx.Response(200, json={"id": 3, "name": "adibrov"})
    )

    def _folder_listing(request: httpx.Request) -> httpx.Response:
        # Child lookups (parentUid=...) are empty; root listing has one folder.
        if "parentUid" in str(request.url):
            return httpx.Response(200, json=[])
        return httpx.Response(
            200,
            json=[{"uid": "nginx", "title": "Nginx Alerts", "parentUid": None}],
        )

    respx.get(f"{base}/api/folders").mock(side_effect=_folder_listing)

    rule = {
        "grafana_alert": {"uid": "r1", "title": "Down"},
        "labels": {"service": "nginx"},
    }
    ruler_body = {"nginx": [{"name": "grp", "interval": "1m", "rules": [rule]}]}
    respx.get(f"{base}/api/ruler/grafana/api/v1/rules").mock(
        return_value=httpx.Response(200, json=ruler_body)
    )

    out, err = await pull_topology(base, "test-token")

    assert err is None
    assert out is not None
    assert out.org_id == 3
    assert out.org_name == "adibrov"
    assert len(out.folder_rows) == 1
    assert out.folder_rows[0][0] == "nginx"
    assert len(out.rules) == 1
    assert out.rules[0].rule_uid == "r1"
||||
@ -63,6 +63,35 @@ def test_grafana_webhook_inserts_with_mock_pool(client: TestClient) -> None:
|
||||
app.state.pool = real_pool
|
||||
|
||||
|
||||
def test_grafana_webhook_auto_org_from_external_url(client: TestClient) -> None:
    """The org slug is derived from the host of the webhook's externalURL."""
    from uuid import uuid4

    event_id = uuid4()
    mock_conn = AsyncMock()
    mock_conn.fetchrow = AsyncMock(return_value={"id": event_id})
    mock_cm = AsyncMock()
    mock_cm.__aenter__ = AsyncMock(return_value=mock_conn)
    mock_cm.__aexit__ = AsyncMock(return_value=None)
    mock_pool = MagicMock()
    mock_pool.acquire = MagicMock(return_value=mock_cm)

    app = client.app
    real_pool = app.state.pool
    app.state.pool = mock_pool
    try:
        payload = {"externalURL": "https://grafana-adibrov.example.com/", "title": "x"}
        response = client.post(
            "/api/v1/ingress/grafana",
            content=json.dumps(payload),
            headers={"Content-Type": "application/json"},
        )
        assert response.status_code == 202
        # 4th positional argument of the INSERT is the derived org slug.
        assert mock_conn.fetchrow.call_args[0][3] == "grafana-adibrov.example.com"
    finally:
        app.state.pool = real_pool
||||
|
||||
|
||||
def test_grafana_webhook_publishes_alert_received(client: TestClient) -> None:
|
||||
from unittest.mock import patch
|
||||
from uuid import uuid4
|
||||
@ -92,3 +121,66 @@ def test_grafana_webhook_publishes_alert_received(client: TestClient) -> None:
|
||||
assert spy.await_args.kwargs.get("raw_payload_ref") == uid
|
||||
finally:
|
||||
app.state.pool = real_pool
|
||||
|
||||
|
||||
def test_grafana_webhook_org_any_slug_without_json_config(client: TestClient) -> None:
    """The /{slug} path needs no GRAFANA_SOURCES_JSON — the slug is stored as-is."""
    from uuid import uuid4

    event_id = uuid4()
    mock_conn = AsyncMock()
    mock_conn.fetchrow = AsyncMock(return_value={"id": event_id})
    mock_cm = AsyncMock()
    mock_cm.__aenter__ = AsyncMock(return_value=mock_conn)
    mock_cm.__aexit__ = AsyncMock(return_value=None)
    mock_pool = MagicMock()
    mock_pool.acquire = MagicMock(return_value=mock_cm)

    app = client.app
    real_pool = app.state.pool
    app.state.pool = mock_pool
    try:
        response = client.post(
            "/api/v1/ingress/grafana/other",
            content=b"{}",
            headers={"Content-Type": "application/json"},
        )
        assert response.status_code == 202
        # The path slug lands in the 4th positional INSERT argument.
        assert mock_conn.fetchrow.call_args[0][3] == "other"
    finally:
        app.state.pool = real_pool
||||
|
||||
|
||||
def test_grafana_webhook_org_ok(client: TestClient) -> None:
    """A slug configured in grafana_sources_json is accepted and persisted."""
    from uuid import uuid4

    event_id = uuid4()
    mock_conn = AsyncMock()
    mock_conn.fetchrow = AsyncMock(return_value={"id": event_id})
    mock_cm = AsyncMock()
    mock_cm.__aenter__ = AsyncMock(return_value=mock_conn)
    mock_cm.__aexit__ = AsyncMock(return_value=None)
    mock_pool = MagicMock()
    mock_pool.acquire = MagicMock(return_value=mock_cm)

    app = client.app
    real_json = app.state.settings.grafana_sources_json
    real_pool = app.state.pool
    app.state.settings.grafana_sources_json = (
        '[{"slug":"adibrov","api_url":"http://192.168.0.1:3000","api_token":"","webhook_secret":""}]'
    )
    app.state.pool = mock_pool
    try:
        response = client.post(
            "/api/v1/ingress/grafana/adibrov",
            content='{"title":"t"}',
            headers={"Content-Type": "application/json"},
        )
        assert response.status_code == 202
        call = mock_conn.fetchrow.call_args
        # The INSERT targets the org_slug column with source and slug values.
        assert "org_slug" in call[0][0].lower() or "org_slug" in str(call)
        assert call[0][1] == "grafana"
        assert call[0][3] == "adibrov"
    finally:
        app.state.settings.grafana_sources_json = real_json
        app.state.pool = real_pool
||||
|
||||
118
tests/test_irm_api_with_fake_db.py
Normal file
118
tests/test_irm_api_with_fake_db.py
Normal file
@ -0,0 +1,118 @@
|
||||
"""IRM API с подменённым пулом БД (без PostgreSQL)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
from fastapi.testclient import TestClient
|
||||
from starlette.requests import Request
|
||||
|
||||
from onguard24.deps import get_pool
|
||||
from onguard24.main import app
|
||||
|
||||
from tests.irm_db_fake import IrmFakePool
|
||||
|
||||
|
||||
@pytest.fixture
def irm_client() -> tuple[TestClient, IrmFakePool]:
    """TestClient wired to an in-memory IRM pool via dependency override."""
    fake_pool = IrmFakePool()

    def _override(_request: Request):
        return fake_pool

    app.dependency_overrides[get_pool] = _override
    with TestClient(app) as test_client:
        yield test_client, fake_pool
    app.dependency_overrides.pop(get_pool, None)
||||
|
||||
|
||||
def test_irm_incident_crud_and_tasks(irm_client: tuple[TestClient, IrmFakePool]) -> None:
    """Create, read and patch an incident, then attach and update a task."""
    client, _pool = irm_client

    created = client.post(
        "/api/v1/modules/incidents/",
        json={"title": "Сбой API", "status": "open", "severity": "critical"},
    )
    assert created.status_code == 201
    assert created.json()["source"] == "manual"
    iid = created.json()["id"]

    fetched = client.get(f"/api/v1/modules/incidents/{iid}")
    assert fetched.status_code == 200
    assert fetched.json()["title"] == "Сбой API"

    patched = client.patch(f"/api/v1/modules/incidents/{iid}", json={"status": "resolved"})
    assert patched.status_code == 200
    assert patched.json()["status"] == "resolved"

    task_created = client.post(
        "/api/v1/modules/tasks/",
        json={"title": "Разбор логов", "incident_id": iid},
    )
    assert task_created.status_code == 201
    tid = task_created.json()["id"]

    listing = client.get(f"/api/v1/modules/incidents/{iid}/tasks")
    assert listing.status_code == 200
    items = listing.json()["items"]
    assert len(items) == 1
    assert items[0]["id"] == tid

    task_fetched = client.get(f"/api/v1/modules/tasks/{tid}")
    assert task_fetched.status_code == 200
    assert task_fetched.json()["status"] == "open"

    task_patched = client.patch(f"/api/v1/modules/tasks/{tid}", json={"status": "done"})
    assert task_patched.status_code == 200
    assert task_patched.json()["status"] == "done"
||||
|
||||
|
||||
def test_irm_task_bad_incident(irm_client: tuple[TestClient, IrmFakePool]) -> None:
    """Creating a task against an unknown incident yields 400."""
    import uuid

    client, _ = irm_client
    response = client.post(
        "/api/v1/modules/tasks/",
        json={"title": "x", "incident_id": str(uuid.uuid4())},
    )
    assert response.status_code == 400
    assert response.json()["detail"] == "incident not found"
||||
|
||||
|
||||
def test_irm_incident_tasks_unknown(irm_client: tuple[TestClient, IrmFakePool]) -> None:
    """Listing tasks of a nonexistent incident yields 404."""
    import uuid

    client, _ = irm_client
    unknown_id = str(uuid.uuid4())
    response = client.get(f"/api/v1/modules/incidents/{unknown_id}/tasks")
    assert response.status_code == 404
||||
|
||||
|
||||
def test_irm_patch_validation(irm_client: tuple[TestClient, IrmFakePool]) -> None:
    """An empty PATCH body (nothing to update) is rejected with 400."""
    client, _ = irm_client
    created = client.post("/api/v1/modules/incidents/", json={"title": "t"})
    iid = created.json()["id"]
    response = client.patch(f"/api/v1/modules/incidents/{iid}", json={})
    assert response.status_code == 400
||||
|
||||
|
||||
def test_irm_escalations_crud(irm_client: tuple[TestClient, IrmFakePool]) -> None:
    """Escalation policy lifecycle: create, read, patch, delete, then 404."""
    client, _ = irm_client

    created = client.post(
        "/api/v1/modules/escalations/",
        json={"name": "L1", "enabled": True, "steps": [{"after_min": 5, "channel": "slack"}]},
    )
    assert created.status_code == 201
    pid = created.json()["id"]
    assert created.json()["steps"][0]["channel"] == "slack"

    fetched = client.get(f"/api/v1/modules/escalations/{pid}")
    assert fetched.status_code == 200
    assert fetched.json()["name"] == "L1"

    patched = client.patch(f"/api/v1/modules/escalations/{pid}", json={"enabled": False})
    assert patched.status_code == 200
    assert patched.json()["enabled"] is False

    deleted = client.delete(f"/api/v1/modules/escalations/{pid}")
    assert deleted.status_code == 204

    gone = client.get(f"/api/v1/modules/escalations/{pid}")
    assert gone.status_code == 404
||||
@ -62,6 +62,8 @@ async def test_incident_inserted_on_alert_received() -> None:
|
||||
assert inserted["args"][0] == "CPU high"
|
||||
assert inserted["args"][1] == "warning"
|
||||
assert inserted["args"][2] == uid
|
||||
assert inserted["args"][3] is None
|
||||
assert inserted["args"][4] is None
|
||||
|
||||
|
||||
def test_incidents_post_requires_db(client: TestClient) -> None:
|
||||
|
||||
@ -32,6 +32,7 @@ def test_rail_lists_all_registered_ui_modules(client: TestClient) -> None:
|
||||
assert r.status_code == 200
|
||||
t = r.text
|
||||
expected = (
|
||||
("grafana-catalog", "Каталог Grafana"),
|
||||
("incidents", "Инциденты"),
|
||||
("tasks", "Задачи"),
|
||||
("escalations", "Эскалации"),
|
||||
@ -47,6 +48,7 @@ def test_rail_lists_all_registered_ui_modules(client: TestClient) -> None:
|
||||
def test_each_module_page_single_active_nav_item(client: TestClient) -> None:
|
||||
"""На странице модуля ровно один пункт с aria-current (текущий раздел)."""
|
||||
for slug in (
|
||||
"grafana-catalog",
|
||||
"incidents",
|
||||
"tasks",
|
||||
"escalations",
|
||||
|
||||
@ -42,6 +42,7 @@ def test_status_with_mocks(client: TestClient) -> None:
|
||||
vault_token="t",
|
||||
grafana_url="https://grafana.example",
|
||||
grafana_service_account_token="g",
|
||||
grafana_sources_json="",
|
||||
forgejo_url="https://git.example",
|
||||
forgejo_token="f",
|
||||
grafana_webhook_secret="",
|
||||
@ -57,5 +58,6 @@ def test_status_with_mocks(client: TestClient) -> None:
|
||||
d = r.json()
|
||||
assert d["vault"]["status"] == "ok"
|
||||
assert d["grafana"]["status"] == "ok"
|
||||
assert d["grafana"].get("service_account_login") == "tester"
|
||||
assert len(d["grafana"]["instances"]) == 1
|
||||
assert d["grafana"]["instances"][0].get("service_account_login") == "tester"
|
||||
assert d["forgejo"]["status"] == "ok"
|
||||
|
||||
Reference in New Issue
Block a user