mindx_backend_service/x402_middleware.py · 424 lines · 16.2 KB · Python source
mindx_backend_service/x402_middleware.py · 424 lines · 16.2 KBrawGitHub
1"""
2I am the x402 paywall.
3 
4When a caller hits a cost-bearing endpoint without a valid payment header,
5I construct a triple-rail 402 envelope (Base USDC, Tempo USDC.e, Algorand
6USDC ASA) and return it. When a caller presents a valid X-PAYMENT header,
7I verify the settlement with the configured facilitator (or accept a
8syntactically-valid stub in dev mode) and let the request through.
9 
10Logged-in callers get a free-quota allowance (10 calls per 24h rolling
11window) before x402 kicks in. Anonymous callers get 0 free calls.
12 
13For the protocol contract see ``docs/services/x402_as_a_service.md``.
14 
15The middleware is a FastAPI ``Depends`` factory:
16 
17    @app.post("/coordinator/query", dependencies=[Depends(x402_required("/coordinator/query"))])
18    async def coordinator_query(...): ...
19 
20The dependency:
21  1. Reads the request's wallet address from the session (X-Session-Token).
22  2. Looks up the per-endpoint price from data/config/x402_pricing.json.
23  3. If the caller has free quota remaining → records the call + lets it through.
24  4. Else if the caller presented a valid X-PAYMENT header → verifies + records.
25  5. Else → raises HTTPException(402, detail=<triple-rail envelope>).
26 
Records every free-quota call to ``data/governance/free_quota_ledger.json``
and mirrors each allowed call as a ``payment.x402.free_quota`` or
``payment.x402.settled`` catalogue event (best-effort).
29"""
30from __future__ import annotations
31 
32import base64
33import json
34import logging
35import os
36import time
37from pathlib import Path
38from typing import Any, Callable, Dict, List, Optional, Tuple
39 
40from fastapi import HTTPException, Request
41 
42logger = logging.getLogger(__name__)
43 
44 
45# ─── Config loader (hot-reloadable) ──────────────────────────────────────
46 
47 
# Directory two levels above this module's resolved location (presumably the
# repository root — confirm against the deployment layout).
_PROJECT_ROOT = Path(__file__).resolve().parent.parent
# JSON pricing config read by ``_load_pricing``.
_PRICING_PATH = _PROJECT_ROOT / "data" / "config" / "x402_pricing.json"
# JSON free-quota ledger read and written by the quota helpers below.
_QUOTA_LEDGER_PATH = _PROJECT_ROOT / "data" / "governance" / "free_quota_ledger.json"

# Most recently parsed pricing config; stays empty until a load succeeds.
_pricing_cache: Dict[str, Any] = {}
# mtime of the pricing file as of the last successful load (0.0 = never loaded).
_pricing_loaded_at: float = 0.0
54 
55 
def _load_pricing(force: bool = False) -> Dict[str, Any]:
    """Return the pricing config, re-reading the file when it has changed.

    The JSON file is parsed when nothing is cached yet, when ``force`` is
    true, or when the file's mtime is newer than the mtime recorded at the
    last successful load. If the file cannot be stat'ed, or parsing fails,
    whatever is currently cached is returned unchanged (a parse failure is
    logged as a warning). The hot-reload contract is documented in
    ``docs/services/x402_as_a_service.md`` §6.
    """
    global _pricing_cache, _pricing_loaded_at

    try:
        file_mtime = _PRICING_PATH.stat().st_mtime
    except OSError:
        # File missing/unreadable: keep serving what we already have.
        return _pricing_cache

    cache_is_current = bool(_pricing_cache) and file_mtime <= _pricing_loaded_at
    if cache_is_current and not force:
        return _pricing_cache

    try:
        parsed = json.loads(_PRICING_PATH.read_text(encoding="utf-8"))
    except Exception as exc:
        logger.warning(f"x402: failed to load pricing config: {exc}")
    else:
        # Only a successful parse replaces the cache and advances the mtime,
        # so a broken file is retried on the next call.
        _pricing_cache = parsed
        _pricing_loaded_at = file_mtime
    return _pricing_cache
