from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone
import json
from typing import Dict, Any, Optional, List, Tuple

import httpx
from base58 import b58encode
from sqlalchemy import select, update

from app.core.logger import make_log
from app.core.models.my_network import KnownNode
from app.core.storage import db_session
from app.core._secrets import hot_pubkey
from .config import (
    HANDSHAKE_INTERVAL_SEC,
    UNSUPPORTED_RECHECK_INTERVAL_SEC,
    BOOTSTRAP_SEEDS,
    BOOTSTRAP_REQUIRED,
    BOOTSTRAP_TIMEOUT_SEC,
    NODE_PRIVACY,
    NETWORK_TLS_VERIFY,
)
from .constants import NODE_TYPE_PRIVATE
from .semver import compatibility
from .constants import CURRENT_PROTOCOL_VERSION


def _now() -> datetime:
    """Return the current UTC time as a naive ``datetime``."""
    # datetime.utcnow() is deprecated since Python 3.12; this produces the
    # same naive-UTC value, so stored and compared timestamps stay compatible.
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _split_host_port(host: str, port: int) -> Tuple[str, int]:
    """Normalize *host* (which may be a full URL) into ``(host_only, port)``.

    The ``http://`` / ``https://`` scheme and any path component are dropped.
    A ``:port`` suffix overrides *port* when it parses as an integer;
    otherwise the *port* argument is returned unchanged.
    """
    cleaned = (host or "").replace("http://", "").replace("https://", "").strip("/")
    # Drop a path component so "host:8080/api" still yields port 8080 and the
    # path never ends up stored as part of the host.
    cleaned = cleaned.split("/", 1)[0]
    if ":" not in cleaned:
        return cleaned, port
    host_only, port_str = cleaned.rsplit(":", 1)
    try:
        return host_only, int(port_str)
    except ValueError:
        # Non-numeric suffix: keep the caller-supplied port
        return host_only, port


async def upsert_known_node(session, host: str, port: int, public_key: str, meta: Dict[str, Any]) -> KnownNode:
    """Insert or update the ``KnownNode`` row describing a peer.

    Lookup order: by ``public_key`` (when non-empty), then by normalized
    host/IP. A matching row gets its ip/port/public_key refreshed (empty
    values never overwrite stored ones), *meta* merged over the stored meta,
    and ``last_sync`` set to now. If nothing matches, a new row is created
    with ``reputation=0`` and port defaulting to 80.

    The session is committed before returning.

    Args:
        session: async SQLAlchemy session used for the queries and commit.
        host: host name / IP, optionally with scheme, port and path.
        port: port to use when *host* carries no parseable port.
        public_key: peer public key; may be empty.
        meta: metadata to merge into (or store as) the node's ``meta``.

    Returns:
        The updated or newly created ``KnownNode``.
    """
    h_only, port = _split_host_port(host, port)

    row = None
    # Prefer match by public_key (stable identity)
    if public_key:
        result = await session.execute(select(KnownNode).where(KnownNode.public_key == public_key))
        row = result.scalars().first()
    # Fallback by IP/host
    if not row:
        result = await session.execute(select(KnownNode).where(KnownNode.ip == h_only))
        row = result.scalars().first()

    if row:
        row.ip = h_only or row.ip
        row.port = port or row.port
        row.public_key = public_key or row.public_key
        # Always assign a fresh dict so the ORM sees the attribute as changed
        row.meta = {**(row.meta or {}), **(meta or {})}
        row.last_sync = _now()
        await session.commit()
        return row

    created_at = _now()
    node = KnownNode(
        ip=h_only,
        port=port or 80,
        public_key=public_key,
        reputation=0,
        last_sync=created_at,
        meta=dict(meta or {}),
        located_at=created_at,
    )
    session.add(node)
    await session.commit()
    return node


def _compatibility_for_meta(remote_version: Optional[str]) -> str:
    """Classify a peer's protocol version against ``CURRENT_PROTOCOL_VERSION``.

    A missing version or the placeholder ``"0.0.0"`` yields ``"warning"``;
    anything else is delegated to ``compatibility``.
    """
    if not remote_version or remote_version == "0.0.0":
        return "warning"
    return compatibility(remote_version, CURRENT_PROTOCOL_VERSION)


