Files
onGuard24/onguard24/log_buffer.py
Alexandr c9b97814a5
All checks were successful
CI / test (push) Successful in 47s
feat: логирование вебхука до БД + файловый лог с ротацией
- Каждый входящий POST /ingress/grafana: INFO-строка (status, кол-во алертов,
  первые лейблы) и DEBUG-блок с полным JSON телом (до 8КБ)
  — видно даже если БД упала с 500
- LOG_FILE в .env / env: RotatingFileHandler 10MB×5 файлов
- LOG_LEVEL=debug теперь показывает полные тела вебхуков
- basicConfig уровень DEBUG (uvicorn.access / asyncio приглушены)

Made-with: Cursor
2026-04-03 15:59:17 +03:00

120 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Кольцевой буфер логов + fan-out в SSE-очереди подписчиков.
Подключается через RingBufferHandler в main.py (install_log_handler()).
Потокобезопасен: emit() вызывается в любом потоке, asyncio-очереди
обновляются через loop.call_soon_threadsafe.
"""
from __future__ import annotations
import asyncio
import collections
import logging
import threading
from datetime import datetime, timezone
from typing import Any
MAX_HISTORY = 600
_lock = threading.Lock()
_ring: collections.deque[dict[str, Any]] = collections.deque(maxlen=MAX_HISTORY)
_subscribers: list[asyncio.Queue[dict[str, Any]]] = []
_loop: asyncio.AbstractEventLoop | None = None
def set_event_loop(loop: asyncio.AbstractEventLoop) -> None:
global _loop
_loop = loop
def get_history() -> list[dict[str, Any]]:
with _lock:
return list(_ring)
def subscribe() -> asyncio.Queue[dict[str, Any]]:
q: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=300)
with _lock:
_subscribers.append(q)
return q
def unsubscribe(q: asyncio.Queue[dict[str, Any]]) -> None:
with _lock:
try:
_subscribers.remove(q)
except ValueError:
pass
def _push_to_subscriber(q: asyncio.Queue[dict[str, Any]], entry: dict[str, Any]) -> None:
try:
q.put_nowait(entry)
except asyncio.QueueFull:
pass
class RingBufferHandler(logging.Handler):
"""Logging handler — пишет в кольцевой буфер и раздаёт SSE-подписчикам."""
def emit(self, record: logging.LogRecord) -> None:
try:
msg = self.format(record)
ts = datetime.fromtimestamp(record.created, tz=timezone.utc).strftime(
"%Y-%m-%d %H:%M:%S"
)
entry: dict[str, Any] = {
"ts": ts,
"level": record.levelname,
"name": record.name,
"msg": msg,
}
with _lock:
_ring.append(entry)
subs = list(_subscribers)
if subs and _loop is not None and _loop.is_running():
for q in subs:
_loop.call_soon_threadsafe(_push_to_subscriber, q, entry)
except Exception:
self.handleError(record)
def install_log_handler(
loop: asyncio.AbstractEventLoop,
log_file: str = "",
) -> None:
"""Вызывается один раз при старте: регистрирует handler на корневом логгере."""
set_event_loop(loop)
fmt = logging.Formatter(
"%(asctime)s %(levelname)-8s %(name)s %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
root = logging.getLogger()
# Кольцевой буфер (SSE-страница логов)
if not any(isinstance(h, RingBufferHandler) for h in root.handlers):
ring_h = RingBufferHandler()
ring_h.setFormatter(logging.Formatter("%(name)s %(message)s"))
ring_h.setLevel(logging.DEBUG)
root.addHandler(ring_h)
# Файл с ротацией (если задан LOG_FILE)
if log_file.strip():
import os
from logging.handlers import RotatingFileHandler
log_path = log_file.strip()
os.makedirs(os.path.dirname(log_path) if os.path.dirname(log_path) else ".", exist_ok=True)
if not any(isinstance(h, RotatingFileHandler) for h in root.handlers):
file_h = RotatingFileHandler(
log_path,
maxBytes=10 * 1024 * 1024, # 10 МБ
backupCount=5,
encoding="utf-8",
)
file_h.setFormatter(fmt)
file_h.setLevel(logging.DEBUG)
root.addHandler(file_h)
logging.getLogger("onguard24").info(
"file logging enabled: %s (rotate 10MB×5)", log_path
)