mindx_backend_service/bankon_vault/shadow_overlord.py · 335 lines · 11.7 KB · Python source
1# ╔══════════════════════════════════════════════════════════════════╗
2# ║ BANKON Vault — Shadow-Overlord Auth Tier ║
3# ║ ║
4# ║ ECDSA-signed challenge → short-lived JWT for the admin UI; ║
5# ║ fresh per-op signatures gate every state-changing endpoint. ║
6# ║ ║
7# ║ See /home/hacker/.claude/plans/splendid-wishing-hejlsberg.md ║
8# ╚══════════════════════════════════════════════════════════════════╝
9
10from __future__ import annotations
11
12import hmac
13import json
14import os
15import secrets
16import time
17from dataclasses import asdict, dataclass, field
18from pathlib import Path
19from typing import Any, Callable, Dict, Optional
20
21import jwt as pyjwt
22from eth_account import Account
23from eth_account.messages import encode_defunct
24from fastapi import Depends, Header, HTTPException
25from web3 import Web3
26
27# Allowed scopes — any verify must explicitly require one of these.
28SCOPE_AUTH = "auth"
29SCOPE_CABINET_PROVISION = "cabinet.provision"
30SCOPE_CABINET_CLEAR = "cabinet.clear"
31SCOPE_VAULT_SIGN = "vault.sign"
32SCOPE_RELEASE_KEY = "release.key"
33
34NONCE_TTL_S = 120
35JWT_TTL_S = 300
36
37_NONCE_PATH = Path(os.environ.get(
38 "SHADOW_NONCES_PATH",
39 "data/governance/shadow_nonces.json",
40))
41
42
43@dataclass
44class NonceRecord:
45 issued_at: float
46 scope: str
47 message: str
48 params: Dict[str, Any] = field(default_factory=dict)
49 consumed: bool = False
50
51 def is_expired(self, now: float) -> bool:
52 return now - self.issued_at > NONCE_TTL_S
53
54
55class NonceStore:
56 """In-memory nonce store with JSONL persistence.
57
58 Each record binds a nonce to a scope + canonical challenge message + the
59 parameters that operation will mutate. Single-use; expires in NONCE_TTL_S.
60 """
61
62 def __init__(self, path: Path = _NONCE_PATH):
63 self._path = path
64 self._records: Dict[str, NonceRecord] = {}
65 self._load()
66
67 def _load(self) -> None:
68 if not self._path.exists():
69 return
70 try:
71 raw = json.loads(self._path.read_text(encoding="utf-8"))
72 now = time.time()
73 for nonce, payload in raw.items():
74 rec = NonceRecord(**payload)
75 if not rec.is_expired(now) and not rec.consumed:
76 self._records[nonce] = rec
77 except (json.JSONDecodeError, TypeError, ValueError):
78 # Corrupt file: start fresh; old nonces are unrecoverable but
79 # that just forces re-auth — no security impact.
80 self._records = {}
81
82 def _persist(self) -> None:
83 self._path.parent.mkdir(parents=True, exist_ok=True)
84 tmp = self._path.with_suffix(self._path.suffix + ".tmp")
85 tmp.write_text(
86 json.dumps({k: asdict(v) for k, v in self._records.items()}),
87 encoding="utf-8",
88 )
89 os.replace(tmp, self._path)
90
91 def issue(self, scope: str, message: str, params: Dict[str, Any]) -> str:
92 self._prune()
93 nonce = "0x" + secrets.token_hex(32)
94 self._records[nonce] = NonceRecord(
95 issued_at=time.time(),
96 scope=scope,
97 message=message,
98 params=params,
99 )
100 self._persist()
101 return nonce
102
103 def lookup(self, nonce: str) -> Optional[NonceRecord]:
104 self._prune()
105 rec = self._records.get(nonce)
106 if rec is None or rec.is_expired(time.time()) or rec.consumed:
107 return None
108 return rec
109
110 def consume(self, nonce: str) -> bool:
111 rec = self._records.get(nonce)
112 if rec is None or rec.is_expired(time.time()) or rec.consumed:
113 return False
114 rec.consumed = True
115 self._persist()
116 return True
117
118 def _prune(self) -> None:
119 now = time.time()
120 stale = [k for k, v in self._records.items() if v.is_expired(now)]
121 for k in stale:
122 self._records.pop(k, None)
123 if stale:
124 self._persist()
125
126
127_store: Optional[NonceStore] = None
128
129
130def get_store() -> NonceStore:
131 global _store
132 if _store is None:
133 _store = NonceStore()
134 return _store
135
136
137def reset_store_for_tests(path: Optional[Path] = None) -> None:
138 global _store
139 _store = NonceStore(path or _NONCE_PATH)
140
141
142# ─── canonical challenge message construction ─────────────────────
143
144
145def build_challenge_message(scope: str, nonce: str, params: Dict[str, Any]) -> str:
146 """Canonical, scope-bound message text that the operator signs.
147
148 The shape is human-readable and audit-friendly. The scope tag prevents a
149 sig issued for one operation from authorizing another.
150 """
151 lines = [f"MINDX-SHADOW-OVERLORD scope={scope}", f"nonce: {nonce}"]
152 for key in sorted(params):
153 lines.append(f"{key}: {params[key]}")
154 return "\n".join(lines)
155
156
157def issue_challenge(scope: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
158 """Public: issue a fresh challenge for the given scope.
159
160 Returns {nonce, message, expires_at}. The client signs `message` and
161 posts {nonce, signature} to the corresponding op endpoint.
162 """
163 if scope not in {
164 SCOPE_AUTH,
165 SCOPE_CABINET_PROVISION,
166 SCOPE_CABINET_CLEAR,
167 SCOPE_VAULT_SIGN,
168 SCOPE_RELEASE_KEY,
169 }:
170 raise HTTPException(status_code=400, detail=f"unknown scope: {scope}")
171 params = params or {}
172 store = get_store()
173 # Pre-allocate the nonce to interpolate into the message; store *with* the message.
174 nonce = "0x" + secrets.token_hex(32)
175 message = build_challenge_message(scope, nonce, params)
176 store._records[nonce] = NonceRecord(
177 issued_at=time.time(), scope=scope, message=message, params=params,
178 )
179 store._persist()
180 return {
181 "nonce": nonce,
182 "message": message,
183 "expires_at": int(time.time()) + NONCE_TTL_S,
184 }
185
186
187# ─── signature verification ───────────────────────────────────────
188
189
190def _shadow_address() -> str:
191 addr = os.environ.get("SHADOW_OVERLORD_ADDRESS", "").strip()
192 if not addr:
193 raise HTTPException(
194 status_code=503,
195 detail="SHADOW_OVERLORD_ADDRESS not configured on this server",
196 )
197 return Web3.to_checksum_address(addr)
198
199
200def recover_signer(message: str, signature: str) -> str:
201 try:
202 sig = signature if signature.startswith("0x") else "0x" + signature
203 recovered = Account.recover_message(
204 encode_defunct(text=message),
205 signature=sig,
206 )
207 return Web3.to_checksum_address(recovered)
208 except Exception as e:
209 raise HTTPException(status_code=401, detail=f"invalid signature: {e}")
210
211
212def verify_shadow_signature(message: str, signature: str) -> str:
213 """Recover the signer of `message` and assert it is the shadow-overlord.
214
215 Returns the recovered checksum address. Raises HTTPException on mismatch.
216 """
217 recovered = recover_signer(message, signature)
218 expected = _shadow_address()
219 if not hmac.compare_digest(recovered.lower(), expected.lower()):
220 raise HTTPException(status_code=403, detail="not shadow-overlord")
221 return recovered
222
223
224def consume_signed_challenge(
225 nonce: str,
226 signature: str,
227 expected_scope: str,
228 expected_params: Optional[Dict[str, Any]] = None,
229) -> Dict[str, Any]:
230 """Validate a one-time signed challenge.
231
232 Steps:
233 1. Lookup nonce in store; reject if missing/expired/consumed.
234 2. Confirm scope == expected_scope.
235 3. If expected_params supplied, every key/value must match.
236 4. Recover signer from the *stored* message; confirm == shadow-overlord.
237 5. Mark nonce consumed (single-use).
238
239 Returns the NonceRecord's params on success (caller can rely on them).
240 """
241 store = get_store()
242 rec = store.lookup(nonce)
243 if rec is None:
244 raise HTTPException(status_code=409, detail="nonce expired, unknown, or already consumed")
245 if rec.scope != expected_scope:
246 raise HTTPException(
247 status_code=403,
248 detail=f"scope mismatch: nonce was issued for {rec.scope!r}, this endpoint requires {expected_scope!r}",
249 )
250 if expected_params is not None:
251 for k, v in expected_params.items():
252 if rec.params.get(k) != v:
253 raise HTTPException(
254 status_code=400,
255 detail=f"params mismatch on {k!r}",
256 )
257 verify_shadow_signature(rec.message, signature)
258 if not store.consume(nonce): # race-safe re-check
259 raise HTTPException(status_code=409, detail="nonce was consumed concurrently")
260 return dict(rec.params)
261
262
263# ─── JWT issue / verify ───────────────────────────────────────────
264
265
266def _jwt_secret() -> str:
267 secret = os.environ.get("SHADOW_JWT_SECRET", "")
268 if len(secret) < 32:
269 raise HTTPException(
270 status_code=503,
271 detail="SHADOW_JWT_SECRET not configured (need 32+ chars)",
272 )
273 return secret
274
275
276def issue_jwt(addr: str, scope: str = SCOPE_AUTH, jti: Optional[str] = None) -> Dict[str, Any]:
277 now = int(time.time())
278 claims = {
279 "sub": addr,
280 "scope": scope,
281 "jti": jti or secrets.token_hex(16),
282 "iat": now,
283 "exp": now + JWT_TTL_S,
284 }
285 token = pyjwt.encode(claims, _jwt_secret(), algorithm="HS256")
286 return {"jwt": token, "exp": claims["exp"]}
287
288
289def verify_jwt(token: str, required_scope: Optional[str] = None) -> Dict[str, Any]:
290 try:
291 claims = pyjwt.decode(token, _jwt_secret(), algorithms=["HS256"])
292 except pyjwt.ExpiredSignatureError:
293 raise HTTPException(status_code=401, detail="jwt expired")
294 except pyjwt.InvalidTokenError as e:
295 raise HTTPException(status_code=401, detail=f"invalid jwt: {e}")
296 sub = claims.get("sub", "")
297 if not hmac.compare_digest(sub.lower(), _shadow_address().lower()):
298 raise HTTPException(status_code=403, detail="jwt subject is not shadow-overlord")
299 if required_scope is not None and claims.get("scope") != required_scope:
300 raise HTTPException(status_code=403, detail="jwt scope mismatch")
301 return claims
302
303
304def require_shadow_jwt(required_scope: Optional[str] = None) -> Callable:
305 """FastAPI dep factory. Enforces a valid Bearer JWT, optionally scope-bound."""
306
307 async def _dep(authorization: str = Header(default="")) -> Dict[str, Any]:
308 if not authorization.startswith("Bearer "):
309 raise HTTPException(status_code=401, detail="missing bearer token")
310 return verify_jwt(authorization[7:], required_scope=required_scope)
311
312 return _dep
313
314
315# ─── audit emit (best-effort wrapper) ─────────────────────────────
316
317
318async def emit_shadow_audit(action: str, actor: str, payload: Dict[str, Any]) -> None:
319 """Emit an admin.shadow_overlord_action catalogue event (best effort).
320
321 Catalogue events are pure observability — failure to emit must NEVER
322 block the privileged op.
323 """
324 try:
325 from agents.catalogue.events import emit_catalogue_event
326
327 await emit_catalogue_event(
328 kind="admin.shadow_overlord_action",
329 actor=actor,
330 payload={"action": action, **payload, "ts": int(time.time())},
331 source_log="bankon_vault.shadow_overlord",
332 )
333 except Exception:
334 # Audit failures should not break the request flow.
335 pass