77 
78 
79# ─── Free-quota ledger (per-wallet 24h rolling window) ───────────────────
80 
81 
def _load_quota_ledger() -> Dict[str, List[float]]:
    """Read the free-quota ledger from disk, dropping stale entries.

    The ledger maps a wallet key to a list of unix timestamps, one per
    free-quota call. Only numeric timestamps from the last 24 hours are
    kept; wallets left with no timestamps are omitted. A missing,
    unreadable, or non-object ledger file yields an empty dict.
    """
    if not _QUOTA_LEDGER_PATH.exists():
        return {}
    try:
        data = json.loads(_QUOTA_LEDGER_PATH.read_text(encoding="utf-8"))
    except Exception:
        return {}
    if not isinstance(data, dict):
        return {}

    oldest_allowed = time.time() - 24 * 3600
    fresh_by_wallet: Dict[str, List[float]] = {}
    for wallet, stamps in data.items():
        if not isinstance(stamps, list):
            continue
        fresh = [
            float(stamp)
            for stamp in stamps
            if isinstance(stamp, (int, float)) and float(stamp) >= oldest_allowed
        ]
        if fresh:
            fresh_by_wallet[wallet] = fresh
    return fresh_by_wallet
107 
108 
def _save_quota_ledger(ledger: Dict[str, List[float]]) -> None:
    """Write ``ledger`` to the quota ledger file via a temp file + rename.

    The parent directory is created if needed. Any failure is logged as a
    warning and otherwise ignored, so a persistence problem never propagates
    to the caller.
    """
    try:
        ledger_dir = _QUOTA_LEDGER_PATH.parent
        ledger_dir.mkdir(parents=True, exist_ok=True)
        scratch_path = _QUOTA_LEDGER_PATH.with_suffix(".json.tmp")
        with scratch_path.open("w", encoding="utf-8") as out:
            json.dump(ledger, out, indent=2)
        # Swap the finished temp file into place in one step.
        scratch_path.replace(_QUOTA_LEDGER_PATH)
    except Exception as exc:
        logger.warning(f"x402: failed to persist quota ledger: {exc}")
118 
119 
def _quota_status(wallet: str) -> Tuple[int, int]:
    """Report free-quota usage for ``wallet`` as ``(used_in_window, limit)``.

    The limit comes from the ``free_quota`` section of the pricing config:
    ``anonymous_calls_per_24h`` (default 0) when the wallet is empty or the
    literal ``"anonymous"``, otherwise ``calls_per_24h`` (default 10). Usage
    is the number of ledger entries stored under the lowercased wallet.
    """
    pricing = _load_pricing()
    quota_cfg = pricing.get("free_quota", {}) if isinstance(pricing, dict) else {}

    is_anonymous = not wallet or wallet == "anonymous"
    if is_anonymous:
        option, fallback = "anonymous_calls_per_24h", 0
    else:
        option, fallback = "calls_per_24h", 10
    limit = int(quota_cfg.get(option, fallback))

    ledger_key = (wallet or "").lower()
    used = len(_load_quota_ledger().get(ledger_key, []))
    return used, limit
135 
136 
def _record_quota_use(wallet: str) -> None:
    """Append the current time to ``wallet``'s ledger entry and persist it.

    Does nothing when ``wallet`` is empty. The ledger key is the lowercased
    wallet string.
    """
    if wallet:
        ledger = _load_quota_ledger()
        stamps = ledger.setdefault(wallet.lower(), [])
        stamps.append(time.time())
        _save_quota_ledger(ledger)
