"""IRM: цепочки эскалаций (политики в JSON, дальше — исполнение по шагам).""" from __future__ import annotations import html import json from uuid import UUID import asyncpg from fastapi import APIRouter, Depends, HTTPException, Request from fastapi.responses import HTMLResponse from pydantic import BaseModel, Field from onguard24.deps import get_pool from onguard24.domain.events import EventBus from onguard24.modules.ui_support import wrap_module_html_page router = APIRouter(tags=["module-escalations"]) ui_router = APIRouter(tags=["web-escalations"], include_in_schema=False) class PolicyCreate(BaseModel): name: str = Field(..., min_length=1, max_length=200) enabled: bool = True steps: list[dict] = Field(default_factory=list) class PolicyPatch(BaseModel): name: str | None = Field(default=None, min_length=1, max_length=200) enabled: bool | None = None steps: list[dict] | None = None def register_events(_bus: EventBus, _pool: asyncpg.Pool | None = None) -> None: pass async def render_home_fragment(request: Request) -> str: pool = get_pool(request) if pool is None: return '

Нужна БД для политик эскалации.

' try: async with pool.acquire() as conn: n = await conn.fetchval("SELECT count(*)::int FROM escalation_policies WHERE enabled = true") except Exception: return '

Таблица политик недоступна (миграции?).

' return f'

Активных политик: {int(n)}

' @router.get("/") async def list_policies_api(pool: asyncpg.Pool | None = Depends(get_pool)): if pool is None: return {"items": [], "database": "disabled"} async with pool.acquire() as conn: rows = await conn.fetch( """ SELECT id, name, enabled, steps, created_at FROM escalation_policies ORDER BY name """ ) items = [] for r in rows: steps = r["steps"] if isinstance(steps, str): steps = json.loads(steps) items.append( { "id": str(r["id"]), "name": r["name"], "enabled": r["enabled"], "steps": steps if isinstance(steps, list) else [], "created_at": r["created_at"].isoformat() if r["created_at"] else None, } ) return {"items": items} @router.post("/", status_code=201) async def create_policy_api(body: PolicyCreate, pool: asyncpg.Pool | None = Depends(get_pool)): if pool is None: raise HTTPException(status_code=503, detail="database disabled") async with pool.acquire() as conn: row = await conn.fetchrow( """ INSERT INTO escalation_policies (name, enabled, steps) VALUES ($1, $2, $3::jsonb) RETURNING id, name, enabled, steps, created_at """, body.name.strip(), body.enabled, json.dumps(body.steps), ) steps = row["steps"] return { "id": str(row["id"]), "name": row["name"], "enabled": row["enabled"], "steps": list(steps) if steps else [], "created_at": row["created_at"].isoformat() if row["created_at"] else None, } def _policy_dict(row) -> dict: steps = row["steps"] if isinstance(steps, str): steps = json.loads(steps) return { "id": str(row["id"]), "name": row["name"], "enabled": row["enabled"], "steps": steps if isinstance(steps, list) else [], "created_at": row["created_at"].isoformat() if row["created_at"] else None, } @router.get("/{policy_id}") async def get_policy_api(policy_id: UUID, pool: asyncpg.Pool | None = Depends(get_pool)): if pool is None: raise HTTPException(status_code=503, detail="database disabled") async with pool.acquire() as conn: row = await conn.fetchrow( """ SELECT id, name, enabled, steps, created_at FROM escalation_policies WHERE id = $1::uuid """, policy_id, ) if not row: raise HTTPException(status_code=404, detail="not found") return _policy_dict(row) @router.patch("/{policy_id}") async def patch_policy_api( policy_id: UUID, body: PolicyPatch, pool: asyncpg.Pool | None = Depends(get_pool), ): if pool is None: raise HTTPException(status_code=503, detail="database disabled") if body.name is None and body.enabled is None and body.steps is None: raise HTTPException(status_code=400, detail="no fields to update") steps_json = json.dumps(body.steps) if body.steps is not None else None async with pool.acquire() as conn: row = await conn.fetchrow( """ UPDATE escalation_policies SET name = COALESCE($2, name), enabled = COALESCE($3, enabled), steps = COALESCE($4::jsonb, steps) WHERE id = $1::uuid RETURNING id, name, enabled, steps, created_at """, policy_id, body.name.strip() if body.name is not None else None, body.enabled, steps_json, ) if not row: raise HTTPException(status_code=404, detail="not found") return _policy_dict(row) @router.delete("/{policy_id}", status_code=204) async def delete_policy_api(policy_id: UUID, pool: asyncpg.Pool | None = Depends(get_pool)): if pool is None: raise HTTPException(status_code=503, detail="database disabled") async with pool.acquire() as conn: row = await conn.fetchrow( "DELETE FROM escalation_policies WHERE id = $1::uuid RETURNING id", policy_id, ) if row is None: raise HTTPException(status_code=404, detail="not found") @ui_router.get("/", response_class=HTMLResponse) async def escalations_ui_home(request: Request): pool = get_pool(request) rows_html = "" err = "" if pool is None: err = "

База данных не настроена.

" else: try: async with pool.acquire() as conn: rows = await conn.fetch( "SELECT id, name, enabled, steps FROM escalation_policies ORDER BY name" ) for r in rows: steps = r["steps"] if hasattr(steps, "__iter__") and not isinstance(steps, (str, bytes)): steps_preview = html.escape(json.dumps(steps, ensure_ascii=False)[:120]) else: steps_preview = "—" rows_html += ( "" f"{html.escape(str(r['id']))[:8]}…" f"{html.escape(r['name'])}" f"{'да' if r['enabled'] else 'нет'}" f"{steps_preview}" "" ) except Exception as e: err = f"

{html.escape(str(e))}

" inner = f"""

Цепочки эскалаций

Заготовка: шаги хранятся в JSON; исполнение по таймерам — следующие версии.

{err} {rows_html or ''}
IDИмяВкл.Шаги (фрагмент)
Нет политик — создайте через API POST
""" return HTMLResponse( wrap_module_html_page( document_title="Эскалации — onGuard24", current_slug="escalations", main_inner_html=inner, ) )