release: v1.9.0 — IRM-алерты отдельно от инцидентов
- Alembic 005: таблицы irm_alerts и incident_alert_links - Модуль alerts: API/UI, Ack/Resolve, привязка к инциденту через alert_ids - Вебхук Grafana: одна транзакция ingress + irm_alerts; разбор payload в grafana_payload - По умолчанию инцидент из вебхука не создаётся (AUTO_INCIDENT_FROM_ALERT) - Документация IRM_GRAFANA_PARITY.md, обновления IRM.md и CHANGELOG Made-with: Cursor
This commit is contained in:
53
onguard24/ingress/grafana_payload.py
Normal file
53
onguard24/ingress/grafana_payload.py
Normal file
@ -0,0 +1,53 @@
|
||||
"""Извлечение полей для учёта алерта из тела вебхука Grafana (Unified Alerting)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
|
||||
def extract_alert_row_from_grafana_body(body: dict[str, Any]) -> tuple[str, str, dict[str, Any], str | None]:
|
||||
"""
|
||||
Возвращает: title, severity (info|warning|critical), labels (dict), fingerprint.
|
||||
"""
|
||||
title = str(body.get("title") or body.get("ruleName") or "").strip()[:500]
|
||||
alerts = body.get("alerts")
|
||||
labels: dict[str, Any] = {}
|
||||
fingerprint: str | None = None
|
||||
sev = "warning"
|
||||
|
||||
if isinstance(alerts, list) and alerts and isinstance(alerts[0], dict):
|
||||
a0 = alerts[0]
|
||||
fp = a0.get("fingerprint")
|
||||
if fp is not None:
|
||||
fingerprint = str(fp)[:500]
|
||||
if isinstance(a0.get("labels"), dict):
|
||||
labels.update(a0["labels"])
|
||||
ann = a0.get("annotations")
|
||||
if isinstance(ann, dict) and not title:
|
||||
title = str(ann.get("summary") or ann.get("description") or "").strip()[:500]
|
||||
|
||||
cl = body.get("commonLabels")
|
||||
if isinstance(cl, dict):
|
||||
for k, v in cl.items():
|
||||
labels.setdefault(k, v)
|
||||
|
||||
if not title and isinstance(alerts, list) and alerts and isinstance(alerts[0], dict):
|
||||
title = str(alerts[0].get("labels", {}).get("alertname") or "").strip()[:500]
|
||||
|
||||
raw_s = None
|
||||
if isinstance(labels.get("severity"), str):
|
||||
raw_s = labels["severity"].lower()
|
||||
elif isinstance(labels.get("priority"), str):
|
||||
raw_s = labels["priority"].lower()
|
||||
if raw_s in ("critical", "error", "fatal"):
|
||||
sev = "critical"
|
||||
elif raw_s in ("warning", "warn"):
|
||||
sev = "warning"
|
||||
elif raw_s in ("info", "informational", "none"):
|
||||
sev = "info"
|
||||
|
||||
# JSONB: только JSON-совместимые значения
|
||||
clean_labels = {str(k): v for k, v in labels.items() if isinstance(v, (str, int, float, bool, type(None)))}
|
||||
|
||||
return title, sev, clean_labels, fingerprint
|
||||
Reference in New Issue
Block a user