mindx_backend_service/x402_middleware.py · 424 lines · 16.2 KB · Python source
1"""
2I am the x402 paywall.
3
4When a caller hits a cost-bearing endpoint without a valid payment header,
5I construct a triple-rail 402 envelope (Base USDC, Tempo USDC.e, Algorand
6USDC ASA) and return it. When a caller presents a valid X-PAYMENT header,
7I verify the settlement with the configured facilitator (or accept a
8syntactically-valid stub in dev mode) and let the request through.
9
10Logged-in callers get a free-quota allowance (10 calls per 24h rolling
11window) before x402 kicks in. Anonymous callers get 0 free calls.
12
13For the protocol contract see ``docs/services/x402_as_a_service.md``.
14
15The middleware is a FastAPI ``Depends`` factory:
16
17 @app.post("/coordinator/query", dependencies=[Depends(x402_required("/coordinator/query"))])
18 async def coordinator_query(...): ...
19
20The dependency:
21 1. Reads the request's wallet address from the session (X-Session-Token).
22 2. Looks up the per-endpoint price from data/config/x402_pricing.json.
23 3. If the caller has free quota remaining → records the call + lets it through.
24 4. Else if the caller presented a valid X-PAYMENT header → verifies + records.
25 5. Else → raises HTTPException(402, detail=<triple-rail envelope>).
26
Records every free-quota call to ``data/governance/free_quota_ledger.json`` and
mirrors ``payment.x402.free_quota`` / ``payment.x402.settled`` catalogue events
(best-effort).
29"""
30from __future__ import annotations
31
32import base64
33import json
34import logging
35import os
36import time
37from pathlib import Path
38from typing import Any, Callable, Dict, List, Optional, Tuple
39
40from fastapi import HTTPException, Request
41
42logger = logging.getLogger(__name__)
43
44
45# ─── Config loader (hot-reloadable) ──────────────────────────────────────
46
47
# Parent of the directory that contains this file; config/data paths hang off it.
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
# Pricing config read (and hot-reloaded on mtime change) by ``_load_pricing``.
_PRICING_PATH = _PROJECT_ROOT / "data" / "config" / "x402_pricing.json"
# On-disk ledger of free-quota call timestamps, keyed by wallet.
_QUOTA_LEDGER_PATH = _PROJECT_ROOT / "data" / "governance" / "free_quota_ledger.json"

# Last successfully parsed pricing config (empty until the first successful load).
_pricing_cache: Dict[str, Any] = {}
# mtime of the pricing file at the moment ``_pricing_cache`` was last refreshed.
_pricing_loaded_at: float = 0.0
54
55
def _load_pricing(force: bool = False) -> Dict[str, Any]:
    """Return the pricing config, re-reading the JSON file when it has changed.

    The file is parsed when the cache is empty, when its mtime is newer than
    the mtime recorded at the last successful load, or when ``force`` is true.
    If the file cannot be stat'ed or parsed, the previously cached value is
    returned unchanged (a parse failure is logged as a warning).
    """
    global _pricing_cache, _pricing_loaded_at

    try:
        file_mtime = _PRICING_PATH.stat().st_mtime
    except OSError:
        # File missing/unreadable: keep serving whatever we already have.
        return _pricing_cache

    cache_is_current = bool(_pricing_cache) and file_mtime <= _pricing_loaded_at
    if cache_is_current and not force:
        return _pricing_cache

    try:
        with _PRICING_PATH.open("r", encoding="utf-8") as handle:
            _pricing_cache = json.load(handle)
        # Only advance the timestamp after a successful parse so a broken
        # file is retried on the next call.
        _pricing_loaded_at = file_mtime
    except Exception as exc:
        logger.warning(f"x402: failed to load pricing config: {exc}")

    return _pricing_cache
77
78
79# ─── Free-quota ledger (per-wallet 24h rolling window) ───────────────────
80
81
def _load_quota_ledger() -> Dict[str, List[float]]:
    """Read the free-quota ledger from disk, dropping stale entries.

    The ledger maps a wallet key to a list of unix timestamps, one per
    free-quota call. Timestamps older than 24 hours, non-numeric entries,
    non-list values and wallets left with no timestamps are omitted from the
    returned dict. A missing, unreadable or non-object JSON file yields ``{}``.
    """
    if not _QUOTA_LEDGER_PATH.exists():
        return {}

    try:
        with _QUOTA_LEDGER_PATH.open("r", encoding="utf-8") as handle:
            stored = json.load(handle)
    except Exception:
        return {}

    if not isinstance(stored, dict):
        return {}

    window_start = time.time() - 24 * 3600
    ledger: Dict[str, List[float]] = {}
    for wallet_key, stamps in stored.items():
        if isinstance(stamps, list):
            recent = [
                float(stamp)
                for stamp in stamps
                if isinstance(stamp, (int, float)) and float(stamp) >= window_start
            ]
            if recent:
                ledger[wallet_key] = recent
    return ledger
