78 lines
2.8 KiB
Python
78 lines
2.8 KiB
Python
"""Единая сборка ответа /api/v1/status (БД, Vault, Grafana, Forgejo)."""
|
|
|
|
from fastapi import Request
|
|
|
|
from onguard24.config import Settings
|
|
from onguard24.integrations import forgejo_api, grafana_api
|
|
from onguard24.vaultcheck import ping as vault_ping
|
|
|
|
|
|
async def build(request: Request) -> dict:
    """Assemble the status payload covering the database, Vault, Grafana and Forgejo.

    The subsystems are probed one after another, in that order. Each one is
    reported under its own key either as the string ``"disabled"`` (not
    configured) or as a dict carrying at least a ``"status"`` field.

    Args:
        request: Incoming request. ``request.app.state.settings`` must be set;
            ``request.app.state.pool`` is optional and a missing or falsy
            pool is reported as a disabled database.

    Returns:
        A dict with the keys ``service``, ``database``, ``vault``, ``grafana``
        and ``forgejo``, inserted in that order.
    """
    state = request.app.state
    settings: Settings = state.settings
    # A dict display evaluates its values left to right, so the probes still
    # run sequentially and the key order of the response stays the same.
    return {
        "service": "onGuard24",
        "database": await _database_status(getattr(state, "pool", None)),
        "vault": await _vault_status(settings),
        "grafana": await _grafana_status(settings),
        "forgejo": await _forgejo_status(settings),
    }


def _clean(value) -> str:
    """Return *value* without surrounding whitespace; ``None`` becomes ``""``."""
    return (value or "").strip()


async def _database_status(pool):
    """Run ``SELECT 1`` on a connection acquired from *pool* and report the result."""
    if not pool:
        return "disabled"
    try:
        async with pool.acquire() as conn:
            await conn.fetchval("SELECT 1")
    except Exception as exc:
        # Deliberately broad: any failure is reported in the payload rather
        # than raised, so the status response can still be built.
        return {"status": "error", "detail": str(exc)}
    return {"status": "ok"}


async def _vault_status(settings: Settings):
    """Ping Vault when both its address and its token are configured."""
    addr = settings.vault_addr
    token = settings.vault_token
    if not (addr and token):
        return "disabled"
    ok, err = await vault_ping(addr, token)
    if ok:
        return {"status": "ok", "url": addr}
    return {"status": "error", "detail": err, "url": addr}


async def _grafana_status(settings: Settings):
    """Probe Grafana: authenticated API ping with a token, liveness check without one."""
    url = _clean(settings.grafana_url)
    if not url:
        return "disabled"

    # Use the stripped token for the calls as well as for the emptiness test,
    # so whitespace around the configured value is never sent to the API helper.
    token = _clean(settings.grafana_service_account_token)
    if not token:
        live_ok, live_err = await grafana_api.health_live(url)
        if live_ok:
            return {
                "status": "reachable",
                "url": url,
                "detail": "задай GRAFANA_SERVICE_ACCOUNT_TOKEN для вызовов API",
            }
        return {"status": "error", "detail": live_err, "url": url}

    ok, err = await grafana_api.ping(url, token)
    if not ok:
        return {"status": "error", "detail": err, "url": url}

    # The lookup's second return value is ignored: a failed lookup only means
    # the login field is left out of the entry.
    user, _ = await grafana_api.get_signed_in_user(url, token)
    entry: dict = {"status": "ok", "url": url, "api": "authenticated"}
    login = (user.get("login") or user.get("email")) if user else None
    if login:
        entry["service_account_login"] = login
    return entry


async def _forgejo_status(settings: Settings):
    """Probe Forgejo: token-based probe when a token is set, public health check otherwise."""
    url = _clean(settings.forgejo_url)
    if not url:
        return "disabled"

    token = _clean(settings.forgejo_token)
    if token:
        # The probe result is used verbatim as the status entry.
        return await forgejo_api.probe(url, token)

    pub_ok, pub_err = await forgejo_api.health_public(url)
    if pub_ok:
        return {
            "status": "reachable",
            "url": url,
            "detail": "задай FORGEJO_TOKEN (Personal Access Token в Forgejo)",
        }
    return {"status": "error", "detail": pub_err, "url": url}
|