Files
onGuard24/onguard24/integrations/grafana_api.py

59 lines
2.2 KiB
Python

"""HTTP API Grafana: service account token (Bearer), не пароль пользователя."""
import httpx
# Минимальный набор для проверки и будущих вызовов (алерты, папки и т.д.).
async def ping(base_url: str, token: str) -> tuple[bool, str | None]:
if not base_url.strip() or not token.strip():
return False, "grafana url or token empty"
base = base_url.rstrip("/")
try:
async with httpx.AsyncClient(timeout=15.0, verify=True, follow_redirects=True) as client:
r = await client.get(
f"{base}/api/org",
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/json",
},
)
except Exception as e:
return False, str(e)
if r.status_code == 200:
return True, None
body = (r.text or "")[:300]
return False, f"http {r.status_code}: {body}"
async def get_signed_in_user(base_url: str, token: str) -> tuple[dict | None, str | None]:
"""GET /api/user — удобно убедиться, что токен от service account."""
base = base_url.rstrip("/")
try:
async with httpx.AsyncClient(timeout=15.0, verify=True, follow_redirects=True) as client:
r = await client.get(
f"{base}/api/user",
headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
)
except Exception as e:
return None, str(e)
if r.status_code != 200:
return None, f"http {r.status_code}"
try:
return r.json(), None
except Exception:
return None, "invalid json"
async def health_live(base_url: str) -> tuple[bool, str | None]:
"""GET /api/health — без авторизации, проверка что инстанс отвечает."""
base = base_url.rstrip("/")
try:
async with httpx.AsyncClient(timeout=10.0, verify=True) as client:
r = await client.get(f"{base}/api/health")
except Exception as e:
return False, str(e)
if r.status_code == 200:
return True, None
return False, f"http {r.status_code}"