from __future__ import annotations import base64 import hashlib import json import secrets import time from typing import Dict, Tuple from base58 import b58decode, b58encode from app.core.network.guard import check_timestamp_fresh, check_and_remember_nonce def _body_sha256(body: bytes) -> str: h = hashlib.sha256() h.update(body or b"") return h.hexdigest() def canonical_string(method: str, path: str, body: bytes, ts: int, nonce: str, node_id: str) -> bytes: parts = [ method.upper(), path, _body_sha256(body), str(int(ts)), str(nonce), node_id, ] return ("\n".join(parts)).encode() def sign_headers(method: str, path: str, body: bytes, sk_bytes: bytes, pk_b58: str) -> Dict[str, str]: import nacl.signing ts = int(time.time()) nonce = secrets.token_hex(16) msg = canonical_string(method, path, body, ts, nonce, pk_b58) sig = nacl.signing.SigningKey(sk_bytes).sign(msg).signature return { "X-Node-Id": pk_b58, "X-Node-Ts": str(ts), "X-Node-Nonce": nonce, "X-Node-Sig": b58encode(sig).decode(), } def verify_request(request, memory) -> Tuple[bool, str, str]: """Verify NodeSig headers of an incoming Sanic request. Returns (ok, node_id, error). ok==True if signature valid, timestamp fresh, nonce unused. """ try: node_id = request.headers.get("X-Node-Id", "").strip() ts = int(request.headers.get("X-Node-Ts", "0").strip() or 0) nonce = request.headers.get("X-Node-Nonce", "").strip() sig_b58 = request.headers.get("X-Node-Sig", "").strip() if not node_id or not ts or not nonce or not sig_b58: return False, "", "MISSING_HEADERS" if not check_timestamp_fresh(ts): return False, node_id, "STALE_TS" if not check_and_remember_nonce(memory, node_id, nonce): return False, node_id, "NONCE_REPLAY" import nacl.signing vk = nacl.signing.VerifyKey(b58decode(node_id)) sig = b58decode(sig_b58) msg = canonical_string(request.method, request.path, request.body or b"", ts, nonce, node_id) vk.verify(msg, sig) return True, node_id, "" except Exception as e: return False, "", f"BAD_SIGNATURE: {e}"