diff --git a/.env.example b/.env.example index b8e4230..0d2888f 100644 --- a/.env.example +++ b/.env.example @@ -3,10 +3,15 @@ HTTP_ADDR=0.0.0.0:8080 LOG_LEVEL=info -# Опционально: если задан — POST /api/v1/ingress/grafana требует заголовок X-OnGuard-Secret +# Опционально: общий секрет для вебхуков (если у источника в JSON не задан свой webhook_secret) # GRAFANA_WEBHOOK_SECRET= -# --- Grafana HTTP API (service account, не пароль admin) --- +# Несколько Grafana: JSON-массив. slug — часть URL вебхука: /api/v1/ingress/grafana/{slug} +# Пример: [{"slug":"adibrov","api_url":"https://grafana-adibrov.example","api_token":"glsa_...","webhook_secret":"длинный-секрет"}] +# Если пусто, но задан GRAFANA_URL — один источник со slug "default" (вебхук /api/v1/ingress/grafana/default) +# GRAFANA_SOURCES_JSON= + +# --- Grafana HTTP API (один инстанс, если без GRAFANA_SOURCES_JSON) --- # URL без завершающего слэша. Токен: Grafana → Administration → Service accounts → onguard24 → Add service account token GRAFANA_URL=https://grafana.pvenode.ru # GRAFANA_SERVICE_ACCOUNT_TOKEN= diff --git a/CHANGELOG.md b/CHANGELOG.md index 9566823..3b56a69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,18 @@ Формат: семантическое версионирование `MAJOR.MINOR.PATCH`. Git-теги `v1.0.0`, `v1.1.0` и т.д. — см. [docs/VERSIONING.md](docs/VERSIONING.md). +## [1.7.0] — 2026-04-03 + +Каталог Grafana (топология правил), доработки ingress/IRM, тесты. + +### Добавлено + +- **Alembic `003_ingress_org_service`**, **`004_grafana_catalog`** — метаданные и снимок папок/правил Grafana. +- **Модуль `grafana-catalog`:** `POST …/sync`, `GET …/meta`, `GET …/tree`, UI. +- **`onguard24/integrations/grafana_topology.py`**, **`grafana_sources.py`**. +- **Документация:** [docs/GRAFANA_TOPOLOGY.md](docs/GRAFANA_TOPOLOGY.md). +- **Тесты:** `test_grafana_topology.py`, `test_grafana_catalog_api.py`, `irm_db_fake.py`, `test_irm_api_with_fake_db.py`. + ## [1.6.0] — 2026-04-03 Docker-образ, `docker-compose.yml`, CI/CD Forgejo/Gitea Actions. diff --git a/alembic/versions/003_ingress_org_service.py b/alembic/versions/003_ingress_org_service.py new file mode 100644 index 0000000..581a412 --- /dev/null +++ b/alembic/versions/003_ingress_org_service.py @@ -0,0 +1,48 @@ +"""ingress org/service + incident dimensions for multi-grafana + +Revision ID: 003_ingress_org +Revises: 002_irm_core +Create Date: 2026-04-03 + +""" + +from typing import Sequence, Union + +from alembic import op + +revision: str = "003_ingress_org" +down_revision: Union[str, None] = "002_irm_core" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute( + """ + ALTER TABLE ingress_events + ADD COLUMN IF NOT EXISTS org_slug text, + ADD COLUMN IF NOT EXISTS service_name text; + """ + ) + op.execute( + """ + ALTER TABLE incidents + ADD COLUMN IF NOT EXISTS grafana_org_slug text, + ADD COLUMN IF NOT EXISTS service_name text; + """ + ) + op.execute( + """ + CREATE INDEX IF NOT EXISTS incidents_org_service_idx + ON incidents (grafana_org_slug, service_name) + WHERE grafana_org_slug IS NOT NULL OR service_name IS NOT NULL; + """ + ) + + +def downgrade() -> None: + op.execute("DROP INDEX IF EXISTS incidents_org_service_idx;") + op.execute("ALTER TABLE incidents DROP COLUMN IF EXISTS service_name;") + op.execute("ALTER TABLE incidents DROP COLUMN IF EXISTS grafana_org_slug;") + op.execute("ALTER TABLE ingress_events DROP COLUMN IF EXISTS service_name;") + op.execute("ALTER TABLE ingress_events DROP COLUMN IF EXISTS org_slug;") diff --git a/alembic/versions/004_grafana_catalog.py b/alembic/versions/004_grafana_catalog.py new file mode 100644 index 0000000..14e34e0 --- /dev/null +++ b/alembic/versions/004_grafana_catalog.py @@ -0,0 +1,88 @@ +"""Grafana topology cache: org, folders, alert rules per instance slug + +Revision ID: 004_grafana_catalog +Revises: 003_ingress_org +Create Date: 2026-04-03 + +""" + +from typing import Sequence, Union + +from alembic import op + +revision: str = "004_grafana_catalog" +down_revision: Union[str, None] = "003_ingress_org" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.execute( + """ + CREATE TABLE IF NOT EXISTS grafana_catalog_meta ( + instance_slug text NOT NULL, + grafana_org_id int NOT NULL, + org_name text NOT NULL DEFAULT '', + synced_at timestamptz NOT NULL DEFAULT now(), + folder_count int NOT NULL DEFAULT 0, + rule_count int NOT NULL DEFAULT 0, + error_text text, + PRIMARY KEY (instance_slug, grafana_org_id) + ); + """ + ) + op.execute( + """ + CREATE TABLE IF NOT EXISTS grafana_catalog_folders ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + instance_slug text NOT NULL, + grafana_org_id int NOT NULL, + folder_uid text NOT NULL, + title text NOT NULL DEFAULT '', + parent_uid text, + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (instance_slug, grafana_org_id, folder_uid) + ); + """ + ) + op.execute( + """ + CREATE INDEX IF NOT EXISTS grafana_cat_folders_inst_org_idx + ON grafana_catalog_folders (instance_slug, grafana_org_id); + """ + ) + op.execute( + """ + CREATE TABLE IF NOT EXISTS grafana_catalog_rules ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + instance_slug text NOT NULL, + grafana_org_id int NOT NULL, + namespace_uid text NOT NULL, + rule_group_name text NOT NULL, + rule_uid text NOT NULL, + title text NOT NULL DEFAULT '', + rule_group_interval text, + labels jsonb NOT NULL DEFAULT '{}'::jsonb, + updated_at timestamptz NOT NULL DEFAULT now(), + UNIQUE (instance_slug, grafana_org_id, rule_uid) + ); + """ + ) + op.execute( + """ + CREATE INDEX IF NOT EXISTS grafana_cat_rules_inst_org_idx + ON grafana_catalog_rules (instance_slug, grafana_org_id); + """ + ) + op.execute( + """ + CREATE INDEX IF NOT EXISTS grafana_cat_rules_ns_idx + ON grafana_catalog_rules (instance_slug, grafana_org_id, namespace_uid); + """ + ) + + +def downgrade() -> None: + op.execute("DROP TABLE IF EXISTS grafana_catalog_rules;") + op.execute("DROP TABLE IF EXISTS grafana_catalog_folders;") + op.execute("DROP TABLE IF EXISTS grafana_catalog_meta;") diff --git a/docs/CICD.md b/docs/CICD.md index 2b9f33c..1eb53e5 100644 --- a/docs/CICD.md +++ b/docs/CICD.md @@ -18,6 +18,24 @@ 2. Зарегистрировать **runner** с меткой `ubuntu-latest` (или изменить `runs-on` в YAML на вашу метку, например `self-hosted`). 3. Если образы `actions/checkout` недоступны, в настройках Actions задайте зеркало GitHub или используйте встроенные экшены Forgejo (см. документацию вашей версии). +### Образ act_runner (Docker Hub) + +Используйте **`gitea/act_runner`** (например тег **`nightly`**), а не `docker.gitea.com/...:latest` — последний часто недоступен. + +```bash +sudo docker pull gitea/act_runner:nightly +sudo docker run -d --restart always --name act_runner \ + -e GITEA_INSTANCE_URL="https://forgejo.pvenode.ru" \ + -e GITEA_RUNNER_REGISTRATION_TOKEN="ТОКЕН_ИЗ_UI" \ + -e GITEA_RUNNER_NAME="pvestandt9" \ + -e GITEA_RUNNER_LABELS="ubuntu-latest:docker://catthehacker/ubuntu:act-22.04" \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v act_runner_data:/data \ + gitea/act_runner:nightly +``` + +Стабильные теги: [hub.docker.com/r/gitea/act_runner/tags](https://hub.docker.com/r/gitea/act_runner/tags). + ## Секреты репозитория **Настройки репозитория → Actions → Secrets:** diff --git a/docs/GRAFANA_TOPOLOGY.md b/docs/GRAFANA_TOPOLOGY.md new file mode 100644 index 0000000..acf6580 --- /dev/null +++ b/docs/GRAFANA_TOPOLOGY.md @@ -0,0 +1,54 @@ +# Каталог Grafana в onGuard24 + +Иерархия **инстанс → организация → папки → правила алертинга** подтягивается **по HTTP API** Grafana и сохраняется в PostgreSQL. Вебхук при этом не заменяет синхронизацию: вебхук даёт события, каталог — актуальную структуру правил. + +## Что нужно + +1. Миграция **`004_grafana_catalog`** (выполняется при старте контейнера через `alembic upgrade head`). +2. В `.env` у источника Grafana заданы **`api_url`** и **`api_token`** (service account с правами читать папки и alert rules — обычно роль Viewer/Editor и доступ к Alerting). +3. Источник попадает в **`iter_grafana_sources`**: `GRAFANA_SOURCES_JSON` или пара `GRAFANA_URL` + `GRAFANA_SERVICE_ACCOUNT_TOKEN` (slug `default`). + +## API + +| Метод | Путь | Назначение | +|--------|------|------------| +| POST | `/api/v1/modules/grafana-catalog/sync` | Синхронизация. Тело: `{}` — все источники с токеном; `{"instance_slug":"default"}` — один slug. | +| GET | `/api/v1/modules/grafana-catalog/meta` | Последние метаданные синхронизации по всем slug. | +| GET | `/api/v1/modules/grafana-catalog/tree?instance_slug=default` | Дерево: папки и правила, сгруппированные по `namespace_uid` (обычно UID папки правил). | + +## Проверка на https://onguard24.pvenode.ru/ + +1. Убедитесь, что задеплоена версия с модулем **Каталог Grafana** (пункт в боковом меню). +2. Выполните синхронизацию (с сервера или с машины с доступом к API): + +```bash +curl -sS -X POST "https://onguard24.pvenode.ru/api/v1/modules/grafana-catalog/sync" \ + -H "Content-Type: application/json" \ + -d '{}' +``` + +Ответ: `results[]` с полями `ok`, `folders`, `rules`, при ошибке — `error`. + +3. Посмотреть дерево: + +```bash +curl -sS "https://onguard24.pvenode.ru/api/v1/modules/grafana-catalog/tree?instance_slug=default" | python3 -m json.tool +``` + +(Если используете только `GRAFANA_URL`, slug источника — **`default`**.) + +4. В UI: **Каталог Grafana** — таблица последних синхронизаций. + +## Связь с вебхуком + +- В инцидентах по-прежнему используется **`externalURL` / лейблы** из тела вебхука. +- Каталог позволяет в UI/API сопоставлять **`rule_uid`** и папку с сохранённым снимком (после доработок привязки). + +## Ограничения + +- Один токен = **одна Grafana-организация** (контекст `GET /api/org`). Обход нескольких org на одном инстансе — отдельные токены или админ-API (не в этой версии). +- Эндпоинты Ruler: пробуются `/api/ruler/grafana/api/v1/rules` и `/api/ruler/Grafana/api/v1/rules` (зависит от версии Grafana). + +## Автоматический cron + +Периодический `POST .../sync` пока не встроен: можно повесить **системный cron**, **Forgejo scheduled workflow** или внешний оркестратор. diff --git a/docs/IRM.md b/docs/IRM.md index 2db0d13..3239175 100644 --- a/docs/IRM.md +++ b/docs/IRM.md @@ -25,8 +25,15 @@ ## Что настроить в Grafana (обязательно для приёма алертов) -1. **Alerting → Contact points → New** — тип **Webhook**, URL: `https://<ваш-хост>/api/v1/ingress/grafana`, метод POST, Optional HTTP headers если задан `GRAFANA_WEBHOOK_SECRET`: `X-OnGuard-Secret: <секрет>`. -2. **Notification policies** — направить нужные правила на этот contact point (или default policy). -3. Убедиться, что сеть до onGuard24 доступна (firewall, TLS). +### Один URL для всех инстансов и организаций + +1. **Contact point → Webhook**, URL: **`https://<ваш-хост>/api/v1/ingress/grafana`** (POST). **Не нужно** заводить slug в `.env`: источник в БД определяется из JSON Grafana — **`externalURL`** (хост Grafana), при наличии **`orgId`** / **`org_id`**, иначе лейблы первого алерта (`__org_id__`, `grafana_org`, `tenant`, `cluster`, `namespace`). +2. В **`grafana.ini`** / настройках сервера корректно задайте **`root_url` / `external URL`**, чтобы в вебхук попадал нужный хост (за NPM — публичный URL). +3. Опционально **`X-OnGuard-Secret`** (если задан **`GRAFANA_WEBHOOK_SECRET`**) и **`X-OnGuard-Service`** (имя сервиса в инциденте). +4. **Notification policies** — привязать правила к contact point. + +### Дополнительно: путь `/ingress/grafana/` + +Явный ярлык в URL **не требует** записи в `GRAFANA_SOURCES_JSON`. **`GRAFANA_SOURCES_JSON`** нужен в основном для **`/api/v1/status`** (проверка API каждого инстанса) и для **отдельного `webhook_secret` на slug**. Подробнее: [MODULES.md](MODULES.md), [DOMAIN.md](DOMAIN.md). diff --git a/onguard24/__init__.py b/onguard24/__init__.py index 7f42710..8424d89 100644 --- a/onguard24/__init__.py +++ b/onguard24/__init__.py @@ -1,3 +1,3 @@ """onGuard24 — модульный монолит (ядро + модули).""" -__version__ = "1.6.0" +__version__ = "1.7.0" diff --git a/onguard24/config.py b/onguard24/config.py index f1a2832..7c37853 100644 --- a/onguard24/config.py +++ b/onguard24/config.py @@ -19,6 +19,9 @@ class Settings(BaseSettings): http_addr: str = Field(default="0.0.0.0:8080", validation_alias="HTTP_ADDR") database_url: str = Field(default="", validation_alias="DATABASE_URL") grafana_webhook_secret: str = Field(default="", validation_alias="GRAFANA_WEBHOOK_SECRET") + # JSON-массив: [{"slug":"adibrov","api_url":"https://...","api_token":"glsa_...","webhook_secret":"..."}] + # Пусто + задан GRAFANA_URL → один источник slug "default" + grafana_sources_json: str = Field(default="", validation_alias="GRAFANA_SOURCES_JSON") # HTTP API (service account): Grafana → Administration → Service accounts → токен grafana_url: str = Field(default="", validation_alias="GRAFANA_URL") grafana_service_account_token: str = Field( diff --git a/onguard24/domain/events.py b/onguard24/domain/events.py index ba6077f..9caa72b 100644 --- a/onguard24/domain/events.py +++ b/onguard24/domain/events.py @@ -26,6 +26,8 @@ class AlertReceived(DomainEvent): name: str = "alert.received" alert: Alert | None = None raw_payload_ref: UUID | None = None + grafana_org_slug: str | None = None + service_name: str | None = None Handler = Callable[[DomainEvent], Awaitable[None]] @@ -59,6 +61,18 @@ class InMemoryEventBus: for h in self._subs.get(event.name, []): await h(event) - async def publish_alert_received(self, alert: Alert, raw_payload_ref: UUID | None = None) -> None: - ev = AlertReceived(alert=alert, raw_payload_ref=raw_payload_ref) + async def publish_alert_received( + self, + alert: Alert, + raw_payload_ref: UUID | None = None, + *, + grafana_org_slug: str | None = None, + service_name: str | None = None, + ) -> None: + ev = AlertReceived( + alert=alert, + raw_payload_ref=raw_payload_ref, + grafana_org_slug=grafana_org_slug, + service_name=service_name, + ) await self.publish(ev) diff --git a/onguard24/grafana_sources.py b/onguard24/grafana_sources.py new file mode 100644 index 0000000..6e04df5 --- /dev/null +++ b/onguard24/grafana_sources.py @@ -0,0 +1,84 @@ +"""Несколько инстансов Grafana: URL API, токен, секрет вебхука по slug (организация / стек).""" + +from __future__ import annotations + +import json +import re +from pydantic import BaseModel, Field, field_validator + +_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,62}$") + + +class GrafanaSourceEntry(BaseModel): + """Один инстанс Grafana для API и/или приёма вебхуков.""" + + slug: str = Field(..., description="Идентификатор в URL: /ingress/grafana/{slug}") + api_url: str = Field(..., description="Базовый URL без завершающего слэша") + api_token: str = Field(default="", description="Service account token для HTTP API") + webhook_secret: str = Field( + default="", + description="Если задан — только этот секрет для вебхука этого slug; иначе см. GRAFANA_WEBHOOK_SECRET", + ) + + @field_validator("slug") + @classmethod + def slug_ok(cls, v: str) -> str: + s = v.strip().lower() + if not _SLUG_RE.match(s): + raise ValueError( + "slug: только a-z, цифры, - и _, длина 1–63, с буквы/цифры" + ) + return s + + @field_validator("api_url") + @classmethod + def strip_url(cls, v: str) -> str: + return v.rstrip("/") + + +def parse_grafana_sources_json(raw: str) -> list[GrafanaSourceEntry]: + if not raw.strip(): + return [] + data = json.loads(raw) + if not isinstance(data, list): + raise ValueError("GRAFANA_SOURCES_JSON должен быть JSON-массивом объектов") + return [GrafanaSourceEntry.model_validate(x) for x in data] + + +def iter_grafana_sources(settings: "Settings") -> list[GrafanaSourceEntry]: + """Список источников: из GRAFANA_SOURCES_JSON или один синтетический default из GRAFANA_URL.""" + try: + parsed = parse_grafana_sources_json(settings.grafana_sources_json) + except (json.JSONDecodeError, ValueError): + parsed = [] + if parsed: + return parsed + gu = settings.grafana_url.strip() + if not gu: + return [] + return [ + GrafanaSourceEntry( + slug="default", + api_url=gu.rstrip("/"), + api_token=settings.grafana_service_account_token.strip(), + webhook_secret="", + ) + ] + + +def sources_by_slug(settings: "Settings") -> dict[str, GrafanaSourceEntry]: + return {s.slug: s for s in iter_grafana_sources(settings)} + + +def effective_webhook_secret(settings: "Settings", source: GrafanaSourceEntry | None) -> str: + """Пустая строка = проверка вебхука отключена (только для dev).""" + if source and source.webhook_secret.strip(): + return source.webhook_secret.strip() + return settings.grafana_webhook_secret.strip() + + +def webhook_authorized(settings: "Settings", source: GrafanaSourceEntry | None, header: str | None) -> bool: + need = effective_webhook_secret(settings, source) + if not need: + return True + return (header or "") == need diff --git a/onguard24/ingress/grafana.py b/onguard24/ingress/grafana.py index de0e3d8..a001dad 100644 --- a/onguard24/ingress/grafana.py +++ b/onguard24/ingress/grafana.py @@ -1,29 +1,92 @@ import json import logging +import re from datetime import datetime, timezone +from urllib.parse import urlparse from fastapi import APIRouter, Depends, Header, HTTPException, Request from starlette.responses import Response from onguard24.domain.entities import Alert, Severity +from onguard24.grafana_sources import sources_by_slug, webhook_authorized logger = logging.getLogger(__name__) router = APIRouter(tags=["ingress"]) +_PATH_SLUG_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,62}$") + + +def sanitize_source_key(raw: str) -> str: + s = re.sub(r"[^a-zA-Z0-9._-]+", "-", raw.strip()).strip("-").lower() + return s[:200] if s else "" + + +def extract_grafana_source_key(body: dict) -> str | None: + """ + Идентификатор инстанса/организации из тела вебхука Grafana (без ручной прописки в .env). + См. поля payload: externalURL, orgId, лейблы в alerts[]. + """ + chunks: list[str] = [] + ext = body.get("externalURL") or body.get("external_url") + if isinstance(ext, str) and ext.strip(): + try: + host = urlparse(ext.strip()).hostname + if host: + chunks.append(host) + except Exception: + pass + for key in ("orgId", "org_id"): + v = body.get(key) + if v is not None and str(v).strip(): + chunks.append(f"o{str(v).strip()[:24]}") + break + alerts = body.get("alerts") + if isinstance(alerts, list) and alerts: + a0 = alerts[0] if isinstance(alerts[0], dict) else {} + labels = a0.get("labels") if isinstance(a0.get("labels"), dict) else {} + for lk in ("__org_id__", "grafana_org", "tenant", "cluster", "namespace"): + v = labels.get(lk) + if v is not None and str(v).strip(): + chunks.append(str(v).strip()[:64]) + break + if not chunks: + return None + return sanitize_source_key("-".join(chunks)) or None + async def get_pool(request: Request): return getattr(request.app.state, "pool", None) -@router.post("/ingress/grafana", status_code=202) -async def grafana_webhook( +def service_hint_from_grafana_body(body: dict, header_service: str | None) -> str | None: + """Имя сервиса: заголовок X-OnGuard-Service или лейблы из Unified Alerting.""" + if header_service and header_service.strip(): + return header_service.strip()[:200] + alerts = body.get("alerts") + if isinstance(alerts, list) and alerts: + labels = alerts[0].get("labels") if isinstance(alerts[0], dict) else None + if isinstance(labels, dict): + for key in ("service", "job", "namespace", "cluster", "instance"): + v = labels.get(key) + if v is not None and str(v).strip(): + return str(v).strip()[:200] + common = body.get("commonLabels") + if isinstance(common, dict): + for key in ("service", "job", "namespace"): + v = common.get(key) + if v is not None and str(v).strip(): + return str(v).strip()[:200] + return None + + +async def _grafana_webhook_impl( request: Request, - pool=Depends(get_pool), - x_onguard_secret: str | None = Header(default=None, alias="X-OnGuard-Secret"), -): + pool, + x_onguard_secret: str | None, + x_onguard_service: str | None, + path_slug: str | None, +) -> Response: settings = request.app.state.settings - if settings.grafana_webhook_secret and x_onguard_secret != settings.grafana_webhook_secret: - raise HTTPException(status_code=401, detail="unauthorized") raw = await request.body() if len(raw) > 1_000_000: @@ -32,6 +95,25 @@ async def grafana_webhook( body = json.loads(raw.decode() or "{}") except json.JSONDecodeError: body = {} + if not isinstance(body, dict): + body = {} + + derived = extract_grafana_source_key(body) + path_key: str | None = None + if path_slug is not None: + path_key = path_slug.strip().lower() + if not _PATH_SLUG_RE.match(path_key): + raise HTTPException(status_code=400, detail="invalid path slug") + stored_org_slug = path_key + else: + stored_org_slug = derived + + by = sources_by_slug(settings) + source = by.get(path_key) if path_key else None + if not webhook_authorized(settings, source, x_onguard_secret): + raise HTTPException(status_code=401, detail="unauthorized") + + service_name = service_hint_from_grafana_body(body, x_onguard_service) if pool is None: logger.warning("ingress: database not configured, event not persisted") @@ -39,9 +121,15 @@ async def grafana_webhook( async with pool.acquire() as conn: row = await conn.fetchrow( - "INSERT INTO ingress_events (source, body) VALUES ($1, $2::jsonb) RETURNING id", + """ + INSERT INTO ingress_events (source, body, org_slug, service_name) + VALUES ($1, $2::jsonb, $3, $4) + RETURNING id + """, "grafana", json.dumps(body), + stored_org_slug, + service_name, ) raw_id = row["id"] if row else None bus = getattr(request.app.state, "event_bus", None) @@ -54,5 +142,43 @@ async def grafana_webhook( payload=body, received_at=datetime.now(timezone.utc), ) - await bus.publish_alert_received(alert, raw_payload_ref=raw_id) + await bus.publish_alert_received( + alert, + raw_payload_ref=raw_id, + grafana_org_slug=stored_org_slug, + service_name=service_name, + ) return Response(status_code=202) + + +@router.post("/ingress/grafana", status_code=202) +async def grafana_webhook_legacy( + request: Request, + pool=Depends(get_pool), + x_onguard_secret: str | None = Header(default=None, alias="X-OnGuard-Secret"), + x_onguard_service: str | None = Header(default=None, alias="X-OnGuard-Service"), +): + """ + Универсальный URL для любого инстанса Grafana: org_slug в БД берётся из тела + (externalURL, orgId, лейблы), без преднастройки в .env. + """ + return await _grafana_webhook_impl( + request, pool, x_onguard_secret, x_onguard_service, path_slug=None + ) + + +@router.post("/ingress/grafana/{org_slug}", status_code=202) +async def grafana_webhook_org( + org_slug: str, + request: Request, + pool=Depends(get_pool), + x_onguard_secret: str | None = Header(default=None, alias="X-OnGuard-Secret"), + x_onguard_service: str | None = Header(default=None, alias="X-OnGuard-Service"), +): + """ + Опционально: явный ярлык в URL (перекрывает авто-извлечение из JSON). + Секрет для пути: webhook_secret из GRAFANA_SOURCES_JSON для этого slug, иначе общий. + """ + return await _grafana_webhook_impl( + request, pool, x_onguard_secret, x_onguard_service, path_slug=org_slug + ) diff --git a/onguard24/integrations/grafana_topology.py b/onguard24/integrations/grafana_topology.py new file mode 100644 index 0000000..5f89eea --- /dev/null +++ b/onguard24/integrations/grafana_topology.py @@ -0,0 +1,169 @@ +"""Чтение иерархии Grafana: org, папки, managed alert rules (Ruler API).""" + +from __future__ import annotations + +import json +import logging +from dataclasses import dataclass +from typing import Any + +import httpx + +log = logging.getLogger(__name__) + +_HEADERS_JSON = {"Accept": "application/json"} + + +def _auth_headers(token: str) -> dict[str, str]: + return {**_HEADERS_JSON, "Authorization": f"Bearer {token.strip()}"} + + +@dataclass +class ParsedRuleRow: + namespace_uid: str + rule_group_name: str + rule_group_interval: str | None + rule_uid: str + title: str + labels: dict[str, Any] + + +async def fetch_org(base_url: str, token: str) -> tuple[dict[str, Any] | None, str | None]: + base = base_url.rstrip("/") + try: + async with httpx.AsyncClient(timeout=30.0, verify=True, follow_redirects=True) as client: + r = await client.get(f"{base}/api/org", headers=_auth_headers(token)) + except Exception as e: + return None, str(e) + if r.status_code != 200: + return None, f"http {r.status_code}: {(r.text or '')[:300]}" + try: + return r.json(), None + except Exception: + return None, "invalid json from /api/org" + + +async def fetch_all_folders(base_url: str, token: str) -> tuple[list[dict[str, Any]], str | None]: + """Обход дерева папок через /api/folders?parentUid=…""" + base = base_url.rstrip("/") + out: list[dict[str, Any]] = [] + seen: set[str] = set() + queue: list[str | None] = [None] + + try: + async with httpx.AsyncClient(timeout=60.0, verify=True, follow_redirects=True) as client: + while queue: + parent_uid = queue.pop(0) + params: dict[str, str] = {} + if parent_uid is not None: + params["parentUid"] = parent_uid + r = await client.get(f"{base}/api/folders", headers=_auth_headers(token), params=params) + if r.status_code != 200: + return out, f"folders http {r.status_code}: {(r.text or '')[:200]}" + try: + chunk = r.json() + except Exception: + return out, "invalid json from /api/folders" + if not isinstance(chunk, list): + return out, "folders response is not a list" + for f in chunk: + if not isinstance(f, dict): + continue + uid = f.get("uid") + if not uid or uid in seen: + continue + seen.add(str(uid)) + out.append(f) + queue.append(str(uid)) + except Exception as e: + return out, str(e) + return out, None + + +async def fetch_ruler_rules_raw(base_url: str, token: str) -> tuple[dict[str, Any] | None, str | None]: + """GET Ruler API для Grafana-managed правил (namespace → группы → rules).""" + base = base_url.rstrip("/") + paths = ( + "/api/ruler/grafana/api/v1/rules", + "/api/ruler/Grafana/api/v1/rules", + ) + last_err: str | None = None + try: + async with httpx.AsyncClient(timeout=90.0, verify=True, follow_redirects=True) as client: + for path in paths: + r = await client.get(f"{base}{path}", headers=_auth_headers(token)) + if r.status_code == 200: + try: + data = r.json() + except Exception: + return None, "invalid json from ruler" + if isinstance(data, dict): + return data, None + return None, "ruler response is not an object" + last_err = f"ruler {path} http {r.status_code}: {(r.text or '')[:200]}" + except Exception as e: + return None, str(e) + return None, last_err or "ruler: no path matched" + + +def parse_ruler_rules(data: dict[str, Any]) -> list[ParsedRuleRow]: + rows: list[ParsedRuleRow] = [] + for namespace_uid, groups in data.items(): + if not isinstance(namespace_uid, str) or not namespace_uid.strip(): + continue + ns = namespace_uid.strip() + if not isinstance(groups, list): + continue + for grp in groups: + if not isinstance(grp, dict): + continue + gname = str(grp.get("name") or "group") + interval = grp.get("interval") + interval_s = str(interval) if interval is not None else None + rules = grp.get("rules") + if not isinstance(rules, list): + continue + for rule in rules: + if not isinstance(rule, dict): + continue + ga = rule.get("grafana_alert") + if not isinstance(ga, dict): + continue + uid = ga.get("uid") or rule.get("uid") + if not uid: + continue + title = str(ga.get("title") or gname or uid) + labels = rule.get("labels") + if not isinstance(labels, dict): + labels = {} + lbl = {str(k): str(v) for k, v in labels.items() if v is not None} + rows.append( + ParsedRuleRow( + namespace_uid=ns, + rule_group_name=gname, + rule_group_interval=interval_s, + rule_uid=str(uid), + title=title[:500], + labels=lbl, + ) + ) + return rows + + +def merge_folder_rows( + api_folders: list[dict[str, Any]], + rule_namespaces: set[str], +) -> list[tuple[str, str, str | None]]: + """(uid, title, parent_uid); добавляем namespace из ruler без записи в /api/folders.""" + by_uid: dict[str, tuple[str, str | None]] = {} + for f in api_folders: + uid = f.get("uid") + if not uid: + continue + p = f.get("parentUid") + parent = str(p) if p else None + by_uid[str(uid)] = (str(f.get("title") or uid), parent) + for ns in rule_namespaces: + if ns not in by_uid: + by_uid[ns] = (ns, None) + return [(uid, t[0], t[1]) for uid, t in sorted(by_uid.items(), key=lambda x: x[0])] diff --git a/onguard24/modules/escalations.py b/onguard24/modules/escalations.py index 0d9e126..bd59e46 100644 --- a/onguard24/modules/escalations.py +++ b/onguard24/modules/escalations.py @@ -25,6 +25,12 @@ class PolicyCreate(BaseModel): steps: list[dict] = Field(default_factory=list) +class PolicyPatch(BaseModel): + name: str | None = Field(default=None, min_length=1, max_length=200) + enabled: bool | None = None + steps: list[dict] | None = None + + def register_events(_bus: EventBus, _pool: asyncpg.Pool | None = None) -> None: pass @@ -95,6 +101,67 @@ async def create_policy_api(body: PolicyCreate, pool: asyncpg.Pool | None = Depe } +def _policy_dict(row) -> dict: + steps = row["steps"] + if isinstance(steps, str): + steps = json.loads(steps) + return { + "id": str(row["id"]), + "name": row["name"], + "enabled": row["enabled"], + "steps": steps if isinstance(steps, list) else [], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + } + + +@router.get("/{policy_id}") +async def get_policy_api(policy_id: UUID, pool: asyncpg.Pool | None = Depends(get_pool)): + if pool is None: + raise HTTPException(status_code=503, detail="database disabled") + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT id, name, enabled, steps, created_at + FROM escalation_policies WHERE id = $1::uuid + """, + policy_id, + ) + if not row: + raise HTTPException(status_code=404, detail="not found") + return _policy_dict(row) + + +@router.patch("/{policy_id}") +async def patch_policy_api( + policy_id: UUID, + body: PolicyPatch, + pool: asyncpg.Pool | None = Depends(get_pool), +): + if pool is None: + raise HTTPException(status_code=503, detail="database disabled") + if body.name is None and body.enabled is None and body.steps is None: + raise HTTPException(status_code=400, detail="no fields to update") + steps_json = json.dumps(body.steps) if body.steps is not None else None + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + UPDATE escalation_policies SET + name = COALESCE($2, name), + enabled = COALESCE($3, enabled), + steps = COALESCE($4::jsonb, steps) + WHERE id = $1::uuid + RETURNING id, name, enabled, steps, created_at + """, + policy_id, + body.name.strip() if body.name is not None else None, + body.enabled, + steps_json, + ) + if not row: + raise HTTPException(status_code=404, detail="not found") + return _policy_dict(row) + + @router.delete("/{policy_id}", status_code=204) async def delete_policy_api(policy_id: UUID, pool: asyncpg.Pool | None = Depends(get_pool)): if pool is None: diff --git a/onguard24/modules/grafana_catalog.py b/onguard24/modules/grafana_catalog.py new file mode 100644 index 0000000..2df6b1c --- /dev/null +++ b/onguard24/modules/grafana_catalog.py @@ -0,0 +1,380 @@ +"""Кэш иерархии Grafana (инстанс → org → папки → правила) через API + БД.""" + +from __future__ import annotations + +import html +import json +import logging +from dataclasses import dataclass + +import asyncpg +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import HTMLResponse +from pydantic import BaseModel, Field + +from onguard24.config import get_settings +from onguard24.deps import get_pool +from onguard24.domain.events import EventBus +from onguard24.grafana_sources import iter_grafana_sources +from onguard24.integrations import grafana_topology as gt +from onguard24.modules.ui_support import wrap_module_html_page + +log = logging.getLogger(__name__) + +router = APIRouter(tags=["module-grafana-catalog"]) +ui_router = APIRouter(tags=["web-grafana-catalog"], include_in_schema=False) + + +def register_events(_bus: EventBus, _pool: asyncpg.Pool | None = None) -> None: + pass + + +class SyncBody(BaseModel): + instance_slug: str | None = Field( + default=None, + description="Slug из GRAFANA_SOURCES_JSON; пусто — все источники с api_token", + ) + + +@dataclass +class PullOutcome: + org_id: int + org_name: str + folder_rows: list[tuple[str, str, str | None]] + rules: list[gt.ParsedRuleRow] + warnings: list[str] + + +async def pull_topology(api_url: str, token: str) -> tuple[PullOutcome | None, str | None]: + org, oerr = await gt.fetch_org(api_url, token) + if oerr or not org: + return None, oerr or "no org" + oid = org.get("id") + if oid is None: + return None, "org response without id" + oname = str(org.get("name") or "") + + warnings: list[str] = [] + folders_raw, ferr = await gt.fetch_all_folders(api_url, token) + if ferr: + warnings.append(ferr) + + ruler_raw, rerr = await gt.fetch_ruler_rules_raw(api_url, token) + if rerr: + warnings.append(rerr) + rules: list[gt.ParsedRuleRow] = [] + namespaces: set[str] = set() + else: + rules = gt.parse_ruler_rules(ruler_raw or {}) + namespaces = {r.namespace_uid for r in rules} + + merged = gt.merge_folder_rows(folders_raw, namespaces) + return ( + PullOutcome( + org_id=int(oid), + org_name=oname, + folder_rows=merged, + rules=rules, + warnings=warnings, + ), + None, + ) + + +async def persist_topology( + conn: asyncpg.Connection, + instance_slug: str, + outcome: PullOutcome, +) -> None: + oid = outcome.org_id + await conn.execute( + """ + DELETE FROM grafana_catalog_rules + WHERE instance_slug = $1 AND grafana_org_id = $2 + """, + instance_slug, + oid, + ) + await conn.execute( + """ + DELETE FROM grafana_catalog_folders + WHERE instance_slug = $1 AND grafana_org_id = $2 + """, + instance_slug, + oid, + ) + + for uid, title, parent in outcome.folder_rows: + await conn.execute( + """ + INSERT INTO grafana_catalog_folders + (instance_slug, grafana_org_id, folder_uid, title, parent_uid) + VALUES ($1, $2, $3, $4, $5) + """, + instance_slug, + oid, + uid, + title, + parent, + ) + + for r in outcome.rules: + await conn.execute( + """ + INSERT INTO grafana_catalog_rules ( + instance_slug, grafana_org_id, namespace_uid, rule_group_name, + rule_uid, title, rule_group_interval, labels + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8::jsonb) + """, + instance_slug, + oid, + r.namespace_uid, + r.rule_group_name, + r.rule_uid, + r.title, + r.rule_group_interval, + json.dumps(r.labels), + ) + + fc = len(outcome.folder_rows) + rc = len(outcome.rules) + warn_txt = "; ".join(outcome.warnings) if outcome.warnings else None + if warn_txt and len(warn_txt) > 1900: + warn_txt = warn_txt[:1900] + "…" + await conn.execute( + """ + INSERT INTO grafana_catalog_meta ( + instance_slug, grafana_org_id, org_name, synced_at, + folder_count, rule_count, error_text + ) + VALUES ($1, $2, $3, now(), $4, $5, $6) + ON CONFLICT (instance_slug, grafana_org_id) DO UPDATE SET + org_name = EXCLUDED.org_name, + synced_at = EXCLUDED.synced_at, + folder_count = EXCLUDED.folder_count, + rule_count = EXCLUDED.rule_count, + error_text = EXCLUDED.error_text + """, + instance_slug, + oid, + outcome.org_name, + fc, + rc, + warn_txt, + ) + + +@router.post("/sync", status_code=200) +async def sync_catalog_api( + body: SyncBody, + pool: asyncpg.Pool | None = Depends(get_pool), +): + if pool is None: + raise HTTPException(status_code=503, detail="database disabled") + sources = iter_grafana_sources(get_settings()) + if body.instance_slug: + sl = body.instance_slug.strip().lower() + sources = [s for s in sources if s.slug == sl] + if not sources: + raise HTTPException(status_code=404, detail="unknown instance slug") + to_run = [s for s in sources if s.api_token.strip()] + if not to_run: + raise HTTPException( + status_code=400, + detail="нет источников с api_token; задайте GRAFANA_SOURCES_JSON или GRAFANA_SERVICE_ACCOUNT_TOKEN", + ) + + results: list[dict] = [] + for src in to_run: + outcome, err = await pull_topology(src.api_url, src.api_token) + if err or outcome is None: + results.append({"slug": src.slug, "ok": False, "error": err}) + log.warning("grafana_catalog sync failed %s: %s", src.slug, err) + continue + async with pool.acquire() as conn: + async with conn.transaction(): + await persist_topology(conn, src.slug, outcome) + results.append( + { + "slug": src.slug, + "ok": True, + "org_id": outcome.org_id, + "org_name": outcome.org_name, + "folders": len(outcome.folder_rows), + "rules": len(outcome.rules), + "warnings": outcome.warnings, + } + ) + return {"results": results} + + +@router.get("/meta") +async def list_meta_api(pool: asyncpg.Pool | None = Depends(get_pool)): + if pool is None: + return {"items": [], "database": "disabled"} + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT instance_slug, grafana_org_id, org_name, synced_at, + folder_count, rule_count, error_text + FROM grafana_catalog_meta + ORDER BY instance_slug, grafana_org_id + """ + ) + items = [] + for r in rows: + items.append( + { + "instance_slug": r["instance_slug"], + "grafana_org_id": r["grafana_org_id"], + "org_name": r["org_name"], + "synced_at": r["synced_at"].isoformat() if r["synced_at"] else None, + "folder_count": r["folder_count"], + "rule_count": r["rule_count"], + "error_text": r["error_text"], + } + ) + return {"items": items} + + +@router.get("/tree") +async def tree_api( + instance_slug: str, + pool: asyncpg.Pool | None = Depends(get_pool), +): + if pool is None: + raise HTTPException(status_code=503, detail="database disabled") + slug = instance_slug.strip().lower() + async with pool.acquire() as conn: + meta = await conn.fetchrow( + """ + SELECT * FROM grafana_catalog_meta + WHERE instance_slug = $1 AND grafana_org_id > 0 + ORDER BY synced_at DESC LIMIT 1 + """, + slug, + ) + if not meta: + raise HTTPException(status_code=404, detail="no catalog for this slug; run POST /sync first") + oid = meta["grafana_org_id"] + folders = await conn.fetch( + """ + SELECT folder_uid, title, parent_uid + FROM grafana_catalog_folders + WHERE instance_slug = $1 AND grafana_org_id = $2 + ORDER BY title + """, + slug, + oid, + ) + rules = await conn.fetch( + """ + SELECT namespace_uid, rule_group_name, rule_uid, title, + rule_group_interval, labels + FROM grafana_catalog_rules + WHERE instance_slug = $1 AND grafana_org_id = $2 + ORDER BY namespace_uid, rule_group_name, title + """, + slug, + oid, + ) + + by_ns: dict[str, list[dict]] = {} + for r in rules: + ns = r["namespace_uid"] + by_ns.setdefault(ns, []).append( + { + "rule_uid": r["rule_uid"], + "title": r["title"], + "rule_group": r["rule_group_name"], + "interval": r["rule_group_interval"], + "labels": r["labels"] if isinstance(r["labels"], dict) else {}, + } + ) + + folder_nodes = [] + for f in folders: + uid = f["folder_uid"] + folder_nodes.append( + { + "folder_uid": uid, + "title": f["title"], + "parent_uid": f["parent_uid"], + "rules": by_ns.get(uid, []), + } + ) + + return { + "instance_slug": slug, + "grafana_org_id": oid, + "org_name": meta["org_name"], + "synced_at": meta["synced_at"].isoformat() if meta["synced_at"] else None, + "folders": folder_nodes, + "orphan_rule_namespaces": sorted(set(by_ns.keys()) - {f["folder_uid"] for f in folders}), + } + + +@ui_router.get("/", response_class=HTMLResponse) +async def grafana_catalog_ui(request: Request): + pool = get_pool(request) + inner = "" + if pool is None: + inner = "

