"""Кольцевой буфер логов + fan-out в SSE-очереди подписчиков. Подключается через RingBufferHandler в main.py (install_log_handler()). Потокобезопасен: emit() вызывается в любом потоке, asyncio-очереди обновляются через loop.call_soon_threadsafe. """ from __future__ import annotations import asyncio import collections import logging import threading from datetime import datetime, timezone from typing import Any MAX_HISTORY = 600 _lock = threading.Lock() _ring: collections.deque[dict[str, Any]] = collections.deque(maxlen=MAX_HISTORY) _subscribers: list[asyncio.Queue[dict[str, Any]]] = [] _loop: asyncio.AbstractEventLoop | None = None def set_event_loop(loop: asyncio.AbstractEventLoop) -> None: global _loop _loop = loop def get_history() -> list[dict[str, Any]]: with _lock: return list(_ring) def subscribe() -> asyncio.Queue[dict[str, Any]]: q: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=300) with _lock: _subscribers.append(q) return q def unsubscribe(q: asyncio.Queue[dict[str, Any]]) -> None: with _lock: try: _subscribers.remove(q) except ValueError: pass def _push_to_subscriber(q: asyncio.Queue[dict[str, Any]], entry: dict[str, Any]) -> None: try: q.put_nowait(entry) except asyncio.QueueFull: pass class RingBufferHandler(logging.Handler): """Logging handler — пишет в кольцевой буфер и раздаёт SSE-подписчикам.""" def emit(self, record: logging.LogRecord) -> None: try: msg = self.format(record) ts = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime( "%Y-%m-%d %H:%M:%S" ) entry: dict[str, Any] = { "ts": ts, "level": record.levelname, "name": record.name, "msg": msg, } with _lock: _ring.append(entry) subs = list(_subscribers) if subs and _loop is not None and _loop.is_running(): for q in subs: _loop.call_soon_threadsafe(_push_to_subscriber, q, entry) except Exception: self.handleError(record) def install_log_handler( loop: asyncio.AbstractEventLoop, log_file: str = "", ) -> None: """Вызывается один раз при старте: регистрирует handler на корневом логгере.""" set_event_loop(loop) fmt = logging.Formatter( "%(asctime)s %(levelname)-8s %(name)s %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) root = logging.getLogger() # Кольцевой буфер (SSE-страница логов) if not any(isinstance(h, RingBufferHandler) for h in root.handlers): ring_h = RingBufferHandler() ring_h.setFormatter(logging.Formatter("%(name)s %(message)s")) ring_h.setLevel(logging.DEBUG) root.addHandler(ring_h) # Файл с ротацией (если задан LOG_FILE) if log_file.strip(): import os from logging.handlers import RotatingFileHandler log_path = log_file.strip() os.makedirs(os.path.dirname(log_path) if os.path.dirname(log_path) else ".", exist_ok=True) if not any(isinstance(h, RotatingFileHandler) for h in root.handlers): file_h = RotatingFileHandler( log_path, maxBytes=10 * 1024 * 1024, # 10 МБ backupCount=5, encoding="utf-8", ) file_h.setFormatter(fmt) file_h.setLevel(logging.DEBUG) root.addHandler(file_h) logging.getLogger("onguard24").info( "file logging enabled: %s (rotate 10MB×5)", log_path )