agents/storage/anchor.py · 171 lines · 6.2 KB · Python source
1"""
2On-chain anchoring of memory CIDs.
3
4Two tiers:
5- **everyday tier (ARC DatasetRegistry)** — permissionless registerDataset(bytes32,string).
6 Every offload batch lands here.
7- **curated tier (THOT)** — owner-gated mintTHOT(...). Only memories that
8 pass through LTM consolidation and represent durable distilled knowledge.
  Phase C ships THOT as a no-op stub whether or not a minter key (THOT_MINTER_KEY) is configured.
10
11Anchor receipts are emitted as `memory.anchor` catalogue events.
12"""
13
14from __future__ import annotations
15
16import hashlib
17import os
18from typing import Optional
19
20from eth_utils import to_checksum_address # type: ignore[import-not-found]
21
22from utils.logging_config import get_logger
23
24from .raw_tx import RawTxClient, RawTxError
25
logger = get_logger(__name__)

# Cached 4-byte selector for registerDataset(bytes32,string): the first four
# bytes of keccak256("registerDataset(bytes32,string)"). It is computed lazily
# by _register_dataset_selector() on first use (keeping keccak out of import
# time for cold paths) and the cached value is reused afterwards.
_REGISTER_DATASET_SELECTOR: Optional[bytes] = None
32
33
34def _selector(signature: str) -> bytes:
35 """Compute a 4-byte function selector via keccak256."""
36 from Crypto.Hash import keccak # type: ignore[import-not-found]
37 k = keccak.new(digest_bits=256)
38 k.update(signature.encode("utf-8"))
39 return k.digest()[:4]
40
41
def _register_dataset_selector() -> bytes:
    """
    Return the 4-byte selector for registerDataset(bytes32,string).

    The selector is computed once via _selector() and cached in the
    module-level _REGISTER_DATASET_SELECTOR. If pycryptodome's keccak module
    cannot be imported, a hardcoded selector is used instead.
    """
    global _REGISTER_DATASET_SELECTOR
    if _REGISTER_DATASET_SELECTOR is None:
        try:
            _REGISTER_DATASET_SELECTOR = _selector("registerDataset(bytes32,string)")
        except ImportError:
            # Only a missing keccak backend is an expected failure here; any
            # other exception is a real bug and must not be masked by the
            # fallback below.
            # Fallback: hardcoded value computed offline.
            # keccak256("registerDataset(bytes32,string)")[:4] = 0xf1783fb8
            # NOTE(review): confirm against the deployed registry ABI.
            logger.debug(
                "Crypto.Hash.keccak unavailable; using hardcoded registerDataset selector"
            )
            _REGISTER_DATASET_SELECTOR = bytes.fromhex("f1783fb8")
    return _REGISTER_DATASET_SELECTOR
52
53
54def _encode_register_dataset(dataset_id: bytes, root_cid: str) -> str:
55 """ABI-encode calldata for registerDataset(bytes32,string)."""
56 if len(dataset_id) != 32:
57 raise ValueError("dataset_id must be 32 bytes")
58 selector = _register_dataset_selector()
59 # Layout: selector || dataset_id (32B) || offset_to_string (32B = 0x40) ||
60 # string_length (32B) || string_bytes_padded
61 cid_bytes = root_cid.encode("utf-8")
62 pad_len = (32 - (len(cid_bytes) % 32)) % 32
63 encoded = (
64 selector
65 + dataset_id
66 + (0x40).to_bytes(32, "big")
67 + len(cid_bytes).to_bytes(32, "big")
68 + cid_bytes
69 + (b"\x00" * pad_len)
70 )
71 return "0x" + encoded.hex()
72
73
def derive_dataset_id(agent_id: str, date_str: str, cid: str) -> bytes:
    """
    Return a deterministic 32-byte dataset id for an (agent, date, CID) triple.

    The id is the SHA-256 digest of the UTF-8 encoding of
    "<agent_id>|<date_str>|<cid>".

    NOTE(review): "|" is not escaped, so fields that themselves contain "|"
    can collide, e.g. ("a|b", "c", "d") and ("a", "b|c", "d"). Changing the
    scheme would change every derived id, so it is intentionally left as-is.
    """
    preimage = "{}|{}|{}".format(agent_id, date_str, cid).encode("utf-8")
    # A SHA-256 digest is always exactly 32 bytes, matching Solidity bytes32.
    return hashlib.sha256(preimage).digest()
