71 lines
2.3 KiB
Python
71 lines
2.3 KiB
Python
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import secrets
|
|
import time
|
|
from typing import Dict, Tuple
|
|
|
|
from base58 import b58decode, b58encode
|
|
|
|
from app.core.network.guard import check_timestamp_fresh, check_and_remember_nonce
|
|
|
|
|
|
def _body_sha256(body: bytes) -> str:
|
|
h = hashlib.sha256()
|
|
h.update(body or b"")
|
|
return h.hexdigest()
|
|
|
|
|
|
def canonical_string(method: str, path: str, body: bytes, ts: int, nonce: str, node_id: str) -> bytes:
    """Build the byte string that is signed and verified for a request.

    The result is six fields joined by single newlines, in this order:
    upper-cased method, path, hex SHA-256 of the body, the timestamp as a
    decimal integer, the nonce, and the node id. The joined text is then
    encoded with ``str.encode()`` (UTF-8 by default).
    """
    verb = method.upper()
    body_digest = _body_sha256(body)
    # int() first so that e.g. "0042" and 42 canonicalize to the same text.
    ts_text = str(int(ts))
    nonce_text = str(nonce)
    fields = (verb, path, body_digest, ts_text, nonce_text, node_id)
    return "\n".join(fields).encode()
|
|
|
|
|
|
def sign_headers(method: str, path: str, body: bytes, sk_bytes: bytes, pk_b58: str) -> Dict[str, str]:
    """Produce the NodeSig headers for an outgoing request.

    A current Unix timestamp (whole seconds) and a random 16-byte hex nonce
    are generated, the canonical string for the request is signed with a
    ``nacl.signing.SigningKey`` built from *sk_bytes*, and the detached
    signature is base58-encoded.

    Returns:
        A dict with the keys ``X-Node-Id`` (*pk_b58* unchanged),
        ``X-Node-Ts``, ``X-Node-Nonce`` and ``X-Node-Sig``.
    """
    # Function-scope import; presumably keeps PyNaCl from being required at
    # module import time -- confirm before hoisting it to the top of the file.
    import nacl.signing

    issued_at = int(time.time())
    fresh_nonce = secrets.token_hex(16)
    payload = canonical_string(method, path, body, issued_at, fresh_nonce, pk_b58)
    signer = nacl.signing.SigningKey(sk_bytes)
    signature = signer.sign(payload).signature

    headers = {
        "X-Node-Id": pk_b58,
        "X-Node-Ts": str(issued_at),
        "X-Node-Nonce": fresh_nonce,
        "X-Node-Sig": b58encode(signature).decode(),
    }
    return headers
|
|
|
|
|
|
def verify_request(request, memory) -> Tuple[bool, str, str]:
    """Verify the NodeSig headers of an incoming Sanic request.

    Checks are performed in this order:

    1. All four headers (``X-Node-Id``, ``X-Node-Ts``, ``X-Node-Nonce``,
       ``X-Node-Sig``) are present and non-empty (a timestamp of 0 counts
       as missing).
    2. The timestamp passes ``check_timestamp_fresh``.
    3. The signature verifies against the canonical string, using the
       base58-decoded ``X-Node-Id`` as the verify key.
    4. The nonce passes ``check_and_remember_nonce``.

    The nonce is recorded only after the signature has been verified, so
    unauthenticated requests cannot write into the replay cache.

    Args:
        request: Object exposing ``headers`` (with ``.get``), ``method``,
            ``path`` and ``body``.
        memory: Passed through unchanged to ``check_and_remember_nonce``.

    Returns:
        ``(ok, node_id, error)``. On success ``(True, node_id, "")``.
        On failure ``ok`` is ``False`` and ``error`` is one of
        ``"MISSING_HEADERS"``, ``"STALE_TS"``, ``"NONCE_REPLAY"`` or a
        string starting with ``"BAD_SIGNATURE: "``. ``node_id`` is empty for
        ``MISSING_HEADERS`` and ``BAD_SIGNATURE`` results.
    """
    try:
        headers = request.headers
        node_id = headers.get("X-Node-Id", "").strip()
        ts = int(headers.get("X-Node-Ts", "0").strip() or 0)
        nonce = headers.get("X-Node-Nonce", "").strip()
        sig_b58 = headers.get("X-Node-Sig", "").strip()

        if not (node_id and ts and nonce and sig_b58):
            return False, "", "MISSING_HEADERS"

        # Cheap check first: reject stale requests before doing any crypto.
        if not check_timestamp_fresh(ts):
            return False, node_id, "STALE_TS"

        import nacl.signing

        verify_key = nacl.signing.VerifyKey(b58decode(node_id))
        signature = b58decode(sig_b58)
        msg = canonical_string(
            request.method, request.path, request.body or b"", ts, nonce, node_id
        )
        # Raises on a forged or corrupt signature; handled by the except below.
        verify_key.verify(msg, signature)

        # Only authenticated requests may consume a nonce. Doing this before
        # signature verification would let anyone pollute the replay cache.
        if not check_and_remember_nonce(memory, node_id, nonce):
            return False, node_id, "NONCE_REPLAY"

        return True, node_id, ""
    except Exception as e:
        # Deliberately broad: this is an authentication boundary and must
        # fail closed on any parsing, decoding or verification error.
        return False, "", f"BAD_SIGNATURE: {e}"
|
|
|