"""IRM: цепочки эскалаций (политики в JSON, дальше — исполнение по шагам)."""
from __future__ import annotations
import html
import json
from uuid import UUID
import asyncpg
from fastapi import APIRouter, Depends, HTTPException, Request
from fastapi.responses import HTMLResponse
from pydantic import BaseModel, Field
from onguard24.deps import get_pool
from onguard24.domain.events import EventBus
from onguard24.modules.ui_support import wrap_module_html_page
# Router for the escalation-policy API endpoints defined below (list / create / delete).
router = APIRouter(tags=["module-escalations"])
# Router for the HTML page of this module; include_in_schema=False keeps its
# routes out of the generated OpenAPI schema.
ui_router = APIRouter(tags=["web-escalations"], include_in_schema=False)
# Request body accepted by create_policy_api when a new escalation policy is created.
class PolicyCreate(BaseModel):
    # Policy name; required, between 1 and 200 characters.
    name: str = Field(..., min_length=1, max_length=200)
    # Whether the policy is enabled; defaults to True.
    enabled: bool = True
    # Escalation steps as a list of dicts; defaults to a fresh empty list.
    # create_policy_api serialises this list to JSON before inserting it.
    steps: list[dict] = Field(default_factory=list)
def register_events(_bus: EventBus, _pool: asyncpg.Pool | None = None) -> None:
    """Event-registration hook for this module; currently a no-op.

    Both arguments are accepted and ignored: no handlers are subscribed
    on the bus and the pool is not used.
    """
    return None
async def render_home_fragment(request: Request) -> str:
    """Return a short text fragment with the number of enabled escalation policies.

    Args:
        request: Incoming request; the database pool is looked up from it
            via ``get_pool``.

    Returns:
        A notice that a database is required when no pool is available,
        a notice that the policies table is unavailable when the query
        fails, or otherwise a line with the count of enabled policies.
    """
    # NOTE(review): the original literals were split across physical lines
    # (a SyntaxError) and look like they lost surrounding markup. The visible
    # text is kept as-is with line breaks written as "\n" - confirm the
    # intended markup against the templates.
    pool = get_pool(request)
    if pool is None:
        return "\nНужна БД для политик эскалации.\n"
    try:
        async with pool.acquire() as conn:
            n = await conn.fetchval("SELECT count(*)::int FROM escalation_policies WHERE enabled = true")
    except Exception:
        # Deliberate best effort: any failure here degrades to a notice
        # instead of raising out of the fragment renderer.
        return "Таблица политик недоступна (миграции?).\n"
    return f"Активных политик: {int(n)}\n"
@router.get("/")
async def list_policies_api(pool: asyncpg.Pool | None = Depends(get_pool)):
    """List all escalation policies ordered by name.

    Args:
        pool: Database pool, or ``None`` when no database is available.

    Returns:
        ``{"items": [...]}`` with one dict per policy (``id``, ``name``,
        ``enabled``, ``steps``, ``created_at``). Without a pool the result
        is an empty item list plus ``"database": "disabled"``.
    """
    if pool is None:
        return {"items": [], "database": "disabled"}

    def _to_item(record):
        # steps may arrive as JSON text or as an already-decoded value.
        raw_steps = record["steps"]
        decoded = json.loads(raw_steps) if isinstance(raw_steps, str) else raw_steps
        created = record["created_at"]
        return {
            "id": str(record["id"]),
            "name": record["name"],
            "enabled": record["enabled"],
            # Anything that is not a list is reported as "no steps".
            "steps": decoded if isinstance(decoded, list) else [],
            "created_at": created.isoformat() if created else None,
        }

    async with pool.acquire() as conn:
        records = await conn.fetch(
            """
            SELECT id, name, enabled, steps, created_at
            FROM escalation_policies
            ORDER BY name
            """
        )
    return {"items": [_to_item(record) for record in records]}