78
79
class AnchorClient:
    """
    Wraps a RawTxClient with mindX-specific anchor methods.

    Configure either with explicit keyword args or via env:
        ARC_RPC_URL, ARC_CHAIN_ID, ARC_REGISTRY_ADDRESS, MEMORY_ANCHOR_TREASURY_PK

    Truthy explicit arguments take precedence; falsy ones (None, "", 0) fall
    back to the environment. The underlying RawTxClient is created lazily on
    first use. Release it with close(), or use the instance as an
    ``async with`` context manager.
    """

    def __init__(
        self,
        *,
        rpc_url: Optional[str] = None,
        chain_id: Optional[int] = None,
        registry_address: Optional[str] = None,
        private_key: Optional[str] = None,
    ):
        self.rpc_url = rpc_url or os.environ.get("ARC_RPC_URL", "")
        # A missing/empty ARC_CHAIN_ID becomes 0, which `configured` treats as unset.
        self.chain_id = chain_id or int(os.environ.get("ARC_CHAIN_ID", "0") or 0)
        self.registry_address = registry_address or os.environ.get("ARC_REGISTRY_ADDRESS", "")
        self.private_key = private_key or os.environ.get("MEMORY_ANCHOR_TREASURY_PK", "")
        self._client: Optional[RawTxClient] = None

    async def __aenter__(self) -> AnchorClient:
        """Enter an ``async with`` block; the RawTxClient is still created lazily."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Close the underlying client on exit; exceptions are not suppressed."""
        await self.close()

    @property
    def configured(self) -> bool:
        """True when RPC URL, chain id, registry address and signing key are all set."""
        return bool(self.rpc_url and self.chain_id and self.registry_address and self.private_key)

    async def _ensure_client(self) -> RawTxClient:
        """
        Return the cached RawTxClient, creating it on first use.

        Raises:
            RawTxError: if any of the required settings is missing.
        """
        if self._client is None:
            if not self.configured:
                raise RawTxError(
                    "AnchorClient not configured: need ARC_RPC_URL, ARC_CHAIN_ID, "
                    "ARC_REGISTRY_ADDRESS, MEMORY_ANCHOR_TREASURY_PK"
                )
            self._client = RawTxClient(
                rpc_url=self.rpc_url,
                chain_id=self.chain_id,
                private_key=self.private_key,
            )
        return self._client

    async def close(self) -> None:
        """
        Close the underlying RawTxClient, if one was created.

        The cached reference is dropped first, so a closed client is never
        reused. A later anchor call builds a fresh one, and calling close()
        repeatedly is safe.
        """
        client = self._client
        self._client = None
        if client is not None:
            await client.close()

    async def anchor_dataset_registry(
        self, *, agent_id: str, date_str: str, cid: str,
    ) -> dict:
        """
        Register the (agent_id, date_str, cid) tuple on the ARC DatasetRegistry.

        Sends registerDataset(bytes32,string). The bytes32 id is
        derive_dataset_id(agent_id, date_str, cid) and the string is *cid*.

        Returns:
            On success: {"tx_hash", "dataset_id_hex", "chain", "registry"}.
            On failure: {"error", "chain"}. This includes the client not being
            configured (RawTxError) and the registry address failing checksum
            conversion (ValueError).
        """
        dataset_id = derive_dataset_id(agent_id, date_str, cid)
        try:
            # Inside the try so a not-configured client yields the documented
            # {error} receipt instead of escaping as an exception.
            client = await self._ensure_client()
            data = _encode_register_dataset(dataset_id, cid)
            tx_hash = await client.send_tx(
                to=to_checksum_address(self.registry_address),
                data=data,
            )
        except (RawTxError, ValueError) as e:
            logger.warning("ARC DatasetRegistry anchor failed for cid=%s: %s", cid, e)
            return {"error": str(e), "chain": "arc"}
        return {
            "tx_hash": tx_hash,
            "dataset_id_hex": "0x" + dataset_id.hex(),
            "chain": "arc",
            "registry": self.registry_address,
        }

    async def anchor_thot(
        self, *, agent_id: str, batch_cid: str, dimensions: int = 768,
    ) -> dict:
        """
        Mint a THOT for a curated memory CID (currently always a stub).

        Minting is owner-gated and not implemented in Phase C. Both paths
        return a receipt with "stub": True plus the request fields:
          - THOT_MINTER_KEY unset: the note says minting is disabled.
          - THOT_MINTER_KEY set: the note says the permissive contract deploy
            is still pending.

        The full implementation is out of Phase C scope until access policy is
        decided (deploy permissive variant vs. centralized minter). Tracked in
        the plan as "THOT mint at scale" deferred work.
        """
        stub = {
            "stub": True,
            "agent_id": agent_id,
            "batch_cid": batch_cid,
            "dimensions": dimensions,
        }
        if not os.environ.get("THOT_MINTER_KEY"):
            return {**stub, "note": "THOT mint disabled — set THOT_MINTER_KEY to enable"}
        # Reserved for the Phase 2 permissive-mint deploy.
        return {**stub, "note": "THOT mint owner-gated; permissive contract deploy pending"}