"""Извлечение полей для учёта алерта из тела вебхука Grafana (Unified Alerting).""" from __future__ import annotations import json from typing import Any def extract_alert_row_from_grafana_body(body: dict[str, Any]) -> tuple[str, str, dict[str, Any], str | None]: """ Возвращает: title, severity (info|warning|critical), labels (dict), fingerprint. """ title = str(body.get("title") or body.get("ruleName") or "").strip()[:500] alerts = body.get("alerts") labels: dict[str, Any] = {} fingerprint: str | None = None sev = "warning" if isinstance(alerts, list) and alerts and isinstance(alerts[0], dict): a0 = alerts[0] fp = a0.get("fingerprint") if fp is not None: fingerprint = str(fp)[:500] if isinstance(a0.get("labels"), dict): labels.update(a0["labels"]) ann = a0.get("annotations") if isinstance(ann, dict) and not title: title = str(ann.get("summary") or ann.get("description") or "").strip()[:500] cl = body.get("commonLabels") if isinstance(cl, dict): for k, v in cl.items(): labels.setdefault(k, v) if not title and isinstance(alerts, list) and alerts and isinstance(alerts[0], dict): title = str(alerts[0].get("labels", {}).get("alertname") or "").strip()[:500] raw_s = None if isinstance(labels.get("severity"), str): raw_s = labels["severity"].lower() elif isinstance(labels.get("priority"), str): raw_s = labels["priority"].lower() if raw_s in ("critical", "error", "fatal"): sev = "critical" elif raw_s in ("warning", "warn"): sev = "warning" elif raw_s in ("info", "informational", "none"): sev = "info" # JSONB: только JSON-совместимые значения clean_labels = {str(k): v for k, v in labels.items() if isinstance(v, (str, int, float, bool, type(None)))} return title, sev, clean_labels, fingerprint