107
108
def _save_quota_ledger(ledger: Dict[str, List[float]]) -> None:
    """Persist ``ledger`` as JSON, writing a temp file and renaming it into place.

    Any failure (directory creation, write, rename) is logged as a warning
    and otherwise ignored; nothing is raised to the caller.
    """
    try:
        target_dir = _QUOTA_LEDGER_PATH.parent
        target_dir.mkdir(parents=True, exist_ok=True)

        # Write next to the real file, then swap it in with a rename.
        scratch_path = _QUOTA_LEDGER_PATH.with_suffix(".json.tmp")
        with scratch_path.open("w", encoding="utf-8") as handle:
            json.dump(ledger, handle, indent=2)
        scratch_path.replace(_QUOTA_LEDGER_PATH)
    except Exception as exc:
        logger.warning(f"x402: failed to persist quota ledger: {exc}")
118
119
def _quota_status(wallet: str) -> Tuple[int, int]:
    """Report how much free quota ``wallet`` has consumed.

    Returns ``(used_in_window, limit)``. ``used_in_window`` is the number of
    ledger timestamps stored under the lowercased wallet. ``limit`` comes
    from the ``free_quota`` section of the pricing config: an empty wallet or
    the literal ``"anonymous"`` uses ``anonymous_calls_per_24h`` (default 0),
    every other wallet uses ``calls_per_24h`` (default 10).
    """
    pricing = _load_pricing()
    quota_cfg = pricing.get("free_quota", {}) if isinstance(pricing, dict) else {}

    is_anonymous = (not wallet) or wallet == "anonymous"
    if is_anonymous:
        limit_key, fallback = "anonymous_calls_per_24h", 0
    else:
        limit_key, fallback = "calls_per_24h", 10
    allowed = int(quota_cfg.get(limit_key, fallback))

    timestamps = _load_quota_ledger().get((wallet or "").lower(), [])
    return len(timestamps), allowed
135
136
def _record_quota_use(wallet: str) -> None:
    """Append the current time to ``wallet``'s ledger entry and save the ledger.

    The wallet is lowercased to form the ledger key. An empty wallet is a
    no-op.
    """
    if wallet:
        ledger = _load_quota_ledger()
        bucket = ledger.setdefault(wallet.lower(), [])
        bucket.append(time.time())
        _save_quota_ledger(ledger)
144
145
146# ─── Settlement verification ─────────────────────────────────────────────
147
148
# In-process idempotency cache used by ``_verify_x_payment``: maps a cache key
# derived from the payment envelope to ``(time_stored, settlement_record)``.
_settlement_cache: Dict[str, Tuple[float, Dict[str, Any]]] = {}
150
151
def _settlement_cache_ttl() -> int:
    """Return the settlement-cache TTL in seconds from the pricing config.

    Reads ``idempotency.settlement_cache_ttl_seconds``; defaults to 60 when
    the key (or the whole section) is absent.
    """
    pricing = _load_pricing()
    if isinstance(pricing, dict):
        idempotency_cfg = pricing.get("idempotency", {})
    else:
        idempotency_cfg = {}
    return int(idempotency_cfg.get("settlement_cache_ttl_seconds", 60))
