v1.1.0: Alembic, pytest, домен и документация
- Миграции PostgreSQL через Alembic; DDL убран из lifespan приложения. - Тесты: health, status, ingress Grafana; моки Vault/Grafana/Forgejo. - Пакет onguard24/domain/ (сущности, шина событий), docs/DOMAIN.md. - Обновлены README, CHANGELOG, ARCHITECTURE. Made-with: Cursor
This commit is contained in:
60
tests/test_ingress.py
Normal file
60
tests/test_ingress.py
Normal file
@ -0,0 +1,60 @@
|
||||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
def test_grafana_webhook_no_db(client: TestClient) -> None:
|
||||
"""Без пула БД — 202, запись не падает."""
|
||||
r = client.post(
|
||||
"/api/v1/ingress/grafana",
|
||||
content=json.dumps({"title": "t"}),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
assert r.status_code == 202
|
||||
|
||||
|
||||
def test_grafana_webhook_unauthorized_when_secret_set(client: TestClient) -> None:
|
||||
app = client.app
|
||||
real = app.state.settings.grafana_webhook_secret
|
||||
app.state.settings.grafana_webhook_secret = "s3cr3t"
|
||||
try:
|
||||
r = client.post(
|
||||
"/api/v1/ingress/grafana",
|
||||
content=b"{}",
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
assert r.status_code == 401
|
||||
r2 = client.post(
|
||||
"/api/v1/ingress/grafana",
|
||||
content=b"{}",
|
||||
headers={"Content-Type": "application/json", "X-OnGuard-Secret": "s3cr3t"},
|
||||
)
|
||||
assert r2.status_code == 202
|
||||
finally:
|
||||
app.state.settings.grafana_webhook_secret = real
|
||||
|
||||
|
||||
def test_grafana_webhook_inserts_with_mock_pool(client: TestClient) -> None:
|
||||
mock_conn = AsyncMock()
|
||||
mock_conn.execute = AsyncMock()
|
||||
mock_cm = AsyncMock()
|
||||
mock_cm.__aenter__ = AsyncMock(return_value=mock_conn)
|
||||
mock_cm.__aexit__ = AsyncMock(return_value=None)
|
||||
|
||||
mock_pool = MagicMock()
|
||||
mock_pool.acquire = MagicMock(return_value=mock_cm)
|
||||
|
||||
app = client.app
|
||||
real_pool = app.state.pool
|
||||
app.state.pool = mock_pool
|
||||
try:
|
||||
r = client.post(
|
||||
"/api/v1/ingress/grafana",
|
||||
content=json.dumps({"a": 1}),
|
||||
headers={"Content-Type": "application/json"},
|
||||
)
|
||||
assert r.status_code == 202
|
||||
mock_conn.execute.assert_called_once()
|
||||
finally:
|
||||
app.state.pool = real_pool
|
||||
Reference in New Issue
Block a user