База не настроена.

" + else: + try: + async with pool.acquire() as conn: + rows = await conn.fetch( + """ + SELECT instance_slug, org_name, synced_at, folder_count, rule_count, error_text + FROM grafana_catalog_meta + ORDER BY instance_slug + """ + ) + if not rows: + inner = "

Каталог пуст. Вызовите POST /api/v1/modules/grafana-catalog/sync.

" + else: + inner = "" + for r in rows: + err = html.escape(str(r["error_text"] or "—"))[:120] + st = r["synced_at"].isoformat() if r["synced_at"] else "—" + inner += ( + f"" + f"" + f"" + f"" + f"" + ) + inner += "
SlugOrgСинхр.ПапокПравилОшибка
{html.escape(r['instance_slug'])}{html.escape(str(r['org_name']))}{html.escape(st)}{r['folder_count']}{r['rule_count']}{err}
" + except Exception as e: + inner = f"

{html.escape(str(e))}

" + page = f"""

Каталог Grafana

+

Иерархия: инстанс (slug) → организация → папки → правила. Синхронизация по HTTP API.

+{inner} +

API: POST …/grafana-catalog/sync, GET …/grafana-catalog/tree?instance_slug=…

""" + return HTMLResponse( + wrap_module_html_page( + document_title="Каталог Grafana — onGuard24", + current_slug="grafana-catalog", + main_inner_html=page, + ) + ) + + +async def render_home_fragment(request: Request) -> str: + pool = get_pool(request) + if pool is None: + return '

Нужна БД для каталога Grafana.

' + try: + async with pool.acquire() as conn: + n = await conn.fetchval("SELECT count(*)::int FROM grafana_catalog_meta WHERE grafana_org_id >= 0") + last = await conn.fetchrow( + "SELECT max(synced_at) AS m FROM grafana_catalog_meta WHERE grafana_org_id >= 0" + ) + except Exception: + return '

Таблицы каталога недоступны (миграции?).

' + ts = last["m"].isoformat() if last and last["m"] else "никогда" + return ( + f'

Источников с синхронизацией: {int(n)}. ' + f"Последняя синхр.: {html.escape(ts)}

" + ) diff --git a/onguard24/modules/incidents.py b/onguard24/modules/incidents.py index b54f801..dc16e08 100644 --- a/onguard24/modules/incidents.py +++ b/onguard24/modules/incidents.py @@ -27,6 +27,12 @@ class IncidentCreate(BaseModel): severity: str = Field(default="warning", max_length=32) +class IncidentPatch(BaseModel): + title: str | None = Field(default=None, min_length=1, max_length=500) + status: str | None = Field(default=None, max_length=64) + severity: str | None = Field(default=None, max_length=32) + + def register_events(bus: EventBus, pool: asyncpg.Pool | None = None) -> None: if pool is None: return @@ -41,12 +47,17 @@ def register_events(bus: EventBus, pool: asyncpg.Pool | None = None) -> None: async with pool.acquire() as conn: await conn.execute( """ - INSERT INTO incidents (title, status, severity, source, ingress_event_id) - VALUES ($1, 'open', $2, 'grafana', $3::uuid) + INSERT INTO incidents ( + title, status, severity, source, ingress_event_id, + grafana_org_slug, service_name + ) + VALUES ($1, 'open', $2, 'grafana', $3::uuid, $4, $5) """, title, sev, ev.raw_payload_ref, + ev.grafana_org_slug, + ev.service_name, ) except Exception: log.exception("incidents: не удалось создать инцидент из alert.received") @@ -70,20 +81,33 @@ async def render_home_fragment(request: Request) -> str: async def list_incidents_api( pool: asyncpg.Pool | None = Depends(get_pool), limit: int = 50, + grafana_org_slug: str | None = None, + service_name: str | None = None, ): if pool is None: return {"items": [], "database": "disabled"} limit = min(max(limit, 1), 200) - async with pool.acquire() as conn: - rows = await conn.fetch( - """ - SELECT id, title, status, severity, source, ingress_event_id, created_at + conditions: list[str] = [] + args: list = [] + if grafana_org_slug and grafana_org_slug.strip(): + args.append(grafana_org_slug.strip()) + conditions.append(f"grafana_org_slug = ${len(args)}") + if service_name and service_name.strip(): + args.append(service_name.strip()) + conditions.append(f"service_name = ${len(args)}") + where_sql = ("WHERE " + " AND ".join(conditions)) if conditions else "" + args.append(limit) + lim_ph = f"${len(args)}" + q = f""" + SELECT id, title, status, severity, source, ingress_event_id, created_at, updated_at, + grafana_org_slug, service_name FROM incidents + {where_sql} ORDER BY created_at DESC - LIMIT $1 - """, - limit, - ) + LIMIT {lim_ph} + """ + async with pool.acquire() as conn: + rows = await conn.fetch(q, *args) items = [] for r in rows: items.append( @@ -95,6 +119,9 @@ async def list_incidents_api( "source": r["source"], "ingress_event_id": str(r["ingress_event_id"]) if r["ingress_event_id"] else None, "created_at": r["created_at"].isoformat() if r["created_at"] else None, + "updated_at": r["updated_at"].isoformat() if r.get("updated_at") else None, + "grafana_org_slug": r.get("grafana_org_slug"), + "service_name": r.get("service_name"), } ) return {"items": items} @@ -110,9 +137,10 @@ async def create_incident_api( async with pool.acquire() as conn: row = await conn.fetchrow( """ - INSERT INTO incidents (title, status, severity, source) - VALUES ($1, $2, $3, 'manual') - RETURNING id, title, status, severity, source, ingress_event_id, created_at + INSERT INTO incidents (title, status, severity, source, grafana_org_slug, service_name) + VALUES ($1, $2, $3, 'manual', NULL, NULL) + RETURNING id, title, status, severity, source, ingress_event_id, created_at, updated_at, + grafana_org_slug, service_name """, body.title.strip(), body.status, @@ -126,9 +154,63 @@ async def create_incident_api( "source": row["source"], "ingress_event_id": None, "created_at": row["created_at"].isoformat() if row["created_at"] else None, + "updated_at": row["updated_at"].isoformat() if row.get("updated_at") else None, + "grafana_org_slug": row.get("grafana_org_slug"), + "service_name": row.get("service_name"), } +def _incident_row_dict(row) -> dict: + return { + "id": str(row["id"]), + "title": row["title"], + "status": row["status"], + "severity": row["severity"], + "source": row["source"], + "ingress_event_id": str(row["ingress_event_id"]) if row["ingress_event_id"] else None, + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + "updated_at": row["updated_at"].isoformat() if row.get("updated_at") else None, + "grafana_org_slug": row.get("grafana_org_slug"), + "service_name": row.get("service_name"), + } + + +@router.get("/{incident_id}/tasks") +async def list_incident_tasks_api( + incident_id: UUID, + pool: asyncpg.Pool | None = Depends(get_pool), + limit: int = 100, +): + if pool is None: + raise HTTPException(status_code=503, detail="database disabled") + limit = min(max(limit, 1), 200) + async with pool.acquire() as conn: + exists = await conn.fetchval("SELECT 1 FROM incidents WHERE id = $1::uuid", incident_id) + if not exists: + raise HTTPException(status_code=404, detail="incident not found") + rows = await conn.fetch( + """ + SELECT id, incident_id, title, status, created_at + FROM tasks WHERE incident_id = $1::uuid + ORDER BY created_at DESC LIMIT $2 + """, + incident_id, + limit, + ) + items = [] + for r in rows: + items.append( + { + "id": str(r["id"]), + "incident_id": str(r["incident_id"]) if r["incident_id"] else None, + "title": r["title"], + "status": r["status"], + "created_at": r["created_at"].isoformat() if r["created_at"] else None, + } + ) + return {"incident_id": str(incident_id), "items": items} + + @router.get("/{incident_id}") async def get_incident_api(incident_id: UUID, pool: asyncpg.Pool | None = Depends(get_pool)): if pool is None: @@ -136,22 +218,47 @@ async def get_incident_api(incident_id: UUID, pool: asyncpg.Pool | None = Depend async with pool.acquire() as conn: row = await conn.fetchrow( """ - SELECT id, title, status, severity, source, ingress_event_id, created_at + SELECT id, title, status, severity, source, ingress_event_id, created_at, updated_at, + grafana_org_slug, service_name FROM incidents WHERE id = $1::uuid """, incident_id, ) if not row: raise HTTPException(status_code=404, detail="not found") - return { - "id": str(row["id"]), - "title": row["title"], - "status": row["status"], - "severity": row["severity"], - "source": row["source"], - "ingress_event_id": str(row["ingress_event_id"]) if row["ingress_event_id"] else None, - "created_at": row["created_at"].isoformat() if row["created_at"] else None, - } + return _incident_row_dict(row) + + +@router.patch("/{incident_id}") +async def patch_incident_api( + incident_id: UUID, + body: IncidentPatch, + pool: asyncpg.Pool | None = Depends(get_pool), +): + if pool is None: + raise HTTPException(status_code=503, detail="database disabled") + if body.title is None and body.status is None and body.severity is None: + raise HTTPException(status_code=400, detail="no fields to update") + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + UPDATE incidents SET + title = COALESCE($2, title), + status = COALESCE($3, status), + severity = COALESCE($4, severity), + updated_at = now() + WHERE id = $1::uuid + RETURNING id, title, status, severity, source, ingress_event_id, created_at, updated_at, + grafana_org_slug, service_name + """, + incident_id, + body.title.strip() if body.title is not None else None, + body.status, + body.severity, + ) + if not row: + raise HTTPException(status_code=404, detail="not found") + return _incident_row_dict(row) @ui_router.get("/", response_class=HTMLResponse) @@ -166,13 +273,15 @@ async def incidents_ui_home(request: Request): async with pool.acquire() as conn: rows = await conn.fetch( """ - SELECT id, title, status, severity, source, created_at + SELECT id, title, status, severity, source, created_at, grafana_org_slug, service_name FROM incidents ORDER BY created_at DESC LIMIT 100 """ ) for r in rows: + org = html.escape(str(r["grafana_org_slug"] or "—")) + svc = html.escape(str(r["service_name"] or "—")) rows_html += ( "" f"{html.escape(str(r['id']))[:8]}…" @@ -180,6 +289,8 @@ async def incidents_ui_home(request: Request): f"{html.escape(r['status'])}" f"{html.escape(r['severity'])}" f"{html.escape(r['source'])}" + f"{org}" + f"{svc}" "" ) except Exception as e: @@ -187,8 +298,8 @@ async def incidents_ui_home(request: Request): inner = f"""

Инциденты

{err} - -{rows_html or ''} + +{rows_html or ''}
IDЗаголовокСтатусВажностьИсточник
Пока нет записей
IDЗаголовокСтатусВажностьИсточникGrafana slugСервис
Пока нет записей

Создание из Grafana: webhook → запись в ingress_events → событие → строка здесь.

""" return HTMLResponse( diff --git a/onguard24/modules/registry.py b/onguard24/modules/registry.py index c2b9bb0..16827d2 100644 --- a/onguard24/modules/registry.py +++ b/onguard24/modules/registry.py @@ -16,6 +16,7 @@ from onguard24.domain.events import EventBus from onguard24.modules import ( contacts, escalations, + grafana_catalog, incidents, schedules, statusboard, @@ -42,6 +43,15 @@ class ModuleMount: def _mounts() -> list[ModuleMount]: return [ + ModuleMount( + router=grafana_catalog.router, + url_prefix="/api/v1/modules/grafana-catalog", + register_events=grafana_catalog.register_events, + slug="grafana-catalog", + title="Каталог Grafana", + ui_router=grafana_catalog.ui_router, + render_home_fragment=grafana_catalog.render_home_fragment, + ), ModuleMount( router=incidents.router, url_prefix="/api/v1/modules/incidents", diff --git a/onguard24/modules/tasks.py b/onguard24/modules/tasks.py index e1e74e0..eb7d04d 100644 --- a/onguard24/modules/tasks.py +++ b/onguard24/modules/tasks.py @@ -23,6 +23,11 @@ class TaskCreate(BaseModel): incident_id: UUID | None = None +class TaskPatch(BaseModel): + title: str | None = Field(default=None, min_length=1, max_length=500) + status: str | None = Field(default=None, max_length=64) + + def register_events(_bus: EventBus, _pool: asyncpg.Pool | None = None) -> None: pass @@ -114,6 +119,63 @@ async def create_task_api(body: TaskCreate, pool: asyncpg.Pool | None = Depends( } +@router.get("/{task_id}") +async def get_task_api(task_id: UUID, pool: asyncpg.Pool | None = Depends(get_pool)): + if pool is None: + raise HTTPException(status_code=503, detail="database disabled") + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + SELECT id, incident_id, title, status, created_at + FROM tasks WHERE id = $1::uuid + """, + task_id, + ) + if not row: + raise HTTPException(status_code=404, detail="not found") + return { + "id": str(row["id"]), + "incident_id": str(row["incident_id"]) if row["incident_id"] else None, + "title": row["title"], + "status": row["status"], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + } + + +@router.patch("/{task_id}") +async def patch_task_api( + task_id: UUID, + body: TaskPatch, + pool: asyncpg.Pool | None = Depends(get_pool), +): + if pool is None: + raise HTTPException(status_code=503, detail="database disabled") + if body.title is None and body.status is None: + raise HTTPException(status_code=400, detail="no fields to update") + async with pool.acquire() as conn: + row = await conn.fetchrow( + """ + UPDATE tasks SET + title = COALESCE($2, title), + status = COALESCE($3, status) + WHERE id = $1::uuid + RETURNING id, incident_id, title, status, created_at + """, + task_id, + body.title.strip() if body.title is not None else None, + body.status, + ) + if not row: + raise HTTPException(status_code=404, detail="not found") + return { + "id": str(row["id"]), + "incident_id": str(row["incident_id"]) if row["incident_id"] else None, + "title": row["title"], + "status": row["status"], + "created_at": row["created_at"].isoformat() if row["created_at"] else None, + } + + @ui_router.get("/", response_class=HTMLResponse) async def tasks_ui_home(request: Request): pool = get_pool(request) diff --git a/onguard24/status_snapshot.py b/onguard24/status_snapshot.py index edd2221..327e04d 100644 --- a/onguard24/status_snapshot.py +++ b/onguard24/status_snapshot.py @@ -3,6 +3,7 @@ from fastapi import Request from onguard24.config import Settings +from onguard24.grafana_sources import iter_grafana_sources from onguard24.integrations import forgejo_api, grafana_api from onguard24.vaultcheck import ping as vault_ping @@ -31,31 +32,46 @@ async def build(request: Request) -> dict: else: out["vault"] = "disabled" - gu = settings.grafana_url.strip() - if not gu: + sources = iter_grafana_sources(settings) + if not sources: out["grafana"] = "disabled" - elif settings.grafana_service_account_token.strip(): - ok, err = await grafana_api.ping(gu, settings.grafana_service_account_token) - if ok: - user, _ = await grafana_api.get_signed_in_user(gu, settings.grafana_service_account_token) - entry: dict = {"status": "ok", "url": gu, "api": "authenticated"} - if user: - login = user.get("login") or user.get("email") - if login: - entry["service_account_login"] = login - out["grafana"] = entry - else: - out["grafana"] = {"status": "error", "detail": err, "url": gu} else: - live_ok, live_err = await grafana_api.health_live(gu) - if live_ok: - out["grafana"] = { - "status": "reachable", - "url": gu, - "detail": "задай GRAFANA_SERVICE_ACCOUNT_TOKEN для вызовов API", - } + instances: list[dict] = [] + for src in sources: + entry: dict = {"slug": src.slug, "url": src.api_url} + if src.api_token.strip(): + ok, err = await grafana_api.ping(src.api_url, src.api_token) + if ok: + user, _ = await grafana_api.get_signed_in_user(src.api_url, src.api_token) + entry["status"] = "ok" + entry["api"] = "authenticated" + if user: + login = user.get("login") or user.get("email") + if login: + entry["service_account_login"] = login + else: + entry["status"] = "error" + entry["detail"] = err + else: + live_ok, live_err = await grafana_api.health_live(src.api_url) + if live_ok: + entry["status"] = "reachable" + entry["detail"] = "задай api_token в GRAFANA_SOURCES_JSON для проверки API" + else: + entry["status"] = "error" + entry["detail"] = live_err + instances.append(entry) + + if all(i.get("status") == "ok" for i in instances): + agg = "ok" + elif any(i.get("status") == "error" for i in instances): + agg = "error" else: - out["grafana"] = {"status": "error", "detail": live_err, "url": gu} + agg = "reachable" + out["grafana"] = { + "status": agg, + "instances": instances, + } fj = settings.forgejo_url.strip() if not fj: diff --git a/pyproject.toml b/pyproject.toml index 2585536..0916b31 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "onguard24" -version = "1.6.0" +version = "1.7.0" description = "onGuard24 — модульный сервис (аналог IRM)" readme = "README.md" requires-python = ">=3.11" diff --git a/tests/conftest.py b/tests/conftest.py index 698be3a..64804ee 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,7 @@ for key in ( "FORGEJO_URL", "FORGEJO_TOKEN", "GRAFANA_WEBHOOK_SECRET", + "GRAFANA_SOURCES_JSON", ): os.environ.pop(key, None) os.environ["DATABASE_URL"] = "" diff --git a/tests/irm_db_fake.py b/tests/irm_db_fake.py new file mode 100644 index 0000000..5d22c1a --- /dev/null +++ b/tests/irm_db_fake.py @@ -0,0 +1,286 @@ +"""In-memory «пул» для тестов IRM без реального PostgreSQL.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any +from uuid import UUID, uuid4 + + +def _now() -> datetime: + return datetime.now(timezone.utc) + + +@dataclass +class Row: + """Минимальная обёртка под asyncpg.Record (доступ по ключу).""" + + _data: dict[str, Any] + + def __getitem__(self, key: str) -> Any: + return self._data[key] + + def get(self, key: str, default: Any = None) -> Any: + return self._data.get(key, default) + + +class IrmFakeConn: + def __init__(self, store: IrmFakeStore) -> None: + self.store = store + + def _q(self, query: str) -> str: + return " ".join(query.split()) + + async def execute(self, query: str, *args: Any) -> str: + q = self._q(query) + if "INSERT INTO incidents" in q and "ingress_event_id" in q: + self.store.insert_incident_alert( + args[0], args[1], args[2], args[3], args[4] + ) + return "INSERT 0 1" + raise AssertionError(f"execute not implemented: {q[:80]}") + + async def fetchval(self, query: str, *args: Any) -> Any: + q = self._q(query) + if "count(*)" in q and "FROM incidents" in q and "escalation" not in q: + return len(self.store.incidents) + if "count(*)" in q and "FROM tasks" in q: + return len(self.store.tasks) + if "count(*)" in q and "escalation_policies" in q: + return sum(1 for p in self.store.policies.values() if p["enabled"]) + if "SELECT 1 FROM incidents WHERE id" in q: + uid = args[0] + return 1 if uid in self.store.incidents else None + raise AssertionError(f"fetchval not implemented: {q[:100]}") + + async def fetch(self, query: str, *args: Any) -> list[Row]: + q = self._q(query) + if "FROM incidents" in q and "ORDER BY created_at DESC" in q: + rows = sorted(self.store.incidents.values(), key=lambda x: x["created_at"], reverse=True) + if "grafana_org_slug =" in q and "service_name =" in q: + rows = [ + r + for r in rows + if r.get("grafana_org_slug") == args[0] + and r.get("service_name") == args[1] + ] + lim = args[2] + elif "grafana_org_slug =" in q: + rows = [r for r in rows if r.get("grafana_org_slug") == args[0]] + lim = args[1] + elif "service_name =" in q: + rows = [r for r in rows if r.get("service_name") == args[0]] + lim = args[1] + else: + lim = args[0] + return [Row(dict(r)) for r in rows[:lim]] + if "FROM tasks" in q and "WHERE incident_id" in q and "ORDER BY" in q: + iid, lim = args[0], args[1] + match = [t for t in self.store.tasks.values() if t["incident_id"] == iid] + match.sort(key=lambda x: x["created_at"], reverse=True) + return [Row(dict(t)) for t in match[:lim]] + if "FROM tasks" in q and "WHERE incident_id" in q: + iid, lim = args[0], args[1] + match = [t for t in self.store.tasks.values() if t["incident_id"] == iid] + match.sort(key=lambda x: x["created_at"], reverse=True) + return [Row(dict(t)) for t in match[:lim]] + if "FROM tasks t" in q or ("FROM tasks" in q and "ORDER BY t.created_at" in q): + lim = args[0] + rows = sorted(self.store.tasks.values(), key=lambda x: x["created_at"], reverse=True)[:lim] + return [Row(dict(r)) for r in rows] + if "FROM tasks" in q and "ORDER BY created_at DESC" in q and "WHERE" not in q: + lim = args[0] + rows = sorted(self.store.tasks.values(), key=lambda x: x["created_at"], reverse=True)[:lim] + return [Row(dict(r)) for r in rows] + if "FROM escalation_policies" in q and "ORDER BY name" in q: + rows = sorted(self.store.policies.values(), key=lambda x: x["name"]) + return [Row(dict(r)) for r in rows] + raise AssertionError(f"fetch not implemented: {q[:120]}") + + async def fetchrow(self, query: str, *args: Any) -> Row | None: + q = self._q(query) + if "INSERT INTO incidents" in q and "VALUES ($1, $2, $3, 'manual'" in q: + return Row(self.store.insert_incident_manual(args[0], args[1], args[2])) + if "FROM incidents WHERE id" in q and "UPDATE" not in q and "/tasks" not in query.lower(): + return self.store.get_incident(args[0]) + if "UPDATE incidents SET" in q: + return self.store.update_incident(args[0], args[1], args[2], args[3]) + if "INSERT INTO tasks" in q: + return Row(self.store.insert_task(args[0], args[1])) + if "FROM tasks WHERE id" in q and "UPDATE" not in q: + tid = args[0] + t = self.store.tasks.get(tid) + return Row(dict(t)) if t else None + if "UPDATE tasks SET" in q: + return self.store.update_task(args[0], args[1], args[2]) + if "INSERT INTO escalation_policies" in q: + return Row(self.store.insert_policy(args[0], args[1], args[2])) + if "FROM escalation_policies WHERE id" in q and "UPDATE" not in q and "DELETE" not in q: + pid = args[0] + p = self.store.policies.get(pid) + return Row(dict(p)) if p else None + if "UPDATE escalation_policies SET" in q: + return self.store.update_policy(args[0], args[1], args[2], args[3]) + if "DELETE FROM escalation_policies" in q: + return self.store.delete_policy(args[0]) + raise AssertionError(f"fetchrow not implemented: {q[:120]}") + + +@dataclass +class IrmFakeStore: + incidents: dict[UUID, dict[str, Any]] = field(default_factory=dict) + tasks: dict[UUID, dict[str, Any]] = field(default_factory=dict) + policies: dict[UUID, dict[str, Any]] = field(default_factory=dict) + + def insert_incident_alert( + self, + title: str, + sev: str, + ingress_id: UUID, + grafana_org_slug: Any, + service_name: Any, + ) -> None: + iid = uuid4() + now = _now() + self.incidents[iid] = { + "id": iid, + "title": title, + "status": "open", + "severity": sev, + "source": "grafana", + "ingress_event_id": ingress_id, + "created_at": now, + "updated_at": now, + "grafana_org_slug": grafana_org_slug, + "service_name": service_name, + } + + def insert_incident_manual(self, title: str, status: str, severity: str) -> dict[str, Any]: + iid = uuid4() + now = _now() + row = { + "id": iid, + "title": title, + "status": status, + "severity": severity, + "source": "manual", + "ingress_event_id": None, + "created_at": now, + "updated_at": now, + "grafana_org_slug": None, + "service_name": None, + } + self.incidents[iid] = row + return row + + def get_incident(self, iid: UUID) -> Row | None: + r = self.incidents.get(iid) + return Row(dict(r)) if r else None + + def update_incident( + self, + iid: UUID, + title: str | None, + status: str | None, + severity: str | None, + ) -> Row | None: + r = self.incidents.get(iid) + if not r: + return None + if title is not None: + r["title"] = title + if status is not None: + r["status"] = status + if severity is not None: + r["severity"] = severity + r["updated_at"] = _now() + return Row(dict(r)) + + def insert_task(self, title: str, incident_id: UUID | None) -> dict[str, Any]: + tid = uuid4() + now = _now() + row = { + "id": tid, + "incident_id": incident_id, + "title": title, + "status": "open", + "created_at": now, + } + self.tasks[tid] = row + return row + + def update_task(self, tid: UUID, title: str | None, status: str | None) -> Row | None: + r = self.tasks.get(tid) + if not r: + return None + if title is not None: + r["title"] = title + if status is not None: + r["status"] = status + return Row(dict(r)) + + def insert_policy(self, name: str, enabled: bool, steps_json: str) -> dict[str, Any]: + import json + + pid = uuid4() + now = _now() + steps = json.loads(steps_json) + row = { + "id": pid, + "name": name, + "enabled": enabled, + "steps": steps, + "created_at": now, + } + self.policies[pid] = row + return row + + def update_policy( + self, + pid: UUID, + name: str | None, + enabled: bool | None, + steps_json: str | None, + ) -> Row | None: + import json + + r = self.policies.get(pid) + if not r: + return None + if name is not None: + r["name"] = name + if enabled is not None: + r["enabled"] = enabled + if steps_json is not None: + r["steps"] = json.loads(steps_json) + return Row(dict(r)) + + def delete_policy(self, pid: UUID) -> Row | None: + if pid not in self.policies: + return None + self.policies.pop(pid) + return Row({"id": pid}) + + +class IrmFakeAcquire: + def __init__(self, store: IrmFakeStore) -> None: + self.store = store + + async def __aenter__(self) -> IrmFakeConn: + return IrmFakeConn(self.store) + + async def __aexit__(self, *args: Any) -> None: + pass + + +class IrmFakePool: + def __init__(self, store: IrmFakeStore | None = None) -> None: + self._store = store or IrmFakeStore() + + def acquire(self) -> IrmFakeAcquire: + return IrmFakeAcquire(self._store) + + @property + def store(self) -> IrmFakeStore: + return self._store diff --git a/tests/test_grafana_catalog_api.py b/tests/test_grafana_catalog_api.py new file mode 100644 index 0000000..358cda7 --- /dev/null +++ b/tests/test_grafana_catalog_api.py @@ -0,0 +1,19 @@ +"""HTTP-обёртки каталога Grafana без реальной БД.""" + +from fastapi.testclient import TestClient + + +def test_grafana_catalog_sync_requires_db(client: TestClient) -> None: + r = client.post("/api/v1/modules/grafana-catalog/sync", json={}) + assert r.status_code == 503 + + +def test_grafana_catalog_meta_no_db(client: TestClient) -> None: + r = client.get("/api/v1/modules/grafana-catalog/meta") + assert r.status_code == 200 + assert r.json().get("database") == "disabled" + + +def test_grafana_catalog_tree_requires_db(client: TestClient) -> None: + r = client.get("/api/v1/modules/grafana-catalog/tree?instance_slug=default") + assert r.status_code == 503 diff --git a/tests/test_grafana_topology.py b/tests/test_grafana_topology.py new file mode 100644 index 0000000..23390a0 --- /dev/null +++ b/tests/test_grafana_topology.py @@ -0,0 +1,93 @@ +"""Парсинг Ruler / слияние папок и HTTP-mock синхронизации Grafana.""" + +import httpx +import pytest +import respx + +from onguard24.integrations.grafana_topology import ( + merge_folder_rows, + parse_ruler_rules, +) +from onguard24.modules.grafana_catalog import pull_topology + + +def test_parse_ruler_grafana_managed() -> None: + data = { + "nginx": [ + { + "name": "prometheus (Nginx)", + "interval": "60s", + "rules": [ + { + "grafana_alert": {"uid": "uid1", "title": "Nginx Down"}, + "labels": {"service": "nginx", "severity": "critical"}, + } + ], + } + ] + } + rows = parse_ruler_rules(data) + assert len(rows) == 1 + assert rows[0].namespace_uid == "nginx" + assert rows[0].rule_uid == "uid1" + assert rows[0].rule_group_name == "prometheus (Nginx)" + assert rows[0].labels["service"] == "nginx" + + +def test_parse_ruler_skips_non_grafana_alert() -> None: + data = {"x": [{"name": "g", "rules": [{"expr": "1"}]}]} + assert parse_ruler_rules(data) == [] + + +def test_merge_folder_rows_adds_namespaces() -> None: + api = [{"uid": "system", "title": "System", "parentUid": None}] + merged = merge_folder_rows(api, {"nginx", "system"}) + uids = {m[0] for m in merged} + assert uids == {"system", "nginx"} + titles = {m[0]: m[1] for m in merged} + assert titles["nginx"] == "nginx" + + +@pytest.mark.asyncio +@respx.mock +async def test_pull_topology_end_to_end() -> None: + base = "https://grafana.example.com" + respx.get(f"{base}/api/org").mock( + return_value=httpx.Response(200, json={"id": 3, "name": "adibrov"}) + ) + def _folders(request: httpx.Request) -> httpx.Response: + if "parentUid" in str(request.url): + return httpx.Response(200, json=[]) + return httpx.Response( + 200, + json=[{"uid": "nginx", "title": "Nginx Alerts", "parentUid": None}], + ) + + respx.get(f"{base}/api/folders").mock(side_effect=_folders) + ruler_body = { + "nginx": [ + { + "name": "grp", + "interval": "1m", + "rules": [ + { + "grafana_alert": {"uid": "r1", "title": "Down"}, + "labels": {"service": "nginx"}, + } + ], + } + ] + } + respx.get(f"{base}/api/ruler/grafana/api/v1/rules").mock( + return_value=httpx.Response(200, json=ruler_body) + ) + + out, err = await pull_topology(base, "test-token") + assert err is None + assert out is not None + assert out.org_id == 3 + assert out.org_name == "adibrov" + assert len(out.folder_rows) == 1 + assert out.folder_rows[0][0] == "nginx" + assert len(out.rules) == 1 + assert out.rules[0].rule_uid == "r1" diff --git a/tests/test_ingress.py b/tests/test_ingress.py index da2ad8b..2e402f2 100644 --- a/tests/test_ingress.py +++ b/tests/test_ingress.py @@ -63,6 +63,35 @@ def test_grafana_webhook_inserts_with_mock_pool(client: TestClient) -> None: app.state.pool = real_pool +def test_grafana_webhook_auto_org_from_external_url(client: TestClient) -> None: + from uuid import uuid4 + + mock_conn = AsyncMock() + uid = uuid4() + mock_conn.fetchrow = AsyncMock(return_value={"id": uid}) + mock_cm = AsyncMock() + mock_cm.__aenter__ = AsyncMock(return_value=mock_conn) + mock_cm.__aexit__ = AsyncMock(return_value=None) + mock_pool = MagicMock() + mock_pool.acquire = MagicMock(return_value=mock_cm) + + app = client.app + real_pool = app.state.pool + app.state.pool = mock_pool + try: + r = client.post( + "/api/v1/ingress/grafana", + content=json.dumps( + {"externalURL": "https://grafana-adibrov.example.com/", "title": "x"} + ), + headers={"Content-Type": "application/json"}, + ) + assert r.status_code == 202 + assert mock_conn.fetchrow.call_args[0][3] == "grafana-adibrov.example.com" + finally: + app.state.pool = real_pool + + def test_grafana_webhook_publishes_alert_received(client: TestClient) -> None: from unittest.mock import patch from uuid import uuid4 @@ -92,3 +121,66 @@ def test_grafana_webhook_publishes_alert_received(client: TestClient) -> None: assert spy.await_args.kwargs.get("raw_payload_ref") == uid finally: app.state.pool = real_pool + + +def test_grafana_webhook_org_any_slug_without_json_config(client: TestClient) -> None: + """Путь /{slug} не требует GRAFANA_SOURCES_JSON — slug просто сохраняется.""" + from uuid import uuid4 + + mock_conn = AsyncMock() + uid = uuid4() + mock_conn.fetchrow = AsyncMock(return_value={"id": uid}) + mock_cm = AsyncMock() + mock_cm.__aenter__ = AsyncMock(return_value=mock_conn) + mock_cm.__aexit__ = AsyncMock(return_value=None) + mock_pool = MagicMock() + mock_pool.acquire = MagicMock(return_value=mock_cm) + + app = client.app + real_pool = app.state.pool + app.state.pool = mock_pool + try: + r = client.post( + "/api/v1/ingress/grafana/other", + content=b"{}", + headers={"Content-Type": "application/json"}, + ) + assert r.status_code == 202 + assert mock_conn.fetchrow.call_args[0][3] == "other" + finally: + app.state.pool = real_pool + + +def test_grafana_webhook_org_ok(client: TestClient) -> None: + from uuid import uuid4 + + mock_conn = AsyncMock() + uid = uuid4() + mock_conn.fetchrow = AsyncMock(return_value={"id": uid}) + mock_cm = AsyncMock() + mock_cm.__aenter__ = AsyncMock(return_value=mock_conn) + mock_cm.__aexit__ = AsyncMock(return_value=None) + mock_pool = MagicMock() + mock_pool.acquire = MagicMock(return_value=mock_cm) + + app = client.app + real_json = app.state.settings.grafana_sources_json + real_pool = app.state.pool + app.state.settings.grafana_sources_json = ( + '[{"slug":"adibrov","api_url":"http://192.168.0.1:3000","api_token":"","webhook_secret":""}]' + ) + app.state.pool = mock_pool + try: + r = client.post( + "/api/v1/ingress/grafana/adibrov", + content='{"title":"t"}', + headers={"Content-Type": "application/json"}, + ) + assert r.status_code == 202 + call = mock_conn.fetchrow.call_args + assert "org_slug" in call[0][0].lower() or "org_slug" in str(call) + assert call[0][1] == "grafana" + assert call[0][3] == "adibrov" + finally: + app.state.settings.grafana_sources_json = real_json + app.state.pool = real_pool diff --git a/tests/test_irm_api_with_fake_db.py b/tests/test_irm_api_with_fake_db.py new file mode 100644 index 0000000..39468c2 --- /dev/null +++ b/tests/test_irm_api_with_fake_db.py @@ -0,0 +1,118 @@ +"""IRM API с подменённым пулом БД (без PostgreSQL).""" + +from __future__ import annotations + +import pytest +from fastapi.testclient import TestClient +from starlette.requests import Request + +from onguard24.deps import get_pool +from onguard24.main import app + +from tests.irm_db_fake import IrmFakePool + + +@pytest.fixture +def irm_client() -> tuple[TestClient, IrmFakePool]: + pool = IrmFakePool() + + def override_get_pool(_request: Request): + return pool + + app.dependency_overrides[get_pool] = override_get_pool + with TestClient(app) as client: + yield client, pool + app.dependency_overrides.pop(get_pool, None) + + +def test_irm_incident_crud_and_tasks(irm_client: tuple[TestClient, IrmFakePool]) -> None: + client, _pool = irm_client + r = client.post( + "/api/v1/modules/incidents/", + json={"title": "Сбой API", "status": "open", "severity": "critical"}, + ) + assert r.status_code == 201 + iid = r.json()["id"] + assert r.json()["source"] == "manual" + + r = client.get(f"/api/v1/modules/incidents/{iid}") + assert r.status_code == 200 + assert r.json()["title"] == "Сбой API" + + r = client.patch(f"/api/v1/modules/incidents/{iid}", json={"status": "resolved"}) + assert r.status_code == 200 + assert r.json()["status"] == "resolved" + + r = client.post( + "/api/v1/modules/tasks/", + json={"title": "Разбор логов", "incident_id": iid}, + ) + assert r.status_code == 201 + tid = r.json()["id"] + + r = client.get(f"/api/v1/modules/incidents/{iid}/tasks") + assert r.status_code == 200 + assert len(r.json()["items"]) == 1 + assert r.json()["items"][0]["id"] == tid + + r = client.get(f"/api/v1/modules/tasks/{tid}") + assert r.status_code == 200 + assert r.json()["status"] == "open" + + r = client.patch(f"/api/v1/modules/tasks/{tid}", json={"status": "done"}) + assert r.status_code == 200 + assert r.json()["status"] == "done" + + +def test_irm_task_bad_incident(irm_client: tuple[TestClient, IrmFakePool]) -> None: + client, _ = irm_client + import uuid + + r = client.post( + "/api/v1/modules/tasks/", + json={"title": "x", "incident_id": str(uuid.uuid4())}, + ) + assert r.status_code == 400 + assert r.json()["detail"] == "incident not found" + + +def test_irm_incident_tasks_unknown(irm_client: tuple[TestClient, IrmFakePool]) -> None: + client, _ = irm_client + import uuid + + rid = str(uuid.uuid4()) + r = client.get(f"/api/v1/modules/incidents/{rid}/tasks") + assert r.status_code == 404 + + +def test_irm_patch_validation(irm_client: tuple[TestClient, IrmFakePool]) -> None: + client, _ = irm_client + r = client.post("/api/v1/modules/incidents/", json={"title": "t"}) + iid = r.json()["id"] + r = client.patch(f"/api/v1/modules/incidents/{iid}", json={}) + assert r.status_code == 400 + + +def test_irm_escalations_crud(irm_client: tuple[TestClient, IrmFakePool]) -> None: + client, _ = irm_client + r = client.post( + "/api/v1/modules/escalations/", + json={"name": "L1", "enabled": True, "steps": [{"after_min": 5, "channel": "slack"}]}, + ) + assert r.status_code == 201 + pid = r.json()["id"] + assert r.json()["steps"][0]["channel"] == "slack" + + r = client.get(f"/api/v1/modules/escalations/{pid}") + assert r.status_code == 200 + assert r.json()["name"] == "L1" + + r = client.patch(f"/api/v1/modules/escalations/{pid}", json={"enabled": False}) + assert r.status_code == 200 + assert r.json()["enabled"] is False + + r = client.delete(f"/api/v1/modules/escalations/{pid}") + assert r.status_code == 204 + + r = client.get(f"/api/v1/modules/escalations/{pid}") + assert r.status_code == 404 diff --git a/tests/test_irm_modules.py b/tests/test_irm_modules.py index 61f64b2..5dadf78 100644 --- a/tests/test_irm_modules.py +++ b/tests/test_irm_modules.py @@ -62,6 +62,8 @@ async def test_incident_inserted_on_alert_received() -> None: assert inserted["args"][0] == "CPU high" assert inserted["args"][1] == "warning" assert inserted["args"][2] == uid + assert inserted["args"][3] is None + assert inserted["args"][4] is None def test_incidents_post_requires_db(client: TestClient) -> None: diff --git a/tests/test_root_ui.py b/tests/test_root_ui.py index c3d4467..957e347 100644 --- a/tests/test_root_ui.py +++ b/tests/test_root_ui.py @@ -32,6 +32,7 @@ def test_rail_lists_all_registered_ui_modules(client: TestClient) -> None: assert r.status_code == 200 t = r.text expected = ( + ("grafana-catalog", "Каталог Grafana"), ("incidents", "Инциденты"), ("tasks", "Задачи"), ("escalations", "Эскалации"), @@ -47,6 +48,7 @@ def test_rail_lists_all_registered_ui_modules(client: TestClient) -> None: def test_each_module_page_single_active_nav_item(client: TestClient) -> None: """На странице модуля ровно один пункт с aria-current (текущий раздел).""" for slug in ( + "grafana-catalog", "incidents", "tasks", "escalations", diff --git a/tests/test_status.py b/tests/test_status.py index 52a3661..5879df3 100644 --- a/tests/test_status.py +++ b/tests/test_status.py @@ -42,6 +42,7 @@ def test_status_with_mocks(client: TestClient) -> None: vault_token="t", grafana_url="https://grafana.example", grafana_service_account_token="g", + grafana_sources_json="", forgejo_url="https://git.example", forgejo_token="f", grafana_webhook_secret="", @@ -57,5 +58,6 @@ def test_status_with_mocks(client: TestClient) -> None: d = r.json() assert d["vault"]["status"] == "ok" assert d["grafana"]["status"] == "ok" - assert d["grafana"].get("service_account_login") == "tester" + assert len(d["grafana"]["instances"]) == 1 + assert d["grafana"]["instances"][0].get("service_account_login") == "tester" assert d["forgejo"]["status"] == "ok"