async def list_known_public_nodes(session) -> List[Dict[str, Any]]:
    """Return a JSON-friendly description of every known node marked public.

    Nodes whose meta has a falsy ``is_public`` are skipped; nodes without the
    key are treated as public.
    """
    rows = (await session.execute(select(KnownNode))).scalars().all()
    nodes: List[Dict[str, Any]] = []
    for row in rows:
        meta = row.meta or {}
        if not meta.get("is_public", True):
            continue
        nodes.append({
            "host": row.ip,
            "port": row.port,
            "public_key": row.public_key,
            "version": meta.get("version"),
            "compatibility": _compatibility_for_meta(meta.get("version", "0.0.0")),
            # last_sync is stored as naive UTC, hence the explicit "Z" suffix
            "last_seen": (row.last_sync.isoformat() + "Z") if row.last_sync else None,
            "public_host": meta.get("public_host"),
            "capabilities": meta.get("capabilities") or {},
        })
    return nodes


def _verify_server_signature(data: Dict[str, Any], base_url: str) -> None:
    """Best-effort verification of the peer's signature over its response.

    Runs only when ``server_signature``, ``server_public_key`` and
    ``timestamp`` are all present. The signed blob is the compact,
    key-sorted JSON of every field except the signature and public key.

    NOTE(review): any failure (bad signature, malformed base58, PyNaCl not
    importable) is only logged as a warning; the response is still accepted
    by the caller. Confirm whether an invalid signature should be rejected.
    """
    required = ("server_signature", "server_public_key", "timestamp")
    if not all(k in data for k in required):
        return
    try:
        # Imported lazily so the module loads even if PyNaCl is unavailable
        import nacl.signing
        from base58 import b58decode

        signed_fields = {
            k: v for k, v in data.items()
            if k not in ("server_signature", "server_public_key")
        }
        blob = json.dumps(signed_fields, sort_keys=True, separators=(",", ":")).encode()
        vk = nacl.signing.VerifyKey(b58decode(data["server_public_key"]))
        vk.verify(blob, b58decode(data["server_signature"]))
    except Exception as e:
        make_log("Handshake", f"Server signature verification failed for {base_url}: {e}", level='warning')


async def _handshake_with(session, base_url: str) -> Optional[Dict[str, Any]]:
    """POST our handshake payload to a peer and return its decoded JSON reply.

    Args:
        session: DB session forwarded to ``build_handshake_payload``.
        base_url: peer base URL; ``/api/v1/network.handshake`` is appended.

    Returns:
        The response body as a dict.

    Raises:
        httpx.HTTPStatusError: on any 4xx/5xx response (including 403).
        httpx.HTTPError: on transport-level failures.
        ValueError: if the body is not valid JSON or is not a JSON object.
    """
    url = base_url.rstrip("/") + "/api/v1/network.handshake"
    # Local import, as in the original (presumably avoids an import cycle)
    from .handshake import build_handshake_payload

    payload = await build_handshake_payload(session)
    timeout = httpx.Timeout(5.0, read=10.0)
    async with httpx.AsyncClient(timeout=timeout, verify=NETWORK_TLS_VERIFY) as client:
        r = await client.post(url, json=payload)
        # NOTE(review): the original had a no-op branch for
        # `status == 403 and NODE_PRIVACY == NODE_TYPE_PRIVATE` ("inbound
        # denied by peers is fine") followed by raise_for_status(), so a 403
        # has always been raised like any other error. The dead branch was
        # removed; decide whether private nodes should tolerate a 403.
        r.raise_for_status()
        data = r.json()

    if not isinstance(data, dict):
        # Callers use dict methods on the result; fail here with a clear message
        raise ValueError(f"Unexpected handshake response type from {base_url}: {type(data).__name__}")

    _verify_server_signature(data, base_url)
    return data

# Failure backoff: _BACKOFF_BASE_SEC * 2^fail_count seconds, capped at _BACKOFF_CAP_SEC
_BACKOFF_BASE_SEC = 30
_BACKOFF_CAP_SEC = 7200
# 30 * 2^8 = 7680 already exceeds the cap, so larger exponents change nothing
_BACKOFF_MAX_EXPONENT = 8


