# agents/storage/offload_projector.py
1"""
2OffloadProjector — promote eligible STM memories to IPFS.
3
4For each (agent_id, date) tuple older than the cutoff:
5 1. Pack files into a gzipped JSONL bundle (deterministic, content-addressed).
6 2. Upload to MultiProvider (Lighthouse + nft.storage). Both must agree on CID.
7 3. Verify by retrieving and sha256-comparing.
8 4. Mark each contained memory in pgvector with content_cid + offload_tier='ipfs'.
9 5. Emit `memory.offload` catalogue event with the agent's wallet signature.
10 6. Optionally delete the local date-dir (only if dry_run is False AND verify
11 succeeded).
12
13Phase B ships dry_run=True by default. The destructive backfill that actually
14deletes STM files requires explicit `dry_run=False` from a user-issued
15admin call.
16
17Plan: ~/.claude/plans/whispering-floating-merkle.md
18"""

from __future__ import annotations

import asyncio
import hashlib
import json
import shutil
import time
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Optional

from utils.logging_config import get_logger
from utils.config import PROJECT_ROOT

from .anchor import AnchorClient
from .car_bundle import bundle_iter, manifest, pack_directory
from .eligibility import OffloadCandidate, list_eligible
from .multi_provider import MultiProvider
from .provider import CID, ProviderError

logger = get_logger(__name__)


@dataclass
class OffloadResult:
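    """Outcome of one (agent_id, date) offload attempt."""
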
    agent_id: str
    date_str: str
    file_count: int
    bytes_packed: int
    bytes_uploaded: int
    cid: Optional[str] = None
    cid_mirror: Optional[str] = None
    sha256: Optional[str] = None
    verified: bool = False
    deleted_local: bool = False
    memory_ids_marked: int = 0
    error: Optional[str] = None
    duration_seconds: float = 0.0
    dry_run: bool = True


@dataclass
class OffloadRun:
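    """Aggregate counters and per-candidate results for one projector pass."""
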
    started_at: float
    finished_at: Optional[float] = None
    candidates_total: int = 0
    candidates_processed: int = 0
    bytes_packed_total: int = 0
    bytes_freed_total: int = 0
    results: list[OffloadResult] = field(default_factory=list)


class OffloadProjector:
    """Orchestrates per-(agent,date) offload to IPFS."""

    def __init__(
        self,
        provider: MultiProvider,
        memory_agent=None,
        id_manager=None,
        project_root: Optional[Path] = None,
        anchor: Optional[AnchorClient] = None,
    ):
        self.provider = provider
        self.memory_agent = memory_agent
        self.id_manager = id_manager
        self.project_root = project_root or PROJECT_ROOT
        # Anchor is optional: if unconfigured, offload still succeeds without
        # an on-chain receipt (offload_tx_hash stays NULL).
        self.anchor = anchor or AnchorClient()

    async def run(
        self,
        *,
        agent_id: Optional[str] = None,
        min_age_days: float = 14.0,
        max_batches: int = 50,
        dry_run: bool = True,
    ) -> OffloadRun:
        """
        Run a single pass of the offload projector.

        Args:
            agent_id: restrict to one agent (None = all eligible).
            min_age_days: only offload directories older than this.
            max_batches: cap on (agent, date) tuples processed in this pass.
            dry_run: if True (default), upload+verify+mark-DB but DO NOT delete
                local files. Set to False explicitly to free disk.
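
        Example (illustrative; `projector` is a hypothetical instance):
            run = await projector.run(min_age_days=30.0, dry_run=True)
            logger.info("pass done: %s", serialize_run(run)["human_freed"])
        """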
        run = OffloadRun(started_at=time.time())
        candidates = list_eligible(
            self.project_root, min_age_days=min_age_days, agent_id=agent_id,
        )
        run.candidates_total = len(candidates)
        for cand in candidates[:max_batches]:
            res = await self._process(cand, dry_run=dry_run)
            run.results.append(res)
            run.candidates_processed += 1
            run.bytes_packed_total += res.bytes_packed
            if res.deleted_local:
                run.bytes_freed_total += res.bytes_packed
            # Yield to the event loop between batches so we don't starve
            # other tasks during a large backfill.
            await asyncio.sleep(0)
        run.finished_at = time.time()
        return run

    async def _process(self, cand: OffloadCandidate, *, dry_run: bool) -> OffloadResult:
        t0 = time.time()
        result = OffloadResult(
            agent_id=cand.agent_id,
            date_str=cand.date_str,
            file_count=0,
            bytes_packed=0,
            bytes_uploaded=0,
            dry_run=dry_run,
        )
        try:
            mani = manifest(cand.path)
            result.file_count = mani["file_count"]
            blob = pack_directory(cand.path)
            result.bytes_packed = len(blob)
            result.sha256 = hashlib.sha256(blob).hexdigest()
            if not blob:
                result.error = "empty bundle"
                return result

            # Upload — MultiProvider may raise if both legs fail
            cid: CID = await self.provider.upload(
                blob, name=f"{cand.agent_id}_{cand.date_str}.jsonl.gz",
            )
            result.cid = cid.value
            result.bytes_uploaded = len(blob)

            # Verify by re-downloading; require sha256 match
            try:
                back = await self.provider.retrieve(cid, timeout=15.0)
                back_sha = hashlib.sha256(back).hexdigest()
                result.verified = back_sha == result.sha256
                if not result.verified:
                    result.error = (
                        f"verify failed: local_sha={result.sha256[:12]} "
                        f"remote_sha={back_sha[:12]}"
                    )
                    logger.warning(
                        "[offload] verify failed for %s/%s — keeping local",
                        cand.agent_id, cand.date_str,
                    )
            except ProviderError as ve:
                result.error = f"verify retrieval failed: {ve}"
                result.verified = False

            # Mark every memory_id in this batch as offloaded (best-effort)
            if result.verified:
                # On-chain anchor (best-effort; skipped if anchor not configured)
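                # Receipt shape is assumed here: on success it should carry at
                # least a "tx_hash" key (the only key read below).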
                anchor_receipt: dict = {}
                tx_hash: Optional[str] = None
                if self.anchor.configured:
                    try:
                        anchor_receipt = await self.anchor.anchor_dataset_registry(
                            agent_id=cand.agent_id,
                            date_str=cand.date_str,
                            cid=result.cid,
                        )
                        tx_hash = anchor_receipt.get("tx_hash")
                        if tx_hash:
                            logger.info(
                                "[offload] anchored %s/%s -> tx %s",
                                cand.agent_id, cand.date_str, tx_hash,
                            )
                    except Exception as ae:
                        logger.debug("[offload] anchor failed: %s", ae)
                        anchor_receipt = {"error": str(ae)}

                marked = await self._mark_db(
                    blob, cid_value=result.cid, mirror=None,
                    tx_hash=tx_hash,
                    chain="arc" if tx_hash else None,
                )
                result.memory_ids_marked = marked

                # Emit catalogue event so /insight/storage/recent can read it
                await self._emit_offload_event(
                    cand=cand, result=result, anchor_receipt=anchor_receipt,
                )

                # Delete local files only if not in dry_run mode
                if not dry_run:
                    try:
                        shutil.rmtree(cand.path)
                        result.deleted_local = True
                        logger.info(
                            "[offload] deleted %s/%s — %d files, %s freed",
                            cand.agent_id, cand.date_str, result.file_count,
                            _human_bytes(result.bytes_packed),
                        )
                    except OSError as de:
                        result.error = f"delete failed: {de}"
        except ProviderError as pe:
            result.error = f"provider error: {pe}"
            logger.warning("[offload] %s/%s upload failed: %s",
                           cand.agent_id, cand.date_str, pe)
        except Exception as e:
            result.error = f"{type(e).__name__}: {e}"
            logger.exception("[offload] %s/%s unexpected failure",
                             cand.agent_id, cand.date_str)
        finally:
            result.duration_seconds = time.time() - t0
        return result

    async def _mark_db(
        self,
        blob: bytes,
        *,
        cid_value: str,
        mirror: Optional[str],
        tx_hash: Optional[str] = None,
        chain: Optional[str] = None,
    ) -> int:
        """For every memory_id in the bundle, update pgvector row."""
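        # Bundle entries are assumed to look like {"record": {"memory_id": ...}}
        # at minimum; that is the only shape this loop relies on.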
        try:
            from agents import memory_pgvector
        except Exception:
            return 0
        marked = 0
        for entry in bundle_iter(blob):
            rec = entry.get("record") or {}
            memory_id = rec.get("memory_id")
            if not memory_id:
                continue
            try:
                ok = await memory_pgvector.mark_memory_offloaded(
                    memory_id=memory_id,
                    content_cid=cid_value,
                    content_cid_mirror=mirror,
                    offload_tier="ipfs",
                    tx_hash=tx_hash,
                    chain=chain,
                )
                if ok:
                    marked += 1
            except Exception:
                continue
        return marked

    async def _emit_offload_event(
        self,
        *,
        cand: OffloadCandidate,
        result: OffloadResult,
        anchor_receipt: Optional[dict] = None,
    ) -> None:
        try:
            from agents.catalogue import emit_catalogue_event
        except Exception:
            return
        wallet = None
        if self.id_manager is not None:
            try:
                wallet = self.id_manager.get_public_address(cand.agent_id)
            except Exception:
                wallet = None
        try:
            await emit_catalogue_event(
                kind="memory.offload",
                actor=cand.agent_id,
                payload={
                    "date_str": cand.date_str,
                    "file_count": result.file_count,
                    "bytes_packed": result.bytes_packed,
                    "cid": result.cid,
                    "sha256": result.sha256,
                    "verified": result.verified,
                    "deleted_local": result.deleted_local,
                    "dry_run": result.dry_run,
                    "anchor": anchor_receipt or None,
                },
                source_log="storage/offload_projector",
                source_ref=f"{cand.agent_id}/{cand.date_str}",
                actor_wallet=wallet,
            )
        except Exception:
            pass


def _human_bytes(n: int) -> str:
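    """Format a byte count, e.g. 1536 -> '1.5KB' (units up to PB)."""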
    size = float(n)
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if abs(size) < 1024.0:
            return f"{size:.1f}{unit}"
        size /= 1024.0
    return f"{size:.1f}PB"


def serialize_run(run: OffloadRun) -> dict:
    """Convert OffloadRun to a JSON-serializable dict for API responses."""
    return {
        "started_at": run.started_at,
        "finished_at": run.finished_at,
        "duration_seconds": (run.finished_at or time.time()) - run.started_at,
        "candidates_total": run.candidates_total,
        "candidates_processed": run.candidates_processed,
        "bytes_packed_total": run.bytes_packed_total,
        "bytes_freed_total": run.bytes_freed_total,
        "human_freed": _human_bytes(run.bytes_freed_total),
        "results": [asdict(r) for r in run.results],
    }