144 
145 
146# ─── Settlement verification ─────────────────────────────────────────────
147 
148 
# In-process idempotency cache for verified settlements. ``_verify_x_payment``
# stores ``cache_key -> (stored_at_unix_ts, settlement_record)`` here and uses
# ``_settlement_cache_ttl()`` to decide whether an entry is still honored.
_settlement_cache: Dict[str, Tuple[float, Dict[str, Any]]] = {}
150 
151 
def _settlement_cache_ttl() -> int:
    """Return the settlement-cache TTL in seconds from the pricing config.

    Reads ``idempotency.settlement_cache_ttl_seconds`` and falls back to 60
    when the key (or the whole section) is absent.
    """
    pricing = _load_pricing()
    if isinstance(pricing, dict):
        idempotency_cfg = pricing.get("idempotency", {})
    else:
        idempotency_cfg = {}
    ttl_seconds = idempotency_cfg.get("settlement_cache_ttl_seconds", 60)
    return int(ttl_seconds)
156 
157 
def _verify_x_payment(header_value: str, endpoint_id: str, max_amount: int) -> Dict[str, Any]:
    """Verify a base64-encoded X-PAYMENT envelope.

    The full contract is in ``docs/services/x402_as_a_service.md`` §3. This
    implementation does *syntactic* verification on every request and defers
    *cryptographic* verification to the facilitator at the URL configured in
    pricing config.

    In test / dev mode (``MINDX_X402_TEST_MODE=1``), the function accepts any
    syntactically-valid envelope and returns a stub success — the receipt
    contains ``tx_hash="0xtest"`` to make the test path observable.

    A verified settlement is cached for ``_settlement_cache_ttl()`` seconds
    so an identical retry is honored without re-verification. The cache key
    covers the network, the endpoint, the required amount and a SHA-256
    digest of the *entire* payload, so a settlement is only reused for the
    exact same payment presented to the exact same priced endpoint. Expired
    entries are evicted on every call.

    Args:
        header_value: Raw value of the ``X-PAYMENT`` request header.
        endpoint_id: Pricing/catalogue key of the endpoint being paid for.
        max_amount: Required amount in micro-USD for this endpoint.

    Returns:
        The settlement record with ``rail``, ``tx_hash``,
        ``amount_microusd``, ``verified_at`` and ``facilitator``.

    Raises:
        HTTPException(402): malformed envelope, facilitator rejection,
            unusable facilitator response, or settlement not verified.
        HTTPException(503): facilitator URL not configured or unreachable.

    NOTE(review): the facilitator request uses a blocking ``httpx.Client``
    with a 10 s timeout, while the dependency built by ``x402_required``
    calls this function directly from an ``async`` function — confirm that
    stalling the event loop for that long is acceptable.
    """
    # Function-scope import: stdlib only, keeps this block self-contained.
    import hashlib

    try:
        decoded = base64.b64decode(header_value).decode("utf-8")
        env = json.loads(decoded)
    except Exception as exc:
        raise HTTPException(status_code=402, detail={
            "code": "x402_malformed_payment",
            "reason": f"could not decode X-PAYMENT: {exc}",
        }) from exc

    if not isinstance(env, dict):
        raise HTTPException(status_code=402, detail={"code": "x402_malformed_payment"})

    scheme = env.get("scheme")
    network = env.get("network")
    payload = env.get("payload") or {}
    if scheme != "exact" or not network or not isinstance(payload, dict):
        raise HTTPException(status_code=402, detail={
            "code": "x402_malformed_payment",
            "reason": "envelope must have scheme='exact', network, payload",
        })

    # Idempotency cache key. Hash the full canonical payload rather than a
    # truncated prefix: two payloads that only differ late (e.g. in a trailing
    # signature) must never share a key. Endpoint and amount are part of the
    # key so a settlement verified for one price is not replayed on another.
    payload_digest = hashlib.sha256(
        json.dumps(payload, sort_keys=True).encode("utf-8")
    ).hexdigest()
    cache_key = f"{network}:{endpoint_id}:{max_amount}:{payload_digest}"

    now = time.time()
    ttl = _settlement_cache_ttl()

    # Evict expired entries so the cache cannot grow without bound.
    for stale_key, (stored_at, _) in list(_settlement_cache.items()):
        if (now - stored_at) >= ttl:
            del _settlement_cache[stale_key]

    cached = _settlement_cache.get(cache_key)
    if cached is not None:
        return cached[1]

    test_mode = os.environ.get("MINDX_X402_TEST_MODE", "0").strip() == "1"
    if test_mode:
        record = {
            "rail": network,
            "tx_hash": "0xtest",
            "amount_microusd": max_amount,
            "verified_at": now,
            "facilitator": "test-stub",
        }
        _settlement_cache[cache_key] = (now, record)
        return record

    # Production path: call the facilitator's /verify endpoint.
    cfg = _load_pricing()
    fac_cfg = cfg.get("facilitator") if isinstance(cfg, dict) else None
    fac = fac_cfg.get("url") if isinstance(fac_cfg, dict) else None
    if not fac or not isinstance(fac, str):
        raise HTTPException(status_code=503, detail={
            "code": "x402_facilitator_not_configured",
            "reason": "facilitator URL missing from x402_pricing.json",
        })

    try:
        import httpx
        with httpx.Client(timeout=10.0) as client:
            resp = client.post(
                fac.rstrip("/") + "/verify",
                json={
                    "scheme": scheme,
                    "network": network,
                    "payload": payload,
                    "endpoint": endpoint_id,
                    "max_amount_microusd": max_amount,
                },
            )
    except Exception as exc:
        raise HTTPException(status_code=503, detail={
            "code": "x402_facilitator_unreachable",
            "reason": str(exc),
        }) from exc

    if resp.status_code != 200:
        raise HTTPException(status_code=402, detail={
            "code": "x402_facilitator_rejected",
            "status": resp.status_code,
            "body": resp.text[:512],
        })

    try:
        body = resp.json()
    except Exception as exc:
        raise HTTPException(
            status_code=402, detail={"code": "x402_facilitator_bad_response"}
        ) from exc

    # A JSON array/scalar is a valid parse but not a usable verdict.
    if not isinstance(body, dict):
        raise HTTPException(status_code=402, detail={"code": "x402_facilitator_bad_response"})

    if not body.get("verified"):
        raise HTTPException(status_code=402, detail={
            "code": "x402_settlement_not_verified",
            "reason": body.get("reason", ""),
        })

    # Missing/null amount falls back to the required amount; anything that
    # cannot be read as an integer is treated as a bad facilitator response
    # instead of surfacing as an unhandled error.
    raw_amount = body.get("amount")
    if raw_amount is None:
        raw_amount = max_amount
    try:
        amount_microusd = int(raw_amount)
    except (TypeError, ValueError, OverflowError) as exc:
        raise HTTPException(status_code=402, detail={
            "code": "x402_facilitator_bad_response",
            "reason": "facilitator returned a non-integer amount",
        }) from exc

    record = {
        "rail": network,
        "tx_hash": body.get("txHash", ""),
        "amount_microusd": amount_microusd,
        "verified_at": now,
        "facilitator": fac,
    }
    _settlement_cache[cache_key] = (now, record)
    return record
