from __future__ import annotations

import base64
import json
import os
from datetime import datetime
from typing import Dict, Any

from base58 import b58encode
from sanic import response
from sqlalchemy import select

from app.core._secrets import hot_pubkey
from app.core.logger import make_log
from app.core.models.content_v3 import EncryptedContent, ContentKey, KeyGrant
from app.core.network.nodesig import verify_request
from app.core.network.guard import check_rate_limit
from app.core.models.my_network import KnownNode
from app.core.crypto.keywrap import unwrap_dek, KeyWrapError


def _b64(b: bytes) -> str:
    """Return *b* encoded as standard base64 text."""
    return base64.b64encode(b).decode()


async def s_api_v1_keys_request(request):
    """Handle a node's request for the content key of an encrypted CID.

    The request is rate limited per remote IP, authenticated with
    ``verify_request``, and must carry a JSON object with ``encrypted_cid``,
    ``requestor_node_id`` and ``recipient_box_pub`` (base64). An optional
    ``purpose`` selects the grant scope (defaults to ``"full"``).

    On success the content key is unwrapped, sealed to the recipient's public
    key with a NaCl sealed box, recorded as a ``KeyGrant`` row, and returned
    as JSON together with a signature and the new ``grant_id``.

    Error responses (JSON ``{"error": ...}``):
        429 RATE_LIMIT          -- rate limiter rejected the remote IP
        401 <reason>/UNAUTHORIZED -- request verification failed
        400 BAD_REQUEST         -- body is not an object or a field is missing
        401 NODE_ID_MISMATCH    -- body node id differs from the verified one
        404 NOT_FOUND           -- no content/key row for the CID
        403 DENIED_NOT_TRUSTED  -- requester is not a known 'trusted' node
        403 DENIED              -- the key does not allow automatic grants
        500 KEY_UNWRAP_FAILED   -- ``unwrap_dek`` raised ``KeyWrapError``
        500 SEAL_FAILED         -- any other failure while sealing the key
    """
    # Rate limit per remote IP (reuse handshake limiter).
    # Only the first entry of a comma-separated X-Forwarded-For is used.
    remote_ip = (
        request.headers.get('X-Forwarded-For')
        or request.remote_addr
        or request.ip
        or ''
    ).split(',')[0].strip()
    if not check_rate_limit(request.app.ctx.memory, remote_ip):
        return response.json({"error": "RATE_LIMIT"}, status=429)

    # Verify NodeSig
    ok, hdr_node, reason = verify_request(request, request.app.ctx.memory)
    if not ok:
        return response.json({"error": reason or "UNAUTHORIZED"}, status=401)

    data: Dict[str, Any] = request.json or {}
    # A JSON array/scalar body would otherwise fail on ``.get`` with a 500.
    if not isinstance(data, dict):
        return response.json({"error": "BAD_REQUEST"}, status=400)

    cid = data.get("encrypted_cid")
    requester_node = data.get("requestor_node_id")
    recipient_box_pub_b64 = data.get("recipient_box_pub")
    if not cid or not requester_node or not recipient_box_pub_b64:
        return response.json({"error": "BAD_REQUEST"}, status=400)
    # The node named in the body must be the one whose signature was verified.
    if requester_node != hdr_node:
        return response.json({"error": "NODE_ID_MISMATCH"}, status=401)

    session = request.ctx.db_session
    row = (
        await session.execute(
            select(EncryptedContent, ContentKey)
            .join(ContentKey, ContentKey.content_id == EncryptedContent.id)
            .where(EncryptedContent.encrypted_cid == cid)
        )
    ).first()
    if not row:
        return response.json({"error": "NOT_FOUND"}, status=404)
    ec: EncryptedContent = row[0]
    ck: ContentKey = row[1]

    # Allow only trusted nodes unless explicitly disabled via env
    trusted_only = os.getenv('KEY_AUTO_GRANT_TRUSTED_ONLY', '1') == '1'
    if trusted_only:
        kn = (
            await session.execute(
                select(KnownNode).where(KnownNode.public_key == requester_node)
            )
        ).scalars().first()
        role = (kn.meta or {}).get('role') if kn else None
        if role != 'trusted':
            return response.json({"error": "DENIED_NOT_TRUSTED"}, status=403)

    if not ck.allow_auto_grant:
        return response.json({"error": "DENIED"}, status=403)

    # Seal the DEK for recipient using libsodium sealed box
    try:
        dek_plain = unwrap_dek(ck.key_ciphertext_b64)
        # Imported here so a missing PyNaCl surfaces as SEAL_FAILED below.
        import nacl.public
        pk = nacl.public.PublicKey(base64.b64decode(recipient_box_pub_b64))
        box = nacl.public.SealedBox(pk)
        sealed = box.encrypt(dek_plain)
        sealed_b64 = _b64(sealed)
    except KeyWrapError as e:
        make_log("keys", f"unwrap failed: {e}", level="error")
        return response.json({"error": "KEY_UNWRAP_FAILED"}, status=500)
    except Exception as e:
        make_log("keys", f"seal failed: {e}", level="error")
        return response.json({"error": "SEAL_FAILED"}, status=500)

    issuer = b58encode(hot_pubkey).decode()
    purpose = (data.get('purpose') or 'full')
    # A TTL is only read from the environment for preview grants.
    ttl_sec = int(os.getenv('KEY_GRANT_PREVIEW_TTL_SEC', '0')) if purpose == 'preview' else 0
    # Single source of truth: the same constraints are signed, returned and
    # persisted, so the stored grant cannot be broader than the issued one.
    constraints = {"ttl_sec": ttl_sec, "scope": purpose}
    grant_body = {
        "encrypted_cid": cid,
        "to_node_id": requester_node,
        "sealed_key_b64": sealed_b64,
        "aead_scheme": ec.aead_scheme,
        "chunk_bytes": ec.chunk_bytes,
        "constraints": constraints,
        "issued_at": datetime.utcnow().isoformat(),
        "issuer_node_id": issuer,
    }

    try:
        from app.core._crypto.signer import Signer
        from app.core._secrets import hot_seed
        signer = Signer(hot_seed)
        # Canonical form: sorted keys, no whitespace.
        blob = json.dumps(grant_body, sort_keys=True, separators=(",", ":")).encode()
        sig = signer.sign(blob)
    except Exception as e:
        # Best effort: the grant is still issued with an empty signature,
        # but the failure is recorded instead of being silently dropped.
        make_log("keys", f"grant signing failed: {e}", level="error")
        sig = ""

    grant = KeyGrant(
        encrypted_cid=cid,
        issuer_node_id=issuer,
        to_node_id=requester_node,
        sealed_key_b64=sealed_b64,
        aead_scheme=ec.aead_scheme,
        chunk_bytes=ec.chunk_bytes,
        constraints=dict(constraints),
        sig=sig,
    )
    session.add(grant)
    # Flush first and capture the primary key before committing.
    # NOTE(review): if this session expires attributes on commit, reading
    # ``grant.id`` afterwards would need implicit I/O -- confirm session config.
    await session.flush()
    grant_id = grant.id
    await session.commit()

    grant_row = {
        **grant_body,
        "sig": sig,
        "grant_id": grant_id,
    }
    return response.json(grant_row)