def _elapsed_since(now: datetime, stamp: Any) -> Optional[timedelta]:
    """Return ``now - stamp`` for an ISO-8601 *stamp*.

    Returns ``None`` when *stamp* is empty, not a string, malformed, or not
    comparable with *now* (e.g. naive vs. aware datetimes).
    """
    if not stamp:
        return None
    try:
        return now - datetime.fromisoformat(stamp)
    except (TypeError, ValueError):
        return None


def _in_unsupported_cooldown(meta: Dict[str, Any], now: datetime) -> bool:
    """True if the node was re-checked within ``UNSUPPORTED_RECHECK_INTERVAL_SEC``."""
    since_check = _elapsed_since(now, meta.get("unsupported_last_checked_at"))
    if since_check is None:
        return False
    return since_check < timedelta(seconds=UNSUPPORTED_RECHECK_INTERVAL_SEC)


def _in_failure_backoff(meta: Dict[str, Any], now: datetime) -> bool:
    """True if the node's last failure is still inside its backoff window.

    Unreadable ``last_failure_at`` / ``fail_count`` values disable the
    backoff rather than raising, so one bad row cannot stall selection.
    """
    since_failure = _elapsed_since(now, meta.get("last_failure_at"))
    if since_failure is None:
        return False
    try:
        fail_count = int(meta.get("fail_count", 1))
    except (TypeError, ValueError, OverflowError):
        return False
    # Clamp the exponent so a large fail_count never builds a huge integer
    exponent = min(max(0, fail_count), _BACKOFF_MAX_EXPONENT)
    wait = min(_BACKOFF_CAP_SEC, _BACKOFF_BASE_SEC * (2 ** exponent))
    return since_failure < timedelta(seconds=wait)


async def pick_next_node(session) -> Optional[KnownNode]:
    """Choose the next node to handshake with, or ``None`` to skip the round.

    Candidates are visited from oldest ``last_sync`` (never-synced first).
    A node is skipped when its version is classified ``"blocked"`` and it is
    still within the unsupported-recheck cooldown, or when it is within its
    failure backoff window.
    """
    rows = (await session.execute(select(KnownNode))).scalars().all()
    if not rows:
        return None

    now = _now()
    # Prefer nodes with oldest last_sync; rows without one sort first
    for candidate in sorted(rows, key=lambda r: r.last_sync or datetime.min):
        meta = candidate.meta or {}
        compat = _compatibility_for_meta(meta.get("version", "0.0.0"))
        if compat == "blocked" and _in_unsupported_cooldown(meta, now):
            continue
        if _in_failure_backoff(meta, now):
            continue
        return candidate

    # Every node is cooling down or backing off: skip this round
    return None


async def _merge_peer(session, peer: Dict[str, Any]) -> None:
    """Upsert one entry of a remote ``known_public_nodes`` list."""
    peer_host = peer.get("host")
    peer_port = int(peer.get("port") or 80)
    # Use the parsed port so a missing port never renders as "http://host:None"
    fallback_url = f"http://{peer_host}:{peer_port}" if peer_host else None
    await upsert_known_node(
        session,
        host=peer_host or peer.get("public_host") or "",
        port=peer_port,
        public_key=peer.get("public_key") or "",
        meta={
            "is_public": True,
            "version": peer.get("version") or "0.0.0",
            "public_host": peer.get("public_host") or fallback_url,
        },
    )