270 
271 
272# ─── 402 envelope builder ────────────────────────────────────────────────
273 
274 
def _build_402_envelope(endpoint_id: str, max_amount: int) -> Dict[str, Any]:
    """Assemble the 402 response body advertising every settling rail.

    One payment requirement is produced per entry in the pricing config's
    ``rails`` section, skipping entries that are not objects or whose
    ``payTo`` is blank or the all-zero address. ``endpoint_id`` fills the
    ``resource`` field and ``max_amount`` (stringified) is the amount
    advertised on every rail.
    """
    pricing = _load_pricing()
    rails = pricing.get("rails", {}) if isinstance(pricing, dict) else {}

    # Recipients that mean "rail advertised but not settling yet".
    inactive_recipients = ("", "0x0000000000000000000000000000000000000000")

    offers: List[Dict[str, Any]] = []
    for rail_name, rail in rails.items():
        if not isinstance(rail, dict):
            continue
        recipient = str(rail.get("payTo", "")).strip()
        if recipient in inactive_recipients:
            continue
        offer = {
            "scheme": rail.get("scheme", "exact"),
            "network": rail.get("network", rail_name),
            "asset": rail.get("asset", ""),
            "maxAmountRequired": str(max_amount),
            "payTo": recipient,
            "resource": endpoint_id,
            "description": rail.get("_comment", ""),
            "mimeType": "application/json",
            "extra": rail.get("extra", {}),
        }
        offers.append(offer)

    envelope: Dict[str, Any] = {
        "code": "x402_payment_required",
        "message": "This endpoint requires payment. Settle on any of the offered rails and re-submit with X-PAYMENT.",
        "endpoint": endpoint_id,
        "paymentRequirements": offers,
        "x402Version": 1,
        "_note": "See docs/services/x402_as_a_service.md for the protocol contract.",
    }
    return envelope
