54 lines
2.0 KiB
Python
54 lines
2.0 KiB
Python
|
|
"""Извлечение полей для учёта алерта из тела вебхука Grafana (Unified Alerting)."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import json
|
||
|
|
from typing import Any
|
||
|
|
|
||
|
|
|
||
|
|
def extract_alert_row_from_grafana_body(body: dict[str, Any]) -> tuple[str, str, dict[str, Any], str | None]:
|
||
|
|
"""
|
||
|
|
Возвращает: title, severity (info|warning|critical), labels (dict), fingerprint.
|
||
|
|
"""
|
||
|
|
title = str(body.get("title") or body.get("ruleName") or "").strip()[:500]
|
||
|
|
alerts = body.get("alerts")
|
||
|
|
labels: dict[str, Any] = {}
|
||
|
|
fingerprint: str | None = None
|
||
|
|
sev = "warning"
|
||
|
|
|
||
|
|
if isinstance(alerts, list) and alerts and isinstance(alerts[0], dict):
|
||
|
|
a0 = alerts[0]
|
||
|
|
fp = a0.get("fingerprint")
|
||
|
|
if fp is not None:
|
||
|
|
fingerprint = str(fp)[:500]
|
||
|
|
if isinstance(a0.get("labels"), dict):
|
||
|
|
labels.update(a0["labels"])
|
||
|
|
ann = a0.get("annotations")
|
||
|
|
if isinstance(ann, dict) and not title:
|
||
|
|
title = str(ann.get("summary") or ann.get("description") or "").strip()[:500]
|
||
|
|
|
||
|
|
cl = body.get("commonLabels")
|
||
|
|
if isinstance(cl, dict):
|
||
|
|
for k, v in cl.items():
|
||
|
|
labels.setdefault(k, v)
|
||
|
|
|
||
|
|
if not title and isinstance(alerts, list) and alerts and isinstance(alerts[0], dict):
|
||
|
|
title = str(alerts[0].get("labels", {}).get("alertname") or "").strip()[:500]
|
||
|
|
|
||
|
|
raw_s = None
|
||
|
|
if isinstance(labels.get("severity"), str):
|
||
|
|
raw_s = labels["severity"].lower()
|
||
|
|
elif isinstance(labels.get("priority"), str):
|
||
|
|
raw_s = labels["priority"].lower()
|
||
|
|
if raw_s in ("critical", "error", "fatal"):
|
||
|
|
sev = "critical"
|
||
|
|
elif raw_s in ("warning", "warn"):
|
||
|
|
sev = "warning"
|
||
|
|
elif raw_s in ("info", "informational", "none"):
|
||
|
|
sev = "info"
|
||
|
|
|
||
|
|
# JSONB: только JSON-совместимые значения
|
||
|
|
clean_labels = {str(k): v for k, v in labels.items() if isinstance(v, (str, int, float, bool, type(None)))}
|
||
|
|
|
||
|
|
return title, sev, clean_labels, fingerprint
|