Files
onGuard24/onguard24/integrations/grafana_topology.py

170 lines
6.2 KiB
Python
Raw Normal View History

"""Чтение иерархии Grafana: org, папки, managed alert rules (Ruler API)."""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from typing import Any
import httpx
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Base headers shared by every request built through _auth_headers():
# ask the server for a JSON response.
_HEADERS_JSON = {"Accept": "application/json"}
def _auth_headers(token: str) -> dict[str, str]:
    """Return a fresh header dict: the shared JSON headers plus Bearer auth.

    The token is stripped of surrounding whitespace before being embedded in
    the ``Authorization`` value. The module-level header dict is copied, not
    mutated.
    """
    headers = dict(_HEADERS_JSON)
    bearer = token.strip()
    headers["Authorization"] = f"Bearer {bearer}"
    return headers
@dataclass
class ParsedRuleRow:
    """One Grafana-managed alert rule flattened out of a Ruler API payload."""

    # Top-level key of the Ruler payload under which the rule's group was found.
    # NOTE(review): the name says UID, but the value is simply the payload key;
    # confirm against the target Grafana version that this key is a folder UID.
    namespace_uid: str
    # Name of the rule group that contains the rule.
    rule_group_name: str
    # The group's "interval" value rendered as text, or None when absent.
    rule_group_interval: str | None
    # Unique identifier of the alert rule.
    rule_uid: str
    # Human-readable rule title.
    title: str
    # Labels attached to the rule.
    labels: dict[str, Any]
async def fetch_org(base_url: str, token: str) -> tuple[dict[str, Any] | None, str | None]:
    """Fetch the current organisation via ``GET {base_url}/api/org``.

    Failures are reported through the return value instead of being raised.

    Args:
        base_url: Grafana root URL; trailing slashes are ignored.
        token: API token sent as a Bearer ``Authorization`` header.

    Returns:
        ``(org, None)`` on success, where ``org`` is the decoded JSON object;
        ``(None, message)`` on any failure, with ``message`` always non-empty.
    """
    base = base_url.rstrip("/")
    try:
        async with httpx.AsyncClient(timeout=30.0, verify=True, follow_redirects=True) as client:
            r = await client.get(f"{base}/api/org", headers=_auth_headers(token))
    except Exception as e:
        # Some transport errors (timeouts in particular) stringify to "", which
        # a caller testing `if err:` would read as success; fall back to the
        # exception class name so the error is never empty.
        return None, str(e) or type(e).__name__
    if r.status_code != 200:
        return None, f"http {r.status_code}: {(r.text or '')[:300]}"
    try:
        org = r.json()
    except Exception:
        return None, "invalid json from /api/org"
    # Honour the declared return type: a 200 carrying a JSON list/string/number
    # (e.g. from a misbehaving proxy) is an error, not an organisation.
    if not isinstance(org, dict):
        return None, "org response is not an object"
    return org, None
async def fetch_all_folders(base_url: str, token: str) -> tuple[list[dict[str, Any]], str | None]:
    """Walk the folder tree breadth-first via ``/api/folders?parentUid=…``.

    The first request carries no ``parentUid``; every folder discovered is
    then queried as a parent in turn. Failures are reported through the
    return value instead of being raised.

    Args:
        base_url: Grafana root URL; trailing slashes are ignored.
        token: API token sent as a Bearer ``Authorization`` header.

    Returns:
        ``(folders, error)``. ``folders`` holds the folder objects collected
        so far (possibly partial when ``error`` is set); ``error`` is ``None``
        on success and a non-empty message otherwise.
    """
    # Function-local import keeps this block self-contained.
    from collections import deque

    base = base_url.rstrip("/")
    out: list[dict[str, Any]] = []
    # Each UID is queued at most once, so the walk terminates even if the
    # server returns the same folder under several parents.
    seen: set[str] = set()
    queue: deque[str | None] = deque([None])
    try:
        headers = _auth_headers(token)
        async with httpx.AsyncClient(timeout=60.0, verify=True, follow_redirects=True) as client:
            while queue:
                parent_uid = queue.popleft()
                params: dict[str, str] = {} if parent_uid is None else {"parentUid": parent_uid}
                r = await client.get(f"{base}/api/folders", headers=headers, params=params)
                if r.status_code != 200:
                    return out, f"folders http {r.status_code}: {(r.text or '')[:200]}"
                try:
                    chunk = r.json()
                except Exception:
                    return out, "invalid json from /api/folders"
                if not isinstance(chunk, list):
                    return out, "folders response is not a list"
                for f in chunk:
                    if not isinstance(f, dict):
                        continue
                    raw_uid = f.get("uid")
                    if not raw_uid:
                        continue
                    # Normalise BEFORE the membership test: `seen` stores
                    # strings, so checking the raw value would never match a
                    # non-string UID and the folder would be queued repeatedly.
                    uid = str(raw_uid)
                    if uid in seen:
                        continue
                    seen.add(uid)
                    out.append(f)
                    queue.append(uid)
    except Exception as e:
        # Some transport errors stringify to ""; never hand back an empty
        # (falsy) error message.
        return out, str(e) or type(e).__name__
    return out, None