311 
312 
313# ─── Catalogue mirror ────────────────────────────────────────────────────
314 
315 
async def _emit_settlement_event(wallet: str, endpoint_id: str, record: Dict[str, Any]) -> None:
    """Mirror a verified settlement as a ``payment.x402.settled`` catalogue event.

    Best-effort: the catalogue emitter is imported lazily and any failure
    (including the import itself) is logged as a warning and swallowed, so
    this coroutine never raises ``Exception`` into the request path.

    Args:
        wallet: Caller wallet key recorded on the event.
        endpoint_id: Pricing/catalogue key of the endpoint that was paid for.
        record: Settlement record; ``rail``, ``amount_microusd``, ``tx_hash``
            and ``facilitator`` are copied into the event payload.
    """
    try:
        from agents.catalogue.events import emit_catalogue_event
        await emit_catalogue_event(
            kind="payment.x402.settled",
            actor="mindx.gateway",
            payload={
                "endpoint": endpoint_id,
                "wallet": wallet,
                "rail": record.get("rail"),
                "amount_microusd": record.get("amount_microusd"),
                "tx_hash": record.get("tx_hash"),
                "facilitator": record.get("facilitator"),
            },
            source_log="mindx_backend_service.x402_middleware",
        )
    except Exception as exc:
        # Catalogue write failure must NEVER break a paid request — but a
        # lost settlement event should at least leave a trace in the logs.
        logger.warning("x402: failed to emit payment.x402.settled event: %s", exc)
335 
336 
async def _emit_free_quota_event(wallet: str, endpoint_id: str, used: int, limit: int) -> None:
    """Mirror a free-quota call as a ``payment.x402.free_quota`` catalogue event.

    Best-effort: the catalogue emitter is imported lazily and any failure
    (including the import itself) is logged as a warning and swallowed, so
    this coroutine never raises ``Exception`` into the request path.

    Args:
        wallet: Caller wallet key recorded on the event.
        endpoint_id: Pricing/catalogue key of the endpoint that was called.
        used: Calls already used in the window *before* this one; the event
            reports ``used + 1``.
        limit: Free-quota limit that applied to this caller.
    """
    try:
        from agents.catalogue.events import emit_catalogue_event
        await emit_catalogue_event(
            kind="payment.x402.free_quota",
            actor="mindx.gateway",
            payload={
                "endpoint": endpoint_id,
                "wallet": wallet,
                "used_in_window": used + 1,
                "limit": limit,
            },
            source_log="mindx_backend_service.x402_middleware",
        )
    except Exception as exc:
        # Never let catalogue trouble block an allowed call; log instead of
        # dropping the failure silently.
        logger.warning("x402: failed to emit payment.x402.free_quota event: %s", exc)