156
157
def _verify_x_payment(header_value: str, endpoint_id: str, max_amount: int) -> Dict[str, Any]:
    """Verify a base64-encoded X-PAYMENT envelope and return its settlement record.

    The full contract is in ``docs/services/x402_as_a_service.md`` §3. This
    implementation does *syntactic* verification on every request and defers
    *cryptographic* verification to the facilitator at the URL configured in
    the pricing config (``facilitator.url``).

    In test / dev mode (``MINDX_X402_TEST_MODE=1``) any syntactically valid
    envelope is accepted and a stub record with ``tx_hash="0xtest"`` is
    returned, which makes the test path observable.

    A verified settlement is cached for the configured TTL so that a retry of
    the *same* payment against the *same* endpoint and price does not hit the
    facilitator again. The cache key covers the network, the endpoint, the
    required amount and a SHA-256 digest of the complete payload, so two
    different payments (or the same payment presented to a different
    endpoint) never share an entry. Expired entries are evicted on every call.

    Args:
        header_value: Raw value of the ``X-PAYMENT`` header (base64 of JSON).
        endpoint_id: Canonical endpoint key the payment is being presented for.
        max_amount: Required amount for the endpoint, in micro-USD.

    Returns:
        A dict with ``rail``, ``tx_hash``, ``amount_microusd``,
        ``verified_at`` and ``facilitator``.

    Raises:
        HTTPException: 402 when the envelope is malformed, the facilitator
            rejects it, or the facilitator's answer cannot be interpreted;
            503 when no facilitator is configured or it cannot be reached.
    """
    # Local import: hashlib is not among this module's top-level imports.
    import hashlib

    try:
        decoded = base64.b64decode(header_value).decode("utf-8")
        env = json.loads(decoded)
    except Exception as exc:
        raise HTTPException(status_code=402, detail={
            "code": "x402_malformed_payment",
            "reason": f"could not decode X-PAYMENT: {exc}",
        }) from exc

    if not isinstance(env, dict):
        raise HTTPException(status_code=402, detail={"code": "x402_malformed_payment"})

    scheme = env.get("scheme")
    network = env.get("network")
    payload = env.get("payload") or {}
    if (
        scheme != "exact"
        or not network
        or not isinstance(network, str)
        or not isinstance(payload, dict)
    ):
        raise HTTPException(status_code=402, detail={
            "code": "x402_malformed_payment",
            "reason": "envelope must have scheme='exact', network, payload",
        })

    # Idempotency cache: a verified settlement is honored for ~60s on retry.
    # Hash the *whole* payload (a truncated prefix would let distinct payloads
    # collide) and bind the entry to the endpoint and price it was verified for.
    payload_digest = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode("utf-8")
    ).hexdigest()
    cache_key = f"{network}:{endpoint_id}:{max_amount}:{payload_digest}"
    now = time.time()
    ttl = _settlement_cache_ttl()

    # Evict expired entries so the cache cannot grow without bound. Iterate a
    # snapshot because entries are removed while scanning.
    for stale_key, (stored_at, _record) in list(_settlement_cache.items()):
        if (now - stored_at) >= ttl:
            _settlement_cache.pop(stale_key, None)

    cached = _settlement_cache.get(cache_key)
    if cached and (now - cached[0]) < ttl:
        return cached[1]

    test_mode = os.environ.get("MINDX_X402_TEST_MODE", "0").strip() == "1"
    if test_mode:
        record = {
            "rail": network,
            "tx_hash": "0xtest",
            "amount_microusd": max_amount,
            "verified_at": now,
            "facilitator": "test-stub",
        }
        _settlement_cache[cache_key] = (now, record)
        return record

    # Production path: call the facilitator's /verify endpoint.
    cfg = _load_pricing()
    fac = (cfg.get("facilitator") or {}).get("url") if isinstance(cfg, dict) else None
    if not fac:
        raise HTTPException(status_code=503, detail={
            "code": "x402_facilitator_not_configured",
            "reason": "facilitator URL missing from x402_pricing.json",
        })

    # NOTE: this is a synchronous HTTP call (up to 10s). Callers running on an
    # event loop should invoke this function from a worker thread.
    try:
        import httpx
        with httpx.Client(timeout=10.0) as client:
            resp = client.post(
                fac.rstrip("/") + "/verify",
                json={
                    "scheme": scheme,
                    "network": network,
                    "payload": payload,
                    "endpoint": endpoint_id,
                    "max_amount_microusd": max_amount,
                },
            )
    except Exception as exc:
        raise HTTPException(status_code=503, detail={
            "code": "x402_facilitator_unreachable",
            "reason": str(exc),
        }) from exc

    if resp.status_code != 200:
        raise HTTPException(status_code=402, detail={
            "code": "x402_facilitator_rejected",
            "status": resp.status_code,
            "body": resp.text[:512],
        })

    try:
        body = resp.json()
    except Exception as exc:
        raise HTTPException(
            status_code=402, detail={"code": "x402_facilitator_bad_response"}
        ) from exc

    # A JSON array/string/number is valid JSON but not a usable answer.
    if not isinstance(body, dict):
        raise HTTPException(status_code=402, detail={"code": "x402_facilitator_bad_response"})

    if not body.get("verified"):
        raise HTTPException(status_code=402, detail={
            "code": "x402_settlement_not_verified",
            "reason": body.get("reason", ""),
        })

    # Missing or null amount falls back to the required amount; anything that
    # is present but not integer-convertible is treated as a bad response.
    raw_amount = body.get("amount")
    if raw_amount is None:
        raw_amount = max_amount
    try:
        settled_amount = int(raw_amount)
    except (TypeError, ValueError, OverflowError) as exc:
        raise HTTPException(
            status_code=402, detail={"code": "x402_facilitator_bad_response"}
        ) from exc

    record = {
        "rail": network,
        "tx_hash": body.get("txHash", ""),
        "amount_microusd": settled_amount,
        "verified_at": now,
        "facilitator": fac,
    }
    _settlement_cache[cache_key] = (now, record)
    return record
