agents/storage/anchor.py · 171 lines · 6.2 KB · Python source
agents/storage/anchor.py · 171 lines · 6.2 KBrawGitHub
1"""
2On-chain anchoring of memory CIDs.
3 
4Two tiers:
5- **everyday tier (ARC DatasetRegistry)** — permissionless registerDataset(bytes32,string).
6  Every offload batch lands here.
7- **curated tier (THOT)** — owner-gated mintTHOT(...). Only memories that
8  pass through LTM consolidation and represent durable distilled knowledge.
9  Phase C ships THOT as a no-op stub when no owner key is configured.
10 
11Anchor receipts are emitted as `memory.anchor` catalogue events.
12"""
13 
14from __future__ import annotations
15 
16import hashlib
17import os
18from typing import Optional
19 
20from eth_utils import to_checksum_address  # type: ignore[import-not-found]
21 
22from utils.logging_config import get_logger
23 
24from .raw_tx import RawTxClient, RawTxError
25 
26logger = get_logger(__name__)
27 
# Cached 4-byte function selector for registerDataset(bytes32,string), i.e.
# keccak256("registerDataset(bytes32,string)")[:4].
# It starts out as None and is filled in lazily by _register_dataset_selector()
# on first use, so no keccak work happens at import time on cold paths.
_REGISTER_DATASET_SELECTOR: Optional[bytes] = None
32 
33 
34def _selector(signature: str) -> bytes:
35    """Compute a 4-byte function selector via keccak256."""
36    from Crypto.Hash import keccak  # type: ignore[import-not-found]
37    k = keccak.new(digest_bits=256)
38    k.update(signature.encode("utf-8"))
39    return k.digest()[:4]
40 
41 
def _register_dataset_selector() -> bytes:
    """Return the selector for registerDataset(bytes32,string), caching it.

    The first call computes the selector with ``_selector`` and stores it in
    the module-level ``_REGISTER_DATASET_SELECTOR``; later calls return the
    cached bytes without recomputing.

    If the computation fails for any reason (for example the keccak backend
    cannot be imported), a hardcoded selector is used instead. The failure is
    logged once, at the time of the fallback, instead of being discarded
    silently.
    """
    global _REGISTER_DATASET_SELECTOR
    if _REGISTER_DATASET_SELECTOR is not None:
        return _REGISTER_DATASET_SELECTOR
    try:
        _REGISTER_DATASET_SELECTOR = _selector("registerDataset(bytes32,string)")
    except Exception:
        # Deliberate best-effort: keep the broad catch so a broken keccak
        # backend never prevents calldata from being built, but leave a trace
        # (with traceback) so the degraded mode is visible to operators.
        logger.warning(
            "keccak selector computation failed; falling back to hardcoded "
            "registerDataset selector 0xf1783fb8",
            exc_info=True,
        )
        # Fallback: hardcoded value computed offline.
        # keccak256("registerDataset(bytes32,string)")[:4] = 0xf1783fb8
        # NOTE(review): this constant is taken on trust from the original
        # comment; re-verify it against an independent keccak implementation.
        _REGISTER_DATASET_SELECTOR = bytes.fromhex("f1783fb8")
    return _REGISTER_DATASET_SELECTOR
52 
53 
54def _encode_register_dataset(dataset_id: bytes, root_cid: str) -> str:
55    """ABI-encode calldata for registerDataset(bytes32,string)."""
56    if len(dataset_id) != 32:
57        raise ValueError("dataset_id must be 32 bytes")
58    selector = _register_dataset_selector()
59    # Layout: selector || dataset_id (32B) || offset_to_string (32B = 0x40) ||
60    #         string_length (32B) || string_bytes_padded
61    cid_bytes = root_cid.encode("utf-8")
62    pad_len = (32 - (len(cid_bytes) % 32)) % 32
63    encoded = (
64        selector
65        + dataset_id
66        + (0x40).to_bytes(32, "big")
67        + len(cid_bytes).to_bytes(32, "big")
68        + cid_bytes
69        + (b"\x00" * pad_len)
70    )
71    return "0x" + encoded.hex()
72 
73 
def derive_dataset_id(agent_id: str, date_str: str, cid: str) -> bytes:
    """Derive the bytes32 dataset id for an (agent, date, CID) triple.

    The id is the SHA-256 digest of the three fields joined with ``|`` and
    encoded as UTF-8, so the same triple always yields the same 32 bytes.
    """
    preimage = f"{agent_id}|{date_str}|{cid}"
    hasher = hashlib.sha256()
    hasher.update(preimage.encode("utf-8"))
    # A SHA-256 digest is exactly 32 bytes, which is the bytes32 width.
    return hasher.digest()