async def perform_handshake_round():
    """Handshake with one node and record the outcome in its ``meta``.

    On success the peers advertised by the node are merged into the known
    nodes table (bad entries are logged and ignored), ``last_sync`` is
    refreshed and ``fail_count`` reset to 0. On failure ``last_error``,
    ``last_failure_at`` and an incremented ``fail_count`` are stored.
    """
    async with db_session(auto_commit=True) as session:
        # Private nodes still do outbound handshakes; inbound typically unreachable without public endpoint
        node = await pick_next_node(session)
        if not node:
            return

        # meta may be NULL in the DB; treat it as empty like the rest of the module
        base_url = (node.meta or {}).get("public_host") or f"http://{node.ip}:{node.port}"
        try:
            resp = await _handshake_with(session, base_url)

            # Merge known nodes received (tolerate an explicit null list)
            for peer in (resp or {}).get("known_public_nodes") or []:
                try:
                    await _merge_peer(session, peer)
                except Exception as e:
                    make_log("Handshake", f"Ignore bad peer from {base_url}: {e}", level='warning')

            # Update last_sync and meta for node
            node.last_sync = _now()
            node.meta = {**(node.meta or {}), "last_response": resp, "fail_count": 0}
            await session.commit()
            make_log("Handshake", f"Handshake OK with {base_url}")
        except Exception as e:
            make_log("Handshake", f"Handshake failed with {base_url}: {e}", level='warning')
            # Record incident-lite in meta. Build a NEW dict: mutating the
            # loaded dict in place and reassigning the same object may not be
            # detected as a change by the ORM (NOTE(review): depends on
            # whether KnownNode.meta uses mutation tracking), which would
            # silently drop the failure/backoff data.
            previous = node.meta or {}
            try:
                prior_failures = int(previous.get("fail_count", 0))
            except (TypeError, ValueError):
                prior_failures = 0
            node.meta = {
                **previous,
                "last_error": str(e),
                "last_failure_at": _now().isoformat(),
                "fail_count": prior_failures + 1,
            }
            await session.commit()


async def network_handshake_daemon(app):
    """Run ``perform_handshake_round`` forever, every ``HANDSHAKE_INTERVAL_SEC`` seconds.

    Errors raised by a round are logged and never stop the loop. *app* is
    accepted but not used by this function.
    """
    # Stagger start a bit to allow HTTP server to come up
    await asyncio.sleep(3)
    make_log("Handshake", f"Daemon started; interval={HANDSHAKE_INTERVAL_SEC}s")
    while True:
        try:
            await perform_handshake_round()
        except Exception as e:
            make_log("Handshake", f"Round error: {e}", level='error')
        await asyncio.sleep(HANDSHAKE_INTERVAL_SEC)


async def bootstrap_once_and_exit_if_failed():
    """Handshake with the configured seeds when no nodes are known yet.

    Does nothing if ``BOOTSTRAP_SEEDS`` is empty or at least one
    ``KnownNode`` already exists. Seeds are tried in order until one answers
    or ``BOOTSTRAP_TIMEOUT_SEC`` has elapsed (checked between attempts). If
    no seed answered and ``BOOTSTRAP_REQUIRED`` is set, the process is
    terminated with exit code 2.
    """
    # Private nodes also bootstrap: inbound is blocked, but outbound is required for seeds discovery
    seeds = BOOTSTRAP_SEEDS or []
    if not seeds:
        return  # Nothing to do

    async with db_session(auto_commit=True) as session:
        # If we already know nodes, skip bootstrap (one row is enough to tell)
        have_any = (await session.execute(select(KnownNode).limit(1))).scalars().first()
        if have_any:
            return

    make_log("Bootstrap", f"Starting bootstrap with seeds={seeds}; required={BOOTSTRAP_REQUIRED}")
    deadline = _now() + timedelta(seconds=BOOTSTRAP_TIMEOUT_SEC)
    ok = False
    for seed in seeds:
        try:
            async with db_session(auto_commit=True) as session:
                resp = await _handshake_with(session, seed)
                if resp:
                    ok = True
                    # Seed itself gets inserted by handshake handling route; also insert it explicitly
                    seed_info = resp.get("node") or {}
                    try:
                        await upsert_known_node(
                            session,
                            host=seed,
                            port=80,
                            public_key=seed_info.get("public_key", ""),
                            meta={
                                "is_public": True,
                                "version": seed_info.get("version", "0.0.0"),
                                "public_host": seed_info.get("public_host") or seed,
                            },
                        )
                    except Exception as e:
                        # Best-effort: the handshake itself succeeded, so only report it
                        make_log("Bootstrap", f"Could not store seed {seed}: {e}", level='warning')
                    break
        except Exception as e:
            make_log("Bootstrap", f"Seed failed {seed}: {e}", level='warning')
        # Enforce the overall budget after every attempt, failed or empty
        if _now() > deadline:
            break

    if BOOTSTRAP_REQUIRED and not ok:
        make_log("Bootstrap", "Failed to reach any bootstrap seeds; exiting", level='error')
        # Hard exit; Sanic won't stop otherwise
        import os
        os._exit(2)