270
271
272# ─── 402 envelope builder ────────────────────────────────────────────────
273
274
def _build_402_envelope(endpoint_id: str, max_amount: int) -> Dict[str, Any]:
    """Assemble the 402 response body listing every rail that can settle.

    One payment requirement is produced per entry in the pricing config's
    ``rails`` section. ``endpoint_id`` becomes each requirement's
    ``resource`` and ``max_amount`` (stringified) its ``maxAmountRequired``.
    Rails that are not dicts, or whose ``payTo`` is blank or the all-zero
    address, are left out.
    """
    pricing = _load_pricing()
    rails = pricing.get("rails", {}) if isinstance(pricing, dict) else {}
    zero_address = "0x0000000000000000000000000000000000000000"

    offers: List[Dict[str, Any]] = []
    for rail_name, rail_cfg in rails.items():
        if isinstance(rail_cfg, dict):
            destination = str(rail_cfg.get("payTo", "")).strip()
            # A blank or zero payTo means the rail is advertised but not settling yet.
            if destination and destination != zero_address:
                offers.append({
                    "scheme": rail_cfg.get("scheme", "exact"),
                    "network": rail_cfg.get("network", rail_name),
                    "asset": rail_cfg.get("asset", ""),
                    "maxAmountRequired": str(max_amount),
                    "payTo": destination,
                    "resource": endpoint_id,
                    "description": rail_cfg.get("_comment", ""),
                    "mimeType": "application/json",
                    "extra": rail_cfg.get("extra", {}),
                })

    envelope: Dict[str, Any] = {
        "code": "x402_payment_required",
        "message": "This endpoint requires payment. Settle on any of the offered rails and re-submit with X-PAYMENT.",
        "endpoint": endpoint_id,
        "paymentRequirements": offers,
        "x402Version": 1,
        "_note": "See docs/services/x402_as_a_service.md for the protocol contract.",
    }
    return envelope
311
312
313# ─── Catalogue mirror ────────────────────────────────────────────────────
314
315
async def _emit_settlement_event(wallet: str, endpoint_id: str, record: Dict[str, Any]) -> None:
    """Mirror a verified settlement to the catalogue as ``payment.x402.settled``.

    Best-effort: any failure (including the catalogue module not being
    importable) is logged as a warning and never propagated, so a paid
    request is not broken by a catalogue problem.

    Args:
        wallet: Wallet (or ``"anonymous"``) the request was attributed to.
        endpoint_id: Canonical endpoint key that was paid for.
        record: Settlement record; its ``rail``, ``amount_microusd``,
            ``tx_hash`` and ``facilitator`` entries are copied into the event.
    """
    try:
        # Imported inside the try so an import failure is handled like any
        # other emit failure.
        from agents.catalogue.events import emit_catalogue_event
        await emit_catalogue_event(
            kind="payment.x402.settled",
            actor="mindx.gateway",
            payload={
                "endpoint": endpoint_id,
                "wallet": wallet,
                "rail": record.get("rail"),
                "amount_microusd": record.get("amount_microusd"),
                "tx_hash": record.get("tx_hash"),
                "facilitator": record.get("facilitator"),
            },
            source_log="mindx_backend_service.x402_middleware",
        )
    except Exception as exc:
        # Catalogue write failure must NEVER break a paid request, but a lost
        # settlement event is an audit gap, so leave a trace in the log.
        logger.warning(
            "x402: failed to emit payment.x402.settled for %s: %s", endpoint_id, exc
        )
335
336
async def _emit_free_quota_event(wallet: str, endpoint_id: str, used: int, limit: int) -> None:
    """Mirror a free-quota call to the catalogue as ``payment.x402.free_quota``.

    Best-effort: any failure (including the catalogue module not being
    importable) is logged at debug level and never propagated.

    Args:
        wallet: Wallet the free call was attributed to.
        endpoint_id: Canonical endpoint key that was called.
        used: Calls already in the window *before* this one; the event
            reports ``used + 1``.
        limit: The wallet's free-call limit for the window.
    """
    try:
        # Imported inside the try so an import failure is handled like any
        # other emit failure.
        from agents.catalogue.events import emit_catalogue_event
        await emit_catalogue_event(
            kind="payment.x402.free_quota",
            actor="mindx.gateway",
            payload={
                "endpoint": endpoint_id,
                "wallet": wallet,
                "used_in_window": used + 1,
                "limit": limit,
            },
            source_log="mindx_backend_service.x402_middleware",
        )
    except Exception as exc:
        # Never break the request over a catalogue failure; debug level keeps
        # per-call noise down while still making the failure discoverable.
        logger.debug(
            "x402: failed to emit payment.x402.free_quota for %s: %s", endpoint_id, exc
        )
