- Миграции PostgreSQL через Alembic; DDL убран из lifespan приложения. - Тесты: health, status, ingress Grafana; моки Vault/Grafana/Forgejo. - Пакет onguard24/domain/ (сущности, шина событий), docs/DOMAIN.md. - Обновлены README, CHANGELOG, ARCHITECTURE. Made-with: Cursor
61 lines
1.9 KiB
Python
61 lines
1.9 KiB
Python
import json
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from fastapi.testclient import TestClient
|
|
|
|
|
|
def test_grafana_webhook_no_db(client: TestClient) -> None:
|
|
"""Без пула БД — 202, запись не падает."""
|
|
r = client.post(
|
|
"/api/v1/ingress/grafana",
|
|
content=json.dumps({"title": "t"}),
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
assert r.status_code == 202
|
|
|
|
|
|
def test_grafana_webhook_unauthorized_when_secret_set(client: TestClient) -> None:
|
|
app = client.app
|
|
real = app.state.settings.grafana_webhook_secret
|
|
app.state.settings.grafana_webhook_secret = "s3cr3t"
|
|
try:
|
|
r = client.post(
|
|
"/api/v1/ingress/grafana",
|
|
content=b"{}",
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
assert r.status_code == 401
|
|
r2 = client.post(
|
|
"/api/v1/ingress/grafana",
|
|
content=b"{}",
|
|
headers={"Content-Type": "application/json", "X-OnGuard-Secret": "s3cr3t"},
|
|
)
|
|
assert r2.status_code == 202
|
|
finally:
|
|
app.state.settings.grafana_webhook_secret = real
|
|
|
|
|
|
def test_grafana_webhook_inserts_with_mock_pool(client: TestClient) -> None:
|
|
mock_conn = AsyncMock()
|
|
mock_conn.execute = AsyncMock()
|
|
mock_cm = AsyncMock()
|
|
mock_cm.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
mock_cm.__aexit__ = AsyncMock(return_value=None)
|
|
|
|
mock_pool = MagicMock()
|
|
mock_pool.acquire = MagicMock(return_value=mock_cm)
|
|
|
|
app = client.app
|
|
real_pool = app.state.pool
|
|
app.state.pool = mock_pool
|
|
try:
|
|
r = client.post(
|
|
"/api/v1/ingress/grafana",
|
|
content=json.dumps({"a": 1}),
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
assert r.status_code == 202
|
|
mock_conn.execute.assert_called_once()
|
|
finally:
|
|
app.state.pool = real_pool
|