78 
79 
class AnchorClient:
    """
    Wraps a RawTxClient with mindX-specific anchor methods.

    Configure either with explicit args or via env:
      ARC_RPC_URL, ARC_CHAIN_ID, ARC_REGISTRY_ADDRESS, MEMORY_ANCHOR_TREASURY_PK

    An explicit argument wins; any argument that is None (or otherwise falsy)
    falls back to the matching environment variable. The underlying
    RawTxClient is created lazily on first use and released by ``close()``.
    """

    def __init__(
        self,
        *,
        rpc_url: Optional[str] = None,
        chain_id: Optional[int] = None,
        registry_address: Optional[str] = None,
        private_key: Optional[str] = None,
    ):
        """Resolve configuration from keyword arguments, then the environment.

        Raises:
            ValueError: if ``chain_id`` is not given and ARC_CHAIN_ID is set
                to something ``int()`` cannot parse.
        """
        self.rpc_url = rpc_url or os.environ.get("ARC_RPC_URL", "")
        # The trailing `or 0` covers ARC_CHAIN_ID being set but empty.
        self.chain_id = chain_id or int(os.environ.get("ARC_CHAIN_ID", "0") or 0)
        self.registry_address = registry_address or os.environ.get("ARC_REGISTRY_ADDRESS", "")
        self.private_key = private_key or os.environ.get("MEMORY_ANCHOR_TREASURY_PK", "")
        # Created on demand by _ensure_client(); None until then and after close().
        self._client: Optional[RawTxClient] = None

    @property
    def configured(self) -> bool:
        """True when RPC URL, chain id, registry address and key are all set."""
        return bool(self.rpc_url and self.chain_id and self.registry_address and self.private_key)

    async def _ensure_client(self) -> RawTxClient:
        """Return the cached RawTxClient, creating it on first use.

        Raises:
            RawTxError: if any required configuration value is missing.
        """
        if self._client is None:
            if not self.configured:
                raise RawTxError(
                    "AnchorClient not configured: need ARC_RPC_URL, ARC_CHAIN_ID, "
                    "ARC_REGISTRY_ADDRESS, MEMORY_ANCHOR_TREASURY_PK"
                )
            self._client = RawTxClient(
                rpc_url=self.rpc_url,
                chain_id=self.chain_id,
                private_key=self.private_key,
            )
        return self._client

    async def close(self) -> None:
        """Close the underlying RawTxClient, if one was created.

        The cached reference is dropped before closing, so a second close()
        is a no-op and a later anchor call builds a fresh client instead of
        reusing one that has already been closed.
        """
        client, self._client = self._client, None
        if client is not None:
            await client.close()

    async def anchor_dataset_registry(
        self, *, agent_id: str, date_str: str, cid: str,
    ) -> dict:
        """
        Register the (agent_id, date_str, cid) tuple on the ARC DatasetRegistry.

        The dataset id comes from derive_dataset_id() and is sent, together
        with the CID, as registerDataset(bytes32,string) calldata to the
        configured registry address.

        Returns:
            On success: a dict with keys ``tx_hash``, ``dataset_id_hex``,
            ``chain`` and ``registry``.
            If sending fails with RawTxError: a dict with keys ``error`` and
            ``chain``.

        Raises:
            RawTxError: if the client is not configured. That check happens
                before the send attempt and is not turned into an error dict.
        """
        client = await self._ensure_client()
        dataset_id = derive_dataset_id(agent_id, date_str, cid)
        data = _encode_register_dataset(dataset_id, cid)
        try:
            # NOTE(review): to_checksum_address presumably raises for a
            # malformed registry address; that is not caught here - confirm
            # whether callers expect an error dict in that case.
            tx_hash = await client.send_tx(
                to=to_checksum_address(self.registry_address),
                data=data,
            )
        except RawTxError as e:
            return {"error": str(e), "chain": "arc"}
        return {
            "tx_hash": tx_hash,
            "dataset_id_hex": "0x" + dataset_id.hex(),
            "chain": "arc",
            "registry": self.registry_address,
        }

    async def anchor_thot(
        self, *, agent_id: str, batch_cid: str, dimensions: int = 768,
    ) -> dict:
        """
        Mint a THOT for a curated memory CID. Owner-gated; without mint key
        configured this returns an explicit not-configured stub.

        The full implementation is out of Phase C scope until access policy is
        decided (deploy permissive variant vs. centralized minter). Tracked in
        the plan as "THOT mint at scale" deferred work.

        Returns:
            A stub dict with ``stub``, ``agent_id``, ``batch_cid``,
            ``dimensions`` and a ``note`` explaining why nothing was minted.
            Both branches carry the request fields so the receipt can be
            matched back to the call that produced it.
        """
        receipt = {
            "stub": True,
            "agent_id": agent_id,
            "batch_cid": batch_cid,
            "dimensions": dimensions,
        }
        if not os.environ.get("THOT_MINTER_KEY"):
            receipt["note"] = "THOT mint disabled — set THOT_MINTER_KEY to enable"
        else:
            # Reserved for the Phase 2 permissive-mint deploy.
            receipt["note"] = "THOT mint owner-gated; permissive contract deploy pending"
        return receipt

All DocumentsBook of mindXAPI Reference