"""Кольцевой буфер логов + fan-out в SSE-очереди подписчиков. Подключается через RingBufferHandler в main.py (install_log_handler()). Потокобезопасен: emit() вызывается в любом потоке, asyncio-очереди обновляются через loop.call_soon_threadsafe. """ from __future__ import annotations import asyncio import collections import logging import threading from datetime import datetime, timezone from typing import Any MAX_HISTORY = 600 _lock = threading.Lock() _ring: collections.deque[dict[str, Any]] = collections.deque(maxlen=MAX_HISTORY) _subscribers: list[asyncio.Queue[dict[str, Any]]] = [] _loop: asyncio.AbstractEventLoop | None = None def set_event_loop(loop: asyncio.AbstractEventLoop) -> None: global _loop _loop = loop def get_history() -> list[dict[str, Any]]: with _lock: return list(_ring) def subscribe() -> asyncio.Queue[dict[str, Any]]: q: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=300) with _lock: _subscribers.append(q) return q def unsubscribe(q: asyncio.Queue[dict[str, Any]]) -> None: with _lock: try: _subscribers.remove(q) except ValueError: pass def _push_to_subscriber(q: asyncio.Queue[dict[str, Any]], entry: dict[str, Any]) -> None: try: q.put_nowait(entry) except asyncio.QueueFull: pass class RingBufferHandler(logging.Handler): """Logging handler — пишет в кольцевой буфер и раздаёт SSE-подписчикам.""" def emit(self, record: logging.LogRecord) -> None: try: msg = self.format(record) ts = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime( "%Y-%m-%d %H:%M:%S" ) entry: dict[str, Any] = { "ts": ts, "level": record.levelname, "name": record.name, "msg": msg, } with _lock: _ring.append(entry) subs = list(_subscribers) if subs and _loop is not None and _loop.is_running(): for q in subs: _loop.call_soon_threadsafe(_push_to_subscriber, q, entry) except Exception: self.handleError(record) def install_log_handler(loop: asyncio.AbstractEventLoop) -> None: """Вызывается один раз при старте: регистрирует handler на корневом логгере.""" set_event_loop(loop) handler = RingBufferHandler() handler.setFormatter( logging.Formatter("%(name)s %(message)s") ) handler.setLevel(logging.DEBUG) root = logging.getLogger() if not any(isinstance(h, RingBufferHandler) for h in root.handlers): root.addHandler(handler)