agents/storage/offload_projector.py
1"""
2OffloadProjector — promote eligible STM memories to IPFS.
3 
4For each (agent_id, date) tuple older than the cutoff:
5  1. Pack files into a gzipped JSONL bundle (deterministic, content-addressed).
6  2. Upload to MultiProvider (Lighthouse + nft.storage). Both must agree on CID.
7  3. Verify by retrieving and sha256-comparing.
8  4. Mark each contained memory in pgvector with content_cid + offload_tier='ipfs'.
9  5. Emit `memory.offload` catalogue event with the agent's wallet signature.
10  6. Optionally delete the local date-dir (only if dry_run is False AND verify
11     succeeded).
12 
13Phase B ships dry_run=True by default. The destructive backfill that actually
14deletes STM files requires explicit `dry_run=False` from a user-issued
15admin call.
16 
17Plan: ~/.claude/plans/whispering-floating-merkle.md
18"""

from __future__ import annotations

import asyncio
import hashlib
import json
import shutil
import time
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Optional

from utils.logging_config import get_logger
from utils.config import PROJECT_ROOT

from .anchor import AnchorClient
from .car_bundle import bundle_iter, manifest, pack_directory
from .eligibility import OffloadCandidate, list_eligible
from .multi_provider import MultiProvider
from .provider import CID, ProviderError

logger = get_logger(__name__)


@dataclass
class OffloadResult:
    agent_id: str
    date_str: str
    file_count: int
    bytes_packed: int
    bytes_uploaded: int
    cid: Optional[str] = None
    cid_mirror: Optional[str] = None
    sha256: Optional[str] = None
    verified: bool = False
    deleted_local: bool = False
    memory_ids_marked: int = 0
    error: Optional[str] = None
    duration_seconds: float = 0.0
    dry_run: bool = True


@dataclass
class OffloadRun:
    started_at: float
    finished_at: Optional[float] = None
    candidates_total: int = 0
    candidates_processed: int = 0
    bytes_packed_total: int = 0
    bytes_freed_total: int = 0
    results: list[OffloadResult] = field(default_factory=list)


class OffloadProjector:
    """Orchestrates per-(agent,date) offload to IPFS."""

    def __init__(
        self,
        provider: MultiProvider,
        memory_agent=None,
        id_manager=None,
        project_root: Optional[Path] = None,
        anchor: Optional[AnchorClient] = None,
    ):
        self.provider = provider
        self.memory_agent = memory_agent
        self.id_manager = id_manager
        self.project_root = project_root or PROJECT_ROOT
        # Anchor is optional: if unconfigured, offload still succeeds without
        # an on-chain receipt (offload_tx_hash stays NULL).
        self.anchor = anchor or AnchorClient()

    async def run(
        self,
        *,
        agent_id: Optional[str] = None,
        min_age_days: float = 14.0,
        max_batches: int = 50,
        dry_run: bool = True,
    ) -> OffloadRun:
        """
        Run a single pass of the offload projector.

        Args:
            agent_id: restrict to one agent (None = all eligible).
            min_age_days: only offload directories older than this.
            max_batches: cap on (agent, date) tuples processed in this pass.
            dry_run: if True (default), upload+verify+mark-DB but DO NOT delete
                local files. Set to False explicitly to free disk.
        """
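        # Illustrative admin call for the destructive path (the agent id is a
        # placeholder); deletion only happens with an explicit dry_run=False:
        #   await projector.run(agent_id="<agent-id>", dry_run=False)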
        run = OffloadRun(started_at=time.time())
        candidates = list_eligible(
            self.project_root, min_age_days=min_age_days, agent_id=agent_id,
        )
        run.candidates_total = len(candidates)
        for cand in candidates[:max_batches]:
            res = await self._process(cand, dry_run=dry_run)
            run.results.append(res)
            run.candidates_processed += 1
            run.bytes_packed_total += res.bytes_packed
            if res.deleted_local:
                run.bytes_freed_total += res.bytes_packed
            # Yield to the event loop between batches so we don't starve
            # other tasks during a large backfill.
            await asyncio.sleep(0)
        run.finished_at = time.time()
        return run

    async def _process(self, cand: OffloadCandidate, *, dry_run: bool) -> OffloadResult:
        t0 = time.time()
        result = OffloadResult(
            agent_id=cand.agent_id,
            date_str=cand.date_str,
            file_count=0,
            bytes_packed=0,
            bytes_uploaded=0,
            dry_run=dry_run,
        )
        try:
            mani = manifest(cand.path)
            result.file_count = mani["file_count"]
            blob = pack_directory(cand.path)
            result.bytes_packed = len(blob)
            if not blob:
                result.error = "empty bundle"
                return result
            result.sha256 = hashlib.sha256(blob).hexdigest()

            # Upload — MultiProvider may raise if both legs fail
            cid = await self.provider.upload(
                blob, name=f"{cand.agent_id}_{cand.date_str}.jsonl.gz",
            )
            result.cid = cid.value
            result.bytes_uploaded = len(blob)

            # Verify by re-downloading; require sha256 match
            try:
                back = await self.provider.retrieve(cid, timeout=15.0)
                back_sha = hashlib.sha256(back).hexdigest()
                result.verified = back_sha == result.sha256
                if not result.verified:
                    result.error = (
                        f"verify failed: local_sha={result.sha256[:12]} "
                        f"remote_sha={back_sha[:12]}"
                    )
                    logger.warning(
                        "[offload] verify failed for %s/%s — keeping local",
                        cand.agent_id, cand.date_str,
                    )
            except ProviderError as ve:
                result.error = f"verify retrieval failed: {ve}"
                result.verified = False

            # Mark every memory_id in this batch as offloaded (best-effort)
            if result.verified:
                # On-chain anchor (best-effort; skipped if anchor not configured)
                anchor_receipt: dict = {}
                tx_hash: Optional[str] = None
                if self.anchor.configured:
                    try:
                        anchor_receipt = await self.anchor.anchor_dataset_registry(
                            agent_id=cand.agent_id,
                            date_str=cand.date_str,
                            cid=result.cid,
                        )
                        tx_hash = anchor_receipt.get("tx_hash")
                        if tx_hash:
                            logger.info(
                                "[offload] anchored %s/%s -> tx %s",
                                cand.agent_id, cand.date_str, tx_hash,
                            )
                    except Exception as ae:
                        logger.debug("[offload] anchor failed: %s", ae)
                        anchor_receipt = {"error": str(ae)}

                marked = await self._mark_db(
                    blob, cid_value=result.cid, mirror=None,
                    tx_hash=tx_hash,
                    chain="arc" if tx_hash else None,
                )
                result.memory_ids_marked = marked

                # Emit catalogue event so /insight/storage/recent can read it
                await self._emit_offload_event(
                    cand=cand, result=result, anchor_receipt=anchor_receipt,
                )

                # Delete local files only if not in dry_run mode
                if not dry_run:
                    try:
                        shutil.rmtree(cand.path)
                        result.deleted_local = True
                        logger.info(
                            "[offload] deleted %s/%s — %d files, %s freed",
                            cand.agent_id, cand.date_str, result.file_count,
                            _human_bytes(result.bytes_packed),
                        )
                    except OSError as de:
                        result.error = f"delete failed: {de}"
        except ProviderError as pe:
            result.error = f"provider error: {pe}"
            logger.warning("[offload] %s/%s upload failed: %s",
                           cand.agent_id, cand.date_str, pe)
        except Exception as e:
            result.error = f"{type(e).__name__}: {e}"
            logger.exception("[offload] %s/%s unexpected failure",
                             cand.agent_id, cand.date_str)
        finally:
            result.duration_seconds = time.time() - t0
        return result

    async def _mark_db(
        self,
        blob: bytes,
        *,
        cid_value: str,
        mirror: Optional[str],
        tx_hash: Optional[str] = None,
        chain: Optional[str] = None,
    ) -> int:
        """For every memory_id in the bundle, update pgvector row."""
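        # Expected entry shape, inferred from the access pattern below (the
        # authoritative schema lives in car_bundle.bundle_iter):
        #   {"record": {"memory_id": "...", ...}, ...}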
        try:
            from agents import memory_pgvector
        except Exception:
            return 0
        marked = 0
        for entry in bundle_iter(blob):
            rec = entry.get("record") or {}
            memory_id = rec.get("memory_id")
            if not memory_id:
                continue
            try:
                ok = await memory_pgvector.mark_memory_offloaded(
                    memory_id=memory_id,
                    content_cid=cid_value,
                    content_cid_mirror=mirror,
                    offload_tier="ipfs",
                    tx_hash=tx_hash,
                    chain=chain,
                )
                if ok:
                    marked += 1
            except Exception:
                continue
        return marked

    async def _emit_offload_event(
        self,
        *,
        cand: OffloadCandidate,
        result: OffloadResult,
        anchor_receipt: Optional[dict] = None,
    ) -> None:
        try:
            from agents.catalogue import emit_catalogue_event
        except Exception:
            return
        wallet = None
        if self.id_manager is not None:
            try:
                wallet = self.id_manager.get_public_address(cand.agent_id)
            except Exception:
                wallet = None
        try:
            await emit_catalogue_event(
                kind="memory.offload",
                actor=cand.agent_id,
                payload={
                    "date_str": cand.date_str,
                    "file_count": result.file_count,
                    "bytes_packed": result.bytes_packed,
                    "cid": result.cid,
                    "sha256": result.sha256,
                    "verified": result.verified,
                    "deleted_local": result.deleted_local,
                    "dry_run": result.dry_run,
                    "anchor": anchor_receipt or None,
                },
                source_log="storage/offload_projector",
                source_ref=f"{cand.agent_id}/{cand.date_str}",
                actor_wallet=wallet,
            )
        except Exception:
            pass


def _human_bytes(n: int) -> str:
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if abs(n) < 1024.0:
            return f"{n:.1f}{unit}"
        n /= 1024.0  # type: ignore[assignment]
    return f"{n:.1f}PB"
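
# For reference: _human_bytes(1536) == "1.5KB", _human_bytes(3_221_225_472) == "3.0GB".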


def serialize_run(run: OffloadRun) -> dict:
    """Convert OffloadRun to a JSON-serializable dict for API responses."""
    return {
        "started_at": run.started_at,
        "finished_at": run.finished_at,
        "duration_seconds": (run.finished_at or time.time()) - run.started_at,
        "candidates_total": run.candidates_total,
        "candidates_processed": run.candidates_processed,
        "bytes_packed_total": run.bytes_packed_total,
        "bytes_freed_total": run.bytes_freed_total,
        "human_freed": _human_bytes(run.bytes_freed_total),
        "results": [asdict(r) for r in run.results],
    }
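

if __name__ == "__main__":
    # Minimal manual smoke-run (a sketch, not wired into any scheduler).
    # Assumes MultiProvider() can be constructed with defaults; its real
    # constructor arguments are defined in .multi_provider, not shown here.
    async def _demo() -> None:
        projector = OffloadProjector(MultiProvider())
        result = await projector.run(max_batches=1, dry_run=True)
        print(json.dumps(serialize_run(result), indent=2))

    asyncio.run(_demo())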
