"""Чтение иерархии Grafana: org, папки, managed alert rules (Ruler API).""" from __future__ import annotations import json import logging from dataclasses import dataclass from typing import Any import httpx log = logging.getLogger(__name__) _HEADERS_JSON = {"Accept": "application/json"} def _auth_headers(token: str) -> dict[str, str]: return {**_HEADERS_JSON, "Authorization": f"Bearer {token.strip()}"} @dataclass class ParsedRuleRow: namespace_uid: str rule_group_name: str rule_group_interval: str | None rule_uid: str title: str labels: dict[str, Any] async def fetch_org(base_url: str, token: str) -> tuple[dict[str, Any] | None, str | None]: base = base_url.rstrip("/") try: async with httpx.AsyncClient(timeout=30.0, verify=True, follow_redirects=True) as client: r = await client.get(f"{base}/api/org", headers=_auth_headers(token)) except Exception as e: return None, str(e) if r.status_code != 200: return None, f"http {r.status_code}: {(r.text or '')[:300]}" try: return r.json(), None except Exception: return None, "invalid json from /api/org" async def fetch_all_folders(base_url: str, token: str) -> tuple[list[dict[str, Any]], str | None]: """Обход дерева папок через /api/folders?parentUid=…""" base = base_url.rstrip("/") out: list[dict[str, Any]] = [] seen: set[str] = set() queue: list[str | None] = [None] try: async with httpx.AsyncClient(timeout=60.0, verify=True, follow_redirects=True) as client: while queue: parent_uid = queue.pop(0) params: dict[str, str] = {} if parent_uid is not None: params["parentUid"] = parent_uid r = await client.get(f"{base}/api/folders", headers=_auth_headers(token), params=params) if r.status_code != 200: return out, f"folders http {r.status_code}: {(r.text or '')[:200]}" try: chunk = r.json() except Exception: return out, "invalid json from /api/folders" if not isinstance(chunk, list): return out, "folders response is not a list" for f in chunk: if not isinstance(f, dict): continue uid = f.get("uid") if not uid or uid in seen: continue seen.add(str(uid)) out.append(f) queue.append(str(uid)) except Exception as e: return out, str(e) return out, None async def fetch_ruler_rules_raw(base_url: str, token: str) -> tuple[dict[str, Any] | None, str | None]: """GET Ruler API для Grafana-managed правил (namespace → группы → rules).""" base = base_url.rstrip("/") paths = ( "/api/ruler/grafana/api/v1/rules", "/api/ruler/Grafana/api/v1/rules", ) last_err: str | None = None try: async with httpx.AsyncClient(timeout=90.0, verify=True, follow_redirects=True) as client: for path in paths: r = await client.get(f"{base}{path}", headers=_auth_headers(token)) if r.status_code == 200: try: data = r.json() except Exception: return None, "invalid json from ruler" if isinstance(data, dict): return data, None return None, "ruler response is not an object" last_err = f"ruler {path} http {r.status_code}: {(r.text or '')[:200]}" except Exception as e: return None, str(e) return None, last_err or "ruler: no path matched" def parse_ruler_rules(data: dict[str, Any]) -> list[ParsedRuleRow]: rows: list[ParsedRuleRow] = [] for namespace_uid, groups in data.items(): if not isinstance(namespace_uid, str) or not namespace_uid.strip(): continue ns = namespace_uid.strip() if not isinstance(groups, list): continue for grp in groups: if not isinstance(grp, dict): continue gname = str(grp.get("name") or "group") interval = grp.get("interval") interval_s = str(interval) if interval is not None else None rules = grp.get("rules") if not isinstance(rules, list): continue for rule in rules: if not isinstance(rule, dict): continue ga = rule.get("grafana_alert") if not isinstance(ga, dict): continue uid = ga.get("uid") or rule.get("uid") if not uid: continue title = str(ga.get("title") or gname or uid) labels = rule.get("labels") if not isinstance(labels, dict): labels = {} lbl = {str(k): str(v) for k, v in labels.items() if v is not None} rows.append( ParsedRuleRow( namespace_uid=ns, rule_group_name=gname, rule_group_interval=interval_s, rule_uid=str(uid), title=title[:500], labels=lbl, ) ) return rows def merge_folder_rows( api_folders: list[dict[str, Any]], rule_namespaces: set[str], ) -> list[tuple[str, str, str | None]]: """(uid, title, parent_uid); добавляем namespace из ruler без записи в /api/folders.""" by_uid: dict[str, tuple[str, str | None]] = {} for f in api_folders: uid = f.get("uid") if not uid: continue p = f.get("parentUid") parent = str(p) if p else None by_uid[str(uid)] = (str(f.get("title") or uid), parent) for ns in rule_namespaces: if ns not in by_uid: by_uid[ns] = (ns, None) return [(uid, t[0], t[1]) for uid, t in sorted(by_uid.items(), key=lambda x: x[0])]