@router.post("/", status_code=201)
async def create_policy_api(body: PolicyCreate, pool: asyncpg.Pool | None = Depends(get_pool)):
    """Create an escalation policy and return the stored row.

    Args:
        body: Validated policy payload (name, enabled flag, steps).
        pool: Database pool, or ``None`` when no database is available.

    Returns:
        A dict with ``id``, ``name``, ``enabled``, ``steps`` and
        ``created_at`` (ISO 8601 string or ``None``) of the new policy.

    Raises:
        HTTPException: 503 when no pool is available; 422 when the name
            is empty after stripping surrounding whitespace.
    """
    if pool is None:
        raise HTTPException(status_code=503, detail="database disabled")

    # The model's min_length=1 is checked before stripping, so a
    # whitespace-only name would otherwise be stored as an empty string.
    name = body.name.strip()
    if not name:
        raise HTTPException(status_code=422, detail="name must not be blank")

    async with pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            INSERT INTO escalation_policies (name, enabled, steps)
            VALUES ($1, $2, $3::jsonb)
            RETURNING id, name, enabled, steps, created_at
            """,
            name,
            body.enabled,
            json.dumps(body.steps),
        )

    # The jsonb column may come back as JSON text (asyncpg does this unless a
    # codec is registered); list() on that text would split it into single
    # characters, so decode it the same way list_policies_api does.
    steps = row["steps"]
    if isinstance(steps, str):
        steps = json.loads(steps)
    return {
        "id": str(row["id"]),
        "name": row["name"],
        "enabled": row["enabled"],
        "steps": steps if isinstance(steps, list) else [],
        "created_at": row["created_at"].isoformat() if row["created_at"] else None,
    }
@router.delete("/{policy_id}", status_code=204)
async def delete_policy_api(policy_id: UUID, pool: asyncpg.Pool | None = Depends(get_pool)):
    """Delete the escalation policy with the given id.

    Args:
        policy_id: Identifier of the policy to remove.
        pool: Database pool, or ``None`` when no database is available.

    Raises:
        HTTPException: 503 when no pool is available; 404 when no row
            with that id was deleted.
    """
    if pool is None:
        raise HTTPException(status_code=503, detail="database disabled")

    delete_sql = "DELETE FROM escalation_policies WHERE id = $1::uuid RETURNING id"
    async with pool.acquire() as conn:
        deleted = await conn.fetchrow(delete_sql, policy_id)

    # RETURNING yields a row only when something was actually removed.
    if deleted is not None:
        return None
    raise HTTPException(status_code=404, detail="not found")
def _escalation_steps_preview(steps) -> str:
    """Return an HTML-escaped preview (first 120 chars of JSON) of a policy's steps, or "—"."""
    # The jsonb column may arrive as JSON text; decode it so the preview is
    # built from the actual value, as list_policies_api does.
    if isinstance(steps, str):
        try:
            steps = json.loads(steps)
        except ValueError:
            return "—"
    if hasattr(steps, "__iter__") and not isinstance(steps, (str, bytes)):
        # Truncate before escaping so an HTML entity is never cut in half.
        return html.escape(json.dumps(steps, ensure_ascii=False)[:120])
    return "—"


@ui_router.get("/", response_class=HTMLResponse)
async def escalations_ui_home(request: Request):
    """Render the escalations page listing all policies.

    Args:
        request: Incoming request; the database pool is looked up from it
            via ``get_pool``.

    Returns:
        An ``HTMLResponse`` produced by ``wrap_module_html_page``. The page
        body holds an error line (no database, or the escaped exception
        text), and one table row per policy or a "no policies" placeholder.
    """
    # NOTE(review): several literals in this function were split across
    # physical lines (a SyntaxError) and look like they lost surrounding
    # markup. The visible text is kept as-is with line breaks written as
    # "\n" - confirm the intended markup.
    pool = get_pool(request)
    row_fragments: list[str] = []
    err = ""
    if pool is None:
        err = "База данных не настроена.\n"
    else:
        try:
            async with pool.acquire() as conn:
                rows = await conn.fetch(
                    "SELECT id, name, enabled, steps FROM escalation_policies ORDER BY name"
                )
            for r in rows:
                steps_preview = _escalation_steps_preview(r["steps"])
                row_fragments.append(
                    f"| {html.escape(str(r['id'])[:8])}… | "
                    f"{html.escape(r['name'])} | "
                    f"{'да' if r['enabled'] else 'нет'} | "
                    f"{steps_preview} | "
                    "\n"
                )
        except Exception as e:
            # Best effort: show the (escaped) failure on the page instead of a 500.
            # Rows rendered before the failure are still shown.
            err = f"{html.escape(str(e))}\n"

    table_body = "".join(row_fragments) or "| Нет политик — создайте через API POST |\n"
    inner = f"""Цепочки эскалаций
Заготовка: шаги хранятся в JSON; исполнение по таймерам — следующие версии.
{err}
| ID | Имя | Вкл. | Шаги (фрагмент) |
{table_body}
"""
    return HTMLResponse(
        wrap_module_html_page(
            document_title="Эскалации — onGuard24",
            current_slug="escalations",
            main_inner_html=inner,
        )
    )