async def fetch_ruler_rules_raw(base_url: str, token: str) -> tuple[dict[str, Any] | None, str | None]:
    """GET the Ruler API document for Grafana-managed rules (namespace → groups → rules).

    Two spellings of the endpoint path (``grafana`` and ``Grafana``) are tried
    in order. The first one answering HTTP 200 decides the outcome; a non-200
    answer is remembered and the next path is tried. Failures are reported
    through the return value instead of being raised.

    Args:
        base_url: Grafana root URL; trailing slashes are ignored.
        token: API token sent as a Bearer ``Authorization`` header.

    Returns:
        ``(data, None)`` when a path returned a JSON object;
        ``(None, message)`` otherwise, with ``message`` always non-empty.
    """
    base = base_url.rstrip("/")
    paths = (
        "/api/ruler/grafana/api/v1/rules",
        "/api/ruler/Grafana/api/v1/rules",
    )
    last_err: str | None = None
    try:
        headers = _auth_headers(token)
        async with httpx.AsyncClient(timeout=90.0, verify=True, follow_redirects=True) as client:
            for path in paths:
                r = await client.get(f"{base}{path}", headers=headers)
                if r.status_code != 200:
                    last_err = f"ruler {path} http {r.status_code}: {(r.text or '')[:200]}"
                    continue
                try:
                    data = r.json()
                except Exception:
                    return None, "invalid json from ruler"
                if isinstance(data, dict):
                    return data, None
                return None, "ruler response is not an object"
    except Exception as e:
        # Some transport errors stringify to ""; returning (None, "") would
        # leave the caller with neither data nor a truthy error.
        return None, str(e) or type(e).__name__
    return None, last_err or "ruler: no path matched"
def parse_ruler_rules(data: dict[str, Any]) -> list[ParsedRuleRow]:
    """Flatten a Ruler payload into one ``ParsedRuleRow`` per Grafana alert rule.

    ``data`` maps a namespace key to a list of rule groups; each group holds a
    ``rules`` list. Anything of an unexpected shape is skipped silently:
    non-string or blank namespace keys, non-list group collections, non-dict
    groups or rules, rules without a ``grafana_alert`` object, and rules with
    no UID. Titles are truncated to 500 characters; label keys and values are
    stringified and ``None``-valued labels are dropped.
    """
    parsed: list[ParsedRuleRow] = []
    for ns_key, ns_groups in data.items():
        ns_usable = isinstance(ns_key, str) and bool(ns_key.strip())
        if not ns_usable or not isinstance(ns_groups, list):
            continue
        # NOTE(review): the key is stored as `namespace_uid` as-is; confirm
        # that the target Grafana version keys this mapping by folder UID
        # rather than by folder title.
        namespace = ns_key.strip()
        for group in (g for g in ns_groups if isinstance(g, dict)):
            group_name = str(group.get("name") or "group")
            raw_interval = group.get("interval")
            group_interval = None if raw_interval is None else str(raw_interval)
            group_rules = group.get("rules")
            if not isinstance(group_rules, list):
                continue
            for entry in group_rules:
                if not isinstance(entry, dict):
                    continue
                alert = entry.get("grafana_alert")
                if not isinstance(alert, dict):
                    continue
                # Prefer the UID inside grafana_alert, fall back to the rule's own.
                rule_uid = alert.get("uid") or entry.get("uid")
                if not rule_uid:
                    continue
                display_title = str(alert.get("title") or group_name or rule_uid)
                raw_labels = entry.get("labels")
                label_pairs = raw_labels.items() if isinstance(raw_labels, dict) else ()
                clean_labels: dict[str, str] = {}
                for key, value in label_pairs:
                    if value is not None:
                        clean_labels[str(key)] = str(value)
                parsed.append(
                    ParsedRuleRow(
                        namespace_uid=namespace,
                        rule_group_name=group_name,
                        rule_group_interval=group_interval,
                        rule_uid=str(rule_uid),
                        title=display_title[:500],
                        labels=clean_labels,
                    )
                )
    return parsed
def merge_folder_rows(
    api_folders: list[dict[str, Any]],
    rule_namespaces: set[str],
) -> list[tuple[str, str, str | None]]:
    """Build ``(uid, title, parent_uid)`` rows from API folders and Ruler namespaces.

    Folders without a UID are ignored; a missing or empty title falls back to
    the UID, and a missing or empty ``parentUid`` becomes ``None``. When the
    same UID occurs more than once, the last folder wins. Namespaces seen in
    the Ruler payload but absent from the folder list are added as parentless
    rows whose title equals the namespace itself. Rows are sorted by UID.
    """
    merged: dict[str, tuple[str, str | None]] = {}
    for folder in api_folders:
        folder_uid = folder.get("uid")
        if folder_uid:
            parent_raw = folder.get("parentUid")
            parent_uid = str(parent_raw) if parent_raw else None
            folder_title = str(folder.get("title") or folder_uid)
            merged[str(folder_uid)] = (folder_title, parent_uid)
    for namespace in rule_namespaces:
        # Keep the real folder entry when one exists.
        merged.setdefault(namespace, (namespace, None))
    rows: list[tuple[str, str, str | None]] = []
    for key in sorted(merged):
        title, parent = merged[key]
        rows.append((key, title, parent))
    return rows