uploader-bot/app/core/network/dht/crypto.py

71 lines
2.0 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable, Tuple
from app.core._utils.hash import blake3_hex
BLAKE3_DIGEST_SIZE = 32
def _ensure_bytes(data: Iterable[int] | bytes | bytearray) -> bytes:
if isinstance(data, (bytes, bytearray)):
return bytes(data)
if isinstance(data, str):
return data.encode()
return bytes(data)
def digest_hex(data: Iterable[int] | bytes | bytearray | str) -> str:
return blake3_hex(_ensure_bytes(data))
def compute_node_id(public_key: bytes) -> str:
"""NodeID = blake3(pubkey)."""
if not isinstance(public_key, (bytes, bytearray)):
raise TypeError("public_key must be bytes")
return digest_hex(public_key)
def compute_content_id(encrypted_blob: bytes) -> str:
"""ContentID = blake3(encrypted_blob)."""
return digest_hex(encrypted_blob)
def compute_view_id(content_id: str, viewer_salt: bytes) -> str:
"""ViewID = blake3(ContentID||viewer_salt)."""
if not viewer_salt:
raise ValueError("viewer_salt must not be empty")
return digest_hex(content_id.encode() + viewer_salt)
def bits_from_hex(hex_digest: str, prefix_bits: int) -> Tuple[int, int]:
"""Extract first prefix_bits from a hex digest. Returns (prefix, total_bits)."""
if prefix_bits < 0:
raise ValueError("prefix_bits must be >= 0")
bitstring = bin(int(hex_digest, 16))[2:].zfill(len(hex_digest) * 4)
if prefix_bits == 0:
return 0, len(bitstring)
return int(bitstring[:prefix_bits], 2), len(bitstring)
def rendezvous_score(content_id: str, node_id: str) -> int:
"""Return rendezvous score via blake3(ContentID||NodeID)."""
return int(blake3_hex(f"{content_id}:{node_id}".encode()), 16)
@dataclass(frozen=True)
class ContentFingerprint:
content_id: str
node_id_prefix: int
prefix_bits: int
def matches(self, node_id: str) -> bool:
prefix, total = bits_from_hex(node_id, self.prefix_bits)
return prefix == self.node_id_prefix and total >= self.prefix_bits