agents/catalogue/log.py · 83 lines · 3.1 KB · Python source
agents/catalogue/log.py · 83 lines · 3.1 KBrawGitHub
1"""
CatalogueEventLog — append-only JSONL writer with size-based rotation.

Output: data/logs/catalogue_events.jsonl (active)
Rotates to data/logs/catalogue_events.YYYYMMDD-HHMMSS.jsonl on size threshold
(default 100 MB) so the active file remains tail-able.

Concurrency: a single asyncio.Lock serializes appends between coroutines (it
is not a thread lock), which is sufficient because the entire backend service
is single-process async. Cross-process writers would require flock; mindX
does not have any today.
11"""
12 
13from __future__ import annotations
14 
15import asyncio
16import os
17from datetime import datetime
18from pathlib import Path
19from typing import Optional
20 
21import aiofiles  # type: ignore[import-not-found]
22 
23from .events import CatalogueEvent
24 
ROTATE_BYTES_DEFAULT = 100 * 1024 * 1024  # 100 MB


class CatalogueEventLog:
    """Append-only JSONL writer for CatalogueEvents.

    Each event is written as one JSON line to ``path``. Before every append
    the size of the active file is checked; once it has reached
    ``rotate_bytes`` the file is renamed to
    ``<stem>.<YYYYMMDD-HHMMSS><suffix>`` (UTC timestamp) and the next write
    starts a fresh file at ``path``.

    Appends are serialized with an ``asyncio.Lock``, so concurrent coroutines
    cannot interleave a rotation with a write.
    """

    # Shared instance handed out by ``default()``; created on first use.
    _default: Optional["CatalogueEventLog"] = None

    def __init__(self, path: Path, rotate_bytes: int = ROTATE_BYTES_DEFAULT):
        """Create a writer for ``path``.

        Args:
            path: Location of the active JSONL file. Its parent directory is
                created if it does not exist.
            rotate_bytes: Size at which the active file is rotated. Values
                below 1 MiB are raised to 1 MiB.
        """
        self.path = Path(path)
        self.rotate_bytes = max(1024 * 1024, int(rotate_bytes))
        self._lock = asyncio.Lock()
        self.path.parent.mkdir(parents=True, exist_ok=True)

    @classmethod
    def default(cls) -> "CatalogueEventLog":
        """Singleton anchored at <project_root>/data/logs/catalogue_events.jsonl."""
        if cls._default is not None:
            return cls._default
        try:
            from utils.config import PROJECT_ROOT
            base = PROJECT_ROOT / "data" / "logs"
        except Exception:
            # Fall back to repo-local data/logs when config is unavailable
            base = Path(__file__).resolve().parents[2] / "data" / "logs"
        cls._default = cls(base / "catalogue_events.jsonl")
        return cls._default

    async def append(self, evt: "CatalogueEvent") -> None:
        """Append one event. Rotates the file if it crosses the size threshold.

        The event is serialized with ``evt.model_dump_json()`` before the lock
        is taken, so only the rotation check and the write are serialized.
        """
        line = evt.model_dump_json() + "\n"
        async with self._lock:
            await self._maybe_rotate()
            async with aiofiles.open(self.path, "a", encoding="utf-8") as f:
                await f.write(line)

    async def _maybe_rotate(self) -> None:
        """Rename the active file aside once it has reached ``rotate_bytes``.

        Best-effort: any failure is logged and suppressed so that a rotation
        problem never prevents the subsequent write.
        """
        import logging
        from datetime import timezone

        try:
            try:
                size = self.path.stat().st_size
            except FileNotFoundError:
                # Nothing written yet (or just rotated): nothing to rotate.
                return
            if size < self.rotate_bytes:
                return

            # Timezone-aware replacement for the deprecated datetime.utcnow();
            # the formatted stamp is identical.
            stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
            stem, suffix = self.path.stem, self.path.suffix
            rotated = self.path.with_name(f"{stem}.{stamp}{suffix}")
            # If a same-second rotation already happened, suffix the rotated name.
            n = 1
            while rotated.exists():
                rotated = self.path.with_name(f"{stem}.{stamp}-{n}{suffix}")
                n += 1
            os.rename(self.path, rotated)
        except Exception:
            # Rotation failures must not block writes, but they should be
            # visible: an unrotated file keeps growing without bound.
            logging.getLogger(__name__).warning(
                "Catalogue event log rotation failed for %s",
                self.path,
                exc_info=True,
            )

    def stats(self) -> dict:
        """Return the active file's path, current size in bytes and threshold.

        ``size_bytes`` is 0 when the file does not exist or cannot be stat'ed.
        """
        try:
            size = self.path.stat().st_size
        except OSError:
            size = 0
        return {"path": str(self.path), "size_bytes": size, "rotate_bytes": self.rotate_bytes}

All DocumentsBook of mindXAPI Reference