agents/catalogue/log.py · 83 lines · 3.1 KB · Python source
1"""
CatalogueEventLog — append-only JSONL writer with size-based rotation.
3
4Output: data/logs/catalogue_events.jsonl (active)
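Rotates to data/logs/catalogue_events.YYYYMMDD-HHMMSS.jsonl (UTC timestamp,
with a -N suffix added on same-second collisions) once the active file reaches
the size threshold (default 100 MB, minimum 1 MB), so the active file remains
tail-able.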
7
8Thread-safety: a single asyncio.Lock serializes appends, which is sufficient
9because the entire backend service is single-process async. Cross-process
10writers would require flock; mindX does not have any today.
11"""
12
from __future__ import annotations

import asyncio
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import aiofiles  # type: ignore[import-not-found]

from .events import CatalogueEvent
24
# Default size (in bytes) at which the active log file is rotated: 100 MiB.
ROTATE_BYTES_DEFAULT = 100 << 20
26
27
class CatalogueEventLog:
    """Append-only JSONL writer for CatalogueEvents.

    Each call to :meth:`append` writes one JSON line to ``self.path``. Before
    every write the active file's size is compared with ``rotate_bytes``; once
    it has reached that size it is renamed aside with a UTC timestamp and a
    fresh active file is started by the write that follows.

    Appends are serialized by an ``asyncio.Lock`` owned by each instance;
    there is no cross-process locking.
    """

    # Shared instance handed out by default(); created lazily on first call.
    _default: Optional["CatalogueEventLog"] = None

    def __init__(self, path: Path, rotate_bytes: int = ROTATE_BYTES_DEFAULT):
        """Create a writer for ``path``.

        Args:
            path: Active JSONL file. Its parent directory is created if missing.
            rotate_bytes: Size in bytes at which the active file is rotated.
                Values below 1 MiB are raised to 1 MiB.
        """
        self.path = Path(path)
        # Floor at 1 MiB so a tiny or zero threshold cannot rotate on every write.
        self.rotate_bytes = max(1024 * 1024, int(rotate_bytes))
        self._lock = asyncio.Lock()
        self.path.parent.mkdir(parents=True, exist_ok=True)

    @classmethod
    def default(cls) -> "CatalogueEventLog":
        """Return the shared instance at <project_root>/data/logs/catalogue_events.jsonl.

        ``utils.config.PROJECT_ROOT`` is used when it can be imported; otherwise
        the root is taken as ``parents[2]`` of this file's resolved path.
        """
        if cls._default is not None:
            return cls._default
        try:
            from utils.config import PROJECT_ROOT
            # Path() also accepts PROJECT_ROOT configured as a plain str, which
            # previously raised TypeError and silently fell through below.
            base = Path(PROJECT_ROOT) / "data" / "logs"
        except Exception:
            # Fall back to repo-local data/logs when config is unavailable.
            base = Path(__file__).resolve().parents[2] / "data" / "logs"
        cls._default = cls(base / "catalogue_events.jsonl")
        return cls._default

    async def append(self, evt: CatalogueEvent) -> None:
        """Append one event as a single JSON line.

        Rotation is checked before the write, so the active file can exceed
        ``rotate_bytes`` by up to one line until the next append rotates it.
        """
        # Serialize outside the lock to keep the critical section short.
        line = evt.model_dump_json() + "\n"
        async with self._lock:
            await self._maybe_rotate()
            async with aiofiles.open(self.path, "a", encoding="utf-8") as f:
                await f.write(line)

    async def _maybe_rotate(self) -> None:
        """Rename the active file aside once it has reached ``rotate_bytes``.

        The rotated name is ``<stem>.YYYYMMDD-HHMMSS<suffix>`` using UTC time;
        if that name already exists (same-second rotation), ``-1``, ``-2``, ...
        is appended to the stamp. Called by :meth:`append` while it holds
        ``self._lock``.

        Rotation is best-effort: any failure is logged and swallowed so the
        subsequent write still happens.
        """
        try:
            if not self.path.exists() or self.path.stat().st_size < self.rotate_bytes:
                return
            # Timezone-aware replacement for the deprecated datetime.utcnow();
            # the formatted stamp is identical.
            stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
            rotated = self.path.with_name(f"{self.path.stem}.{stamp}{self.path.suffix}")
            # If a same-second rotation already happened, suffix the rotated name.
            n = 1
            while rotated.exists():
                rotated = self.path.with_name(
                    f"{self.path.stem}.{stamp}-{n}{self.path.suffix}"
                )
                n += 1
            os.rename(self.path, rotated)
        except Exception:
            # Rotation failures must not block writes, but they must be visible:
            # a log that never rotates grows without bound.
            logging.getLogger(__name__).warning(
                "Catalogue event log rotation failed for %s", self.path, exc_info=True
            )

    def stats(self) -> dict:
        """Return the active path, its current size in bytes and the rotation threshold.

        A missing or unreadable active file reports ``size_bytes`` as 0.
        """
        try:
            size = self.path.stat().st_size
        except OSError:
            # Includes FileNotFoundError: not created yet, or just rotated away.
            size = 0
        return {"path": str(self.path), "size_bytes": size, "rotate_bytes": self.rotate_bytes}