353
354
355# ─── Session inspection ──────────────────────────────────────────────────
356
357
358def _wallet_from_request(request: Request) -> Optional[str]:
359 """Return the lowercase wallet address from the session, or None."""
360 token = request.headers.get("X-Session-Token")
361 if not token:
362 return None
363 try:
364 # Lazy import to avoid a hard dependency on the vault during tests.
365 from mindx_backend_service.bankon_vault import get_vault_manager
366 vault = get_vault_manager()
367 session = vault.get_user_session(token)
368 if session and session.get("wallet_address"):
369 return str(session["wallet_address"]).lower()
370 except Exception:
371 return None
372 return None
373
374
375# ─── The decorator-style factory ─────────────────────────────────────────
376
377
def x402_required(endpoint_id: str, max_amount_microusd: Optional[int] = None) -> Callable:
    """Return a FastAPI dependency that enforces x402 on the decorated route.

    The dependency:
      1. Resolves the price: the endpoint's ``max_amount_microusd`` from the
         pricing config, else ``max_amount_microusd`` passed here, else 2000.
      2. Looks up the wallet from X-Session-Token (``"anonymous"`` if none).
      3. If the wallet has free quota remaining → records and allows.
      4. Else if X-PAYMENT is present and verifies → emits the settlement
         event and allows.
      5. Else → raises HTTPException(402) with the multi-rail envelope, or
         HTTPException(503) when no rail has a usable ``payTo``.

    The ``endpoint_id`` is the canonical path string used in pricing config
    and catalogue events (e.g. "/coordinator/query"). When the path contains
    a path parameter like ``{agent_id}``, pass the templated form
    (``/agents/{agent_id}/evolve``) — the middleware uses it as a key, not
    a route match.

    Args:
        endpoint_id: Key into the pricing config's ``endpoints`` section.
        max_amount_microusd: Fallback price when the config has no entry.

    Returns:
        An async dependency callable. On success it returns a dict whose
        ``path`` is ``"free_quota"`` or ``"x402_settled"``.
    """
    # fastapi is already a dependency of this module; imported here because
    # the top-level import block does not include it.
    from fastapi.concurrency import run_in_threadpool

    async def _dep(request: Request) -> Dict[str, Any]:
        cfg = _load_pricing()
        # Tolerate a malformed config (null / non-dict sections) instead of
        # failing the request with an AttributeError.
        endpoints = cfg.get("endpoints") if isinstance(cfg, dict) else None
        rule = endpoints.get(endpoint_id) if isinstance(endpoints, dict) else None
        if not isinstance(rule, dict):
            rule = {}
        amount = int(rule.get("max_amount_microusd", max_amount_microusd or 2000))

        wallet = _wallet_from_request(request) or "anonymous"
        used, limit = _quota_status(wallet)

        if used < limit:
            _record_quota_use(wallet)
            await _emit_free_quota_event(wallet, endpoint_id, used, limit)
            return {"path": "free_quota", "wallet": wallet, "used": used + 1, "limit": limit}

        x_payment = request.headers.get("X-PAYMENT")
        if x_payment:
            # Verification may perform a blocking HTTP round-trip to the
            # facilitator; run it in a worker thread so the event loop keeps
            # serving other requests. HTTPException raised inside propagates.
            record = await run_in_threadpool(
                _verify_x_payment, x_payment, endpoint_id, amount
            )
            await _emit_settlement_event(wallet, endpoint_id, record)
            return {"path": "x402_settled", "wallet": wallet, **record}

        envelope = _build_402_envelope(endpoint_id, amount)
        if not envelope["paymentRequirements"]:
            # No rails currently settling → upgrade to 503 so the caller knows
            # the operator hasn't finished configuring x402 yet.
            raise HTTPException(status_code=503, detail={
                "code": "x402_no_rails_configured",
                "reason": "No x402 rails have a payTo address yet. Operator must update data/config/x402_pricing.json.",
            })
        raise HTTPException(status_code=402, detail=envelope)

    # Give each generated dependency a distinct, endpoint-derived name.
    _dep.__name__ = f"x402_required_for_{endpoint_id.strip('/').replace('/', '_').replace('{', '').replace('}', '')}"
    return _dep