mindx_backend_service/bankon_vault/shadow_overlord.py · 335 lines · 11.7 KB · Python source
mindx_backend_service/bankon_vault/shadow_overlord.py · 335 lines · 11.7 KBrawGitHub
1# ╔══════════════════════════════════════════════════════════════════╗
2# ║  BANKON Vault — Shadow-Overlord Auth Tier                       ║
3# ║                                                                  ║
4# ║  ECDSA-signed challenge → short-lived JWT for the admin UI;     ║
5# ║  fresh per-op signatures gate every state-changing endpoint.    ║
6# ║                                                                  ║
7# ║  See /home/hacker/.claude/plans/splendid-wishing-hejlsberg.md   ║
8# ╚══════════════════════════════════════════════════════════════════╝
9 
10from __future__ import annotations
11 
12import hmac
13import json
14import os
15import secrets
16import time
17from dataclasses import asdict, dataclass, field
18from pathlib import Path
19from typing import Any, Callable, Dict, Optional
20 
21import jwt as pyjwt
22from eth_account import Account
23from eth_account.messages import encode_defunct
24from fastapi import Depends, Header, HTTPException
25from web3 import Web3
26 
27# Allowed scopes — any verify must explicitly require one of these.
28SCOPE_AUTH = "auth"
29SCOPE_CABINET_PROVISION = "cabinet.provision"
30SCOPE_CABINET_CLEAR = "cabinet.clear"
31SCOPE_VAULT_SIGN = "vault.sign"
32SCOPE_RELEASE_KEY = "release.key"
33 
34NONCE_TTL_S = 120
35JWT_TTL_S = 300
36 
37_NONCE_PATH = Path(os.environ.get(
38    "SHADOW_NONCES_PATH",
39    "data/governance/shadow_nonces.json",
40))
41 
42 
43@dataclass
44class NonceRecord:
45    issued_at: float
46    scope: str
47    message: str
48    params: Dict[str, Any] = field(default_factory=dict)
49    consumed: bool = False
50 
51    def is_expired(self, now: float) -> bool:
52        return now - self.issued_at > NONCE_TTL_S
53 
54 
55class NonceStore:
56    """In-memory nonce store with JSONL persistence.
57 
58    Each record binds a nonce to a scope + canonical challenge message + the
59    parameters that operation will mutate. Single-use; expires in NONCE_TTL_S.
60    """
61 
62    def __init__(self, path: Path = _NONCE_PATH):
63        self._path = path
64        self._records: Dict[str, NonceRecord] = {}
65        self._load()
66 
67    def _load(self) -> None:
68        if not self._path.exists():
69            return
70        try:
71            raw = json.loads(self._path.read_text(encoding="utf-8"))
72            now = time.time()
73            for nonce, payload in raw.items():
74                rec = NonceRecord(**payload)
75                if not rec.is_expired(now) and not rec.consumed:
76                    self._records[nonce] = rec
77        except (json.JSONDecodeError, TypeError, ValueError):
78            # Corrupt file: start fresh; old nonces are unrecoverable but
79            # that just forces re-auth — no security impact.
80            self._records = {}
81 
82    def _persist(self) -> None:
83        self._path.parent.mkdir(parents=True, exist_ok=True)
84        tmp = self._path.with_suffix(self._path.suffix + ".tmp")
85        tmp.write_text(
86            json.dumps({k: asdict(v) for k, v in self._records.items()}),
87            encoding="utf-8",
88        )
89        os.replace(tmp, self._path)
90 
91    def issue(self, scope: str, message: str, params: Dict[str, Any]) -> str:
92        self._prune()
93        nonce = "0x" + secrets.token_hex(32)
94        self._records[nonce] = NonceRecord(
95            issued_at=time.time(),
96            scope=scope,
97            message=message,
98            params=params,
99        )
100        self._persist()
101        return nonce
102 
103    def lookup(self, nonce: str) -> Optional[NonceRecord]:
104        self._prune()
105        rec = self._records.get(nonce)
106        if rec is None or rec.is_expired(time.time()) or rec.consumed:
107            return None
108        return rec
109 
110    def consume(self, nonce: str) -> bool:
111        rec = self._records.get(nonce)
112        if rec is None or rec.is_expired(time.time()) or rec.consumed:
113            return False
114        rec.consumed = True
115        self._persist()
116        return True
117 
118    def _prune(self) -> None:
119        now = time.time()
120        stale = [k for k, v in self._records.items() if v.is_expired(now)]
121        for k in stale:
122            self._records.pop(k, None)
123        if stale:
124            self._persist()
125 
126 
127_store: Optional[NonceStore] = None
128 
129 
130def get_store() -> NonceStore:
131    global _store
132    if _store is None:
133        _store = NonceStore()
134    return _store
135 
136 
137def reset_store_for_tests(path: Optional[Path] = None) -> None:
138    global _store
139    _store = NonceStore(path or _NONCE_PATH)
140 
141 
142# ─── canonical challenge message construction ─────────────────────
143 
144 
145def build_challenge_message(scope: str, nonce: str, params: Dict[str, Any]) -> str:
146    """Canonical, scope-bound message text that the operator signs.
147 
148    The shape is human-readable and audit-friendly. The scope tag prevents a
149    sig issued for one operation from authorizing another.
150    """
151    lines = [f"MINDX-SHADOW-OVERLORD scope={scope}", f"nonce: {nonce}"]
152    for key in sorted(params):
153        lines.append(f"{key}: {params[key]}")
154    return "\n".join(lines)
155 
156 
157def issue_challenge(scope: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
158    """Public: issue a fresh challenge for the given scope.
159 
160    Returns {nonce, message, expires_at}. The client signs `message` and
161    posts {nonce, signature} to the corresponding op endpoint.
162    """
163    if scope not in {
164        SCOPE_AUTH,
165        SCOPE_CABINET_PROVISION,
166        SCOPE_CABINET_CLEAR,
167        SCOPE_VAULT_SIGN,
168        SCOPE_RELEASE_KEY,
169    }:
170        raise HTTPException(status_code=400, detail=f"unknown scope: {scope}")
171    params = params or {}
172    store = get_store()
173    # Pre-allocate the nonce to interpolate into the message; store *with* the message.
174    nonce = "0x" + secrets.token_hex(32)
175    message = build_challenge_message(scope, nonce, params)
176    store._records[nonce] = NonceRecord(
177        issued_at=time.time(), scope=scope, message=message, params=params,
178    )
179    store._persist()
180    return {
181        "nonce": nonce,
182        "message": message,
183        "expires_at": int(time.time()) + NONCE_TTL_S,
184    }
185 
186 
187# ─── signature verification ───────────────────────────────────────
188 
189 
190def _shadow_address() -> str:
191    addr = os.environ.get("SHADOW_OVERLORD_ADDRESS", "").strip()
192    if not addr:
193        raise HTTPException(
194            status_code=503,
195            detail="SHADOW_OVERLORD_ADDRESS not configured on this server",
196        )
197    return Web3.to_checksum_address(addr)
198 
199 
200def recover_signer(message: str, signature: str) -> str:
201    try:
202        sig = signature if signature.startswith("0x") else "0x" + signature
203        recovered = Account.recover_message(
204            encode_defunct(text=message),
205            signature=sig,
206        )
207        return Web3.to_checksum_address(recovered)
208    except Exception as e:
209        raise HTTPException(status_code=401, detail=f"invalid signature: {e}")
210 
211 
212def verify_shadow_signature(message: str, signature: str) -> str:
213    """Recover the signer of `message` and assert it is the shadow-overlord.
214 
215    Returns the recovered checksum address. Raises HTTPException on mismatch.
216    """
217    recovered = recover_signer(message, signature)
218    expected = _shadow_address()
219    if not hmac.compare_digest(recovered.lower(), expected.lower()):
220        raise HTTPException(status_code=403, detail="not shadow-overlord")
221    return recovered
222 
223 
224def consume_signed_challenge(
225    nonce: str,
226    signature: str,
227    expected_scope: str,
228    expected_params: Optional[Dict[str, Any]] = None,
229) -> Dict[str, Any]:
230    """Validate a one-time signed challenge.
231 
232    Steps:
233      1. Lookup nonce in store; reject if missing/expired/consumed.
234      2. Confirm scope == expected_scope.
235      3. If expected_params supplied, every key/value must match.
236      4. Recover signer from the *stored* message; confirm == shadow-overlord.
237      5. Mark nonce consumed (single-use).
238 
239    Returns the NonceRecord's params on success (caller can rely on them).
240    """
241    store = get_store()
242    rec = store.lookup(nonce)
243    if rec is None:
244        raise HTTPException(status_code=409, detail="nonce expired, unknown, or already consumed")
245    if rec.scope != expected_scope:
246        raise HTTPException(
247            status_code=403,
248            detail=f"scope mismatch: nonce was issued for {rec.scope!r}, this endpoint requires {expected_scope!r}",
249        )
250    if expected_params is not None:
251        for k, v in expected_params.items():
252            if rec.params.get(k) != v:
253                raise HTTPException(
254                    status_code=400,
255                    detail=f"params mismatch on {k!r}",
256                )
257    verify_shadow_signature(rec.message, signature)
258    if not store.consume(nonce):  # race-safe re-check
259        raise HTTPException(status_code=409, detail="nonce was consumed concurrently")
260    return dict(rec.params)
261 
262 
263# ─── JWT issue / verify ───────────────────────────────────────────
264 
265 
266def _jwt_secret() -> str:
267    secret = os.environ.get("SHADOW_JWT_SECRET", "")
268    if len(secret) < 32:
269        raise HTTPException(
270            status_code=503,
271            detail="SHADOW_JWT_SECRET not configured (need 32+ chars)",
272        )
273    return secret
274 
275 
276def issue_jwt(addr: str, scope: str = SCOPE_AUTH, jti: Optional[str] = None) -> Dict[str, Any]:
277    now = int(time.time())
278    claims = {
279        "sub": addr,
280        "scope": scope,
281        "jti": jti or secrets.token_hex(16),
282        "iat": now,
283        "exp": now + JWT_TTL_S,
284    }
285    token = pyjwt.encode(claims, _jwt_secret(), algorithm="HS256")
286    return {"jwt": token, "exp": claims["exp"]}
287 
288 
289def verify_jwt(token: str, required_scope: Optional[str] = None) -> Dict[str, Any]:
290    try:
291        claims = pyjwt.decode(token, _jwt_secret(), algorithms=["HS256"])
292    except pyjwt.ExpiredSignatureError:
293        raise HTTPException(status_code=401, detail="jwt expired")
294    except pyjwt.InvalidTokenError as e:
295        raise HTTPException(status_code=401, detail=f"invalid jwt: {e}")
296    sub = claims.get("sub", "")
297    if not hmac.compare_digest(sub.lower(), _shadow_address().lower()):
298        raise HTTPException(status_code=403, detail="jwt subject is not shadow-overlord")
299    if required_scope is not None and claims.get("scope") != required_scope:
300        raise HTTPException(status_code=403, detail="jwt scope mismatch")
301    return claims
302 
303 
304def require_shadow_jwt(required_scope: Optional[str] = None) -> Callable:
305    """FastAPI dep factory. Enforces a valid Bearer JWT, optionally scope-bound."""
306 
307    async def _dep(authorization: str = Header(default="")) -> Dict[str, Any]:
308        if not authorization.startswith("Bearer "):
309            raise HTTPException(status_code=401, detail="missing bearer token")
310        return verify_jwt(authorization[7:], required_scope=required_scope)
311 
312    return _dep
313 
314 
315# ─── audit emit (best-effort wrapper) ─────────────────────────────
316 
317 
318async def emit_shadow_audit(action: str, actor: str, payload: Dict[str, Any]) -> None:
319    """Emit an admin.shadow_overlord_action catalogue event (best effort).
320 
321    Catalogue events are pure observability — failure to emit must NEVER
322    block the privileged op.
323    """
324    try:
325        from agents.catalogue.events import emit_catalogue_event
326 
327        await emit_catalogue_event(
328            kind="admin.shadow_overlord_action",
329            actor=actor,
330            payload={"action": action, **payload, "ts": int(time.time())},
331            source_log="bankon_vault.shadow_overlord",
332        )
333    except Exception:
334        # Audit failures should not break the request flow.
335        pass

All DocumentsBook of mindXAPI Reference