"""In-memory «пул» для тестов IRM без реального PostgreSQL.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any from uuid import UUID, uuid4 def _now() -> datetime: return datetime.now(timezone.utc) @dataclass class Row: """Минимальная обёртка под asyncpg.Record (доступ по ключу).""" _data: dict[str, Any] def __getitem__(self, key: str) -> Any: return self._data[key] def get(self, key: str, default: Any = None) -> Any: return self._data.get(key, default) class IrmFakeConn: def __init__(self, store: IrmFakeStore) -> None: self.store = store def _q(self, query: str) -> str: return " ".join(query.split()) async def execute(self, query: str, *args: Any) -> str: q = self._q(query) if "INSERT INTO incidents" in q and "ingress_event_id" in q: self.store.insert_incident_alert( args[0], args[1], args[2], args[3], args[4] ) return "INSERT 0 1" raise AssertionError(f"execute not implemented: {q[:80]}") async def fetchval(self, query: str, *args: Any) -> Any: q = self._q(query) if "count(*)" in q and "FROM incidents" in q and "escalation" not in q: return len(self.store.incidents) if "count(*)" in q and "FROM tasks" in q: return len(self.store.tasks) if "count(*)" in q and "escalation_policies" in q: return sum(1 for p in self.store.policies.values() if p["enabled"]) if "SELECT 1 FROM incidents WHERE id" in q: uid = args[0] return 1 if uid in self.store.incidents else None raise AssertionError(f"fetchval not implemented: {q[:100]}") async def fetch(self, query: str, *args: Any) -> list[Row]: q = self._q(query) if "FROM incidents" in q and "ORDER BY created_at DESC" in q: rows = sorted(self.store.incidents.values(), key=lambda x: x["created_at"], reverse=True) if "grafana_org_slug =" in q and "service_name =" in q: rows = [ r for r in rows if r.get("grafana_org_slug") == args[0] and r.get("service_name") == args[1] ] lim = args[2] elif "grafana_org_slug =" in q: rows = [r for r in rows if r.get("grafana_org_slug") == args[0]] lim = args[1] elif "service_name =" in q: rows = [r for r in rows if r.get("service_name") == args[0]] lim = args[1] else: lim = args[0] return [Row(dict(r)) for r in rows[:lim]] if "FROM tasks" in q and "WHERE incident_id" in q and "ORDER BY" in q: iid, lim = args[0], args[1] match = [t for t in self.store.tasks.values() if t["incident_id"] == iid] match.sort(key=lambda x: x["created_at"], reverse=True) return [Row(dict(t)) for t in match[:lim]] if "FROM tasks" in q and "WHERE incident_id" in q: iid, lim = args[0], args[1] match = [t for t in self.store.tasks.values() if t["incident_id"] == iid] match.sort(key=lambda x: x["created_at"], reverse=True) return [Row(dict(t)) for t in match[:lim]] if "FROM tasks t" in q or ("FROM tasks" in q and "ORDER BY t.created_at" in q): lim = args[0] rows = sorted(self.store.tasks.values(), key=lambda x: x["created_at"], reverse=True)[:lim] return [Row(dict(r)) for r in rows] if "FROM tasks" in q and "ORDER BY created_at DESC" in q and "WHERE" not in q: lim = args[0] rows = sorted(self.store.tasks.values(), key=lambda x: x["created_at"], reverse=True)[:lim] return [Row(dict(r)) for r in rows] if "FROM escalation_policies" in q and "ORDER BY name" in q: rows = sorted(self.store.policies.values(), key=lambda x: x["name"]) return [Row(dict(r)) for r in rows] raise AssertionError(f"fetch not implemented: {q[:120]}") async def fetchrow(self, query: str, *args: Any) -> Row | None: q = self._q(query) if "INSERT INTO incidents" in q and "VALUES ($1, $2, $3, 'manual'" in q: return Row(self.store.insert_incident_manual(args[0], args[1], args[2])) if "FROM incidents WHERE id" in q and "UPDATE" not in q and "/tasks" not in query.lower(): return self.store.get_incident(args[0]) if "UPDATE incidents SET" in q: return self.store.update_incident(args[0], args[1], args[2], args[3]) if "INSERT INTO tasks" in q: return Row(self.store.insert_task(args[0], args[1])) if "FROM tasks WHERE id" in q and "UPDATE" not in q: tid = args[0] t = self.store.tasks.get(tid) return Row(dict(t)) if t else None if "UPDATE tasks SET" in q: return self.store.update_task(args[0], args[1], args[2]) if "INSERT INTO escalation_policies" in q: return Row(self.store.insert_policy(args[0], args[1], args[2])) if "FROM escalation_policies WHERE id" in q and "UPDATE" not in q and "DELETE" not in q: pid = args[0] p = self.store.policies.get(pid) return Row(dict(p)) if p else None if "UPDATE escalation_policies SET" in q: return self.store.update_policy(args[0], args[1], args[2], args[3]) if "DELETE FROM escalation_policies" in q: return self.store.delete_policy(args[0]) raise AssertionError(f"fetchrow not implemented: {q[:120]}") @dataclass class IrmFakeStore: incidents: dict[UUID, dict[str, Any]] = field(default_factory=dict) tasks: dict[UUID, dict[str, Any]] = field(default_factory=dict) policies: dict[UUID, dict[str, Any]] = field(default_factory=dict) def insert_incident_alert( self, title: str, sev: str, ingress_id: UUID, grafana_org_slug: Any, service_name: Any, ) -> None: iid = uuid4() now = _now() self.incidents[iid] = { "id": iid, "title": title, "status": "open", "severity": sev, "source": "grafana", "ingress_event_id": ingress_id, "created_at": now, "updated_at": now, "grafana_org_slug": grafana_org_slug, "service_name": service_name, } def insert_incident_manual(self, title: str, status: str, severity: str) -> dict[str, Any]: iid = uuid4() now = _now() row = { "id": iid, "title": title, "status": status, "severity": severity, "source": "manual", "ingress_event_id": None, "created_at": now, "updated_at": now, "grafana_org_slug": None, "service_name": None, } self.incidents[iid] = row return row def get_incident(self, iid: UUID) -> Row | None: r = self.incidents.get(iid) return Row(dict(r)) if r else None def update_incident( self, iid: UUID, title: str | None, status: str | None, severity: str | None, ) -> Row | None: r = self.incidents.get(iid) if not r: return None if title is not None: r["title"] = title if status is not None: r["status"] = status if severity is not None: r["severity"] = severity r["updated_at"] = _now() return Row(dict(r)) def insert_task(self, title: str, incident_id: UUID | None) -> dict[str, Any]: tid = uuid4() now = _now() row = { "id": tid, "incident_id": incident_id, "title": title, "status": "open", "created_at": now, } self.tasks[tid] = row return row def update_task(self, tid: UUID, title: str | None, status: str | None) -> Row | None: r = self.tasks.get(tid) if not r: return None if title is not None: r["title"] = title if status is not None: r["status"] = status return Row(dict(r)) def insert_policy(self, name: str, enabled: bool, steps_json: str) -> dict[str, Any]: import json pid = uuid4() now = _now() steps = json.loads(steps_json) row = { "id": pid, "name": name, "enabled": enabled, "steps": steps, "created_at": now, } self.policies[pid] = row return row def update_policy( self, pid: UUID, name: str | None, enabled: bool | None, steps_json: str | None, ) -> Row | None: import json r = self.policies.get(pid) if not r: return None if name is not None: r["name"] = name if enabled is not None: r["enabled"] = enabled if steps_json is not None: r["steps"] = json.loads(steps_json) return Row(dict(r)) def delete_policy(self, pid: UUID) -> Row | None: if pid not in self.policies: return None self.policies.pop(pid) return Row({"id": pid}) class IrmFakeAcquire: def __init__(self, store: IrmFakeStore) -> None: self.store = store async def __aenter__(self) -> IrmFakeConn: return IrmFakeConn(self.store) async def __aexit__(self, *args: Any) -> None: pass class IrmFakePool: def __init__(self, store: IrmFakeStore | None = None) -> None: self._store = store or IrmFakeStore() def acquire(self) -> IrmFakeAcquire: return IrmFakeAcquire(self._store) @property def store(self) -> IrmFakeStore: return self._store