353 
354 
355# ─── Session inspection ──────────────────────────────────────────────────
356 
357 
def _wallet_from_request(request: Request) -> Optional[str]:
    """Return the lowercase wallet address from the session, or None.

    The session is looked up through the vault manager using the
    ``X-Session-Token`` header. ``None`` is returned when the header is
    absent, when the session has no ``wallet_address``, or when the lookup
    fails for any reason; a failed lookup is logged (without the token) so
    that a broken vault does not silently turn every caller into one with
    no wallet.
    """
    token = request.headers.get("X-Session-Token")
    if not token:
        return None
    try:
        # Lazy import to avoid a hard dependency on the vault during tests.
        from mindx_backend_service.bankon_vault import get_vault_manager
        vault = get_vault_manager()
        session = vault.get_user_session(token)
        if session and session.get("wallet_address"):
            return str(session["wallet_address"]).lower()
    except Exception as exc:
        # Deliberately not logging the token itself.
        logger.warning("x402: session lookup failed; no wallet resolved: %s", exc)
        return None
    return None
373 
374 
375# ─── The decorator-style factory ─────────────────────────────────────────
376 
377 
def x402_required(endpoint_id: str, max_amount_microusd: Optional[int] = None) -> Callable:
    """Build a FastAPI dependency that gates a route behind x402.

    When the dependency runs it:
      1. Resolves the endpoint price from pricing config (falling back to
         ``max_amount_microusd``, then 2000).
      2. Resolves the caller's wallet from X-Session-Token ("anonymous" if none).
      3. Allows the call on free quota if any remains, recording the use.
      4. Otherwise verifies X-PAYMENT if present and allows the call.
      5. Otherwise raises HTTPException(402) carrying the rail envelope, or
         HTTPException(503) when no rail has a usable payTo address.

    ``endpoint_id`` is the canonical path string used as the key in pricing
    config and catalogue events (e.g. "/coordinator/query"). For routes with
    path parameters pass the templated form (``/agents/{agent_id}/evolve``);
    it is used as a lookup key, not matched against the request path.
    """
    async def _dep(request: Request) -> Dict[str, Any]:
        pricing = _load_pricing()
        endpoint_rules = pricing.get("endpoints", {}) if isinstance(pricing, dict) else {}
        rule = endpoint_rules.get(endpoint_id, {})
        fallback_price = max_amount_microusd or 2000
        price = int(rule.get("max_amount_microusd", fallback_price))

        caller = _wallet_from_request(request) or "anonymous"
        used, limit = _quota_status(caller)

        # Free quota takes precedence over any presented payment.
        if used < limit:
            _record_quota_use(caller)
            await _emit_free_quota_event(caller, endpoint_id, used, limit)
            return {"path": "free_quota", "wallet": caller, "used": used + 1, "limit": limit}

        payment_header = request.headers.get("X-PAYMENT")
        if payment_header:
            settlement = _verify_x_payment(payment_header, endpoint_id, price)
            await _emit_settlement_event(caller, endpoint_id, settlement)
            outcome: Dict[str, Any] = {"path": "x402_settled", "wallet": caller}
            outcome.update(settlement)
            return outcome

        envelope = _build_402_envelope(endpoint_id, price)
        if envelope["paymentRequirements"]:
            raise HTTPException(status_code=402, detail=envelope)

        # No rails currently settling → answer 503 so the caller knows the
        # operator hasn't finished configuring x402 yet.
        raise HTTPException(status_code=503, detail={
            "code": "x402_no_rails_configured",
            "reason": "No x402 rails have a payTo address yet. Operator must update data/config/x402_pricing.json.",
        })

    # Give each generated dependency a distinct, readable name derived from
    # the endpoint: slashes become underscores, braces are dropped.
    slug_table = str.maketrans({"/": "_", "{": None, "}": None})
    endpoint_slug = endpoint_id.strip("/").translate(slug_table)
    _dep.__name__ = f"x402_required_for_{endpoint_slug}"
    return _dep

All DocumentsBook of mindXAPI Reference