diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md new file mode 100644 index 0000000..d7bd6d8 --- /dev/null +++ b/ARCHITECTURE.md @@ -0,0 +1,219 @@ +# System Architecture Overview + +This document is the single source of truth for the platform’s architecture, protocols, data flows, and operational details. It supersedes previous scattered docs. + +## Contents +- Components & Topology +- Decentralized Layer (Membership, Replication, Metrics) +- Upload/Conversion Pipeline +- Content View & Purchase Flow +- API Surface (selected endpoints) +- Data Keys & Schemas +- Configuration & Defaults +- Observability & Metrics +- Sequence Diagrams (Mermaid) + +--- + +## Components & Topology + +- Backend API: Sanic-based service (Telegram bots embedded) with PostgreSQL (SQLAlchemy + Alembic). +- Storage: Local FS for uploaded/derived data; IPFS used for discovery/pinning; tusd for resumable uploads. +- Converter workers: Dockerized ffmpeg pipeline (convert_v3, convert_process) driven by background tasks. +- Frontend: Vite + TypeScript client served via nginx container. +- Decentralized overlay (in-process DHT): Membership, replication lease management, windowed content metrics. + +```mermaid +flowchart LR + Client -- TWA/HTTP --> Frontend + Frontend -- REST --> API[Backend API] + API -- tus hooks --> tusd + API -- SQL --> Postgres + API -- IPC --> Workers[Converter Workers] + API -- IPFS --> IPFS + API -- DHT --> DHT[(In-Process DHT)] + DHT -- CRDT Merge --> DHT +``` + +--- + +## Decentralized Layer + +### Identity & Versions +- NodeID = blake3(Ed25519 public key), ContentID = blake3(encrypted_blob) +- schema_version = v1 embedded into DHT keys/records. + +### Membership +- Signed `/api/v1/network.handshake` with Ed25519; includes: + - Node info, capabilities, metrics, IPFS metadata. + - reachability_receipts: (issuer, target, ASN, timestamp, signature). +- State: LWW-Set for members + receipts, HyperLogLog for population estimate. +- Island filtering: nodes with `reachability_ratio < q` are excluded (`k=5`, `q=0.6`, TTL=600s). +- N_estimate: `max(valid N_local reports)` across sufficiently reachable peers. + +### Replication & Leases +- Compute prefix `p = max(0, round(log2(N_estimate / R_target)))` with `R_target ≥ 3`. +- Responsible nodes: first `p` bits of NodeID equal first `p` bits of ContentID. +- Leader = min NodeID among responsible. +- Leader maintains `replica_leases` with TTL=600s and diversity: ≥3 IP first octets and ≥3 ASN if available. +- Rendezvous ranking: blake3(ContentID || NodeID) for candidate selection. +- Heartbeat interval 60s, miss threshold 3 → failover within ≤180s. + +### Metrics (Windowed CRDT) +- On view: PN-Counter for views; HyperLogLog for uniques (ViewID = blake3(ContentID || device_salt)); G-Counter for watch_time, bytes_out, completions. +- Keys are windowed by hour; commutative merges ensure deterministic convergence. + +```mermaid +stateDiagram-v2 + [*] --> Discover + Discover: Handshake + receipts + Discover --> Active: k ASN receipts & TTL ok + Active --> Leader: Content prefix p elects min NodeID + Leader --> Leased: Assign replica_leases (diversity) + Leased --> Monitoring: Heartbeats every 60s + Monitoring --> Reassign: Missed 3 intervals + Reassign --> Leased +``` + +--- + +## Upload & Conversion Pipeline + +1) Client uploads via `tusd` (resumable). Backend receives hooks (`/api/v1/upload.tus-hook`). +2) Encrypted content is registered; converter workers derive preview/low/high (for media) or original (for binaries). 
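+
+   As a sketch of step 2's hand-off (the helper names `register_encrypted_content` and `enqueue_conversion`, and the exact hook payload shape, are assumptions for illustration, not the actual backend API):
+
+   ```python
+   from sanic import response
+
+   async def s_api_v1_upload_tus_hook(request):
+       # tusd identifies the hook type via the Hook-Name header.
+       hook_name = request.headers.get("Hook-Name")
+       payload = request.json or {}
+       if hook_name == "post-finish":
+           upload = payload.get("Upload", {})  # payload shape depends on tusd version
+           # Register the encrypted blob under its ContentID, then hand the
+           # derivative work (preview/low/high) to the converter workers.
+           content_id = await register_encrypted_content(upload)
+           request.app.add_task(enqueue_conversion(content_id))
+       return response.json({})
+   ```
+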
+3) Derivative metadata is stored in the DB and surfaced via `/api/v1/content.view`.
+
+```mermaid
+sequenceDiagram
+    participant C as Client
+    participant T as tusd
+    participant B as Backend
+    participant W as Workers
+    participant DB as PostgreSQL
+
+    C->>T: upload chunks
+    T->>B: hooks (pre/post-finish)
+    B->>DB: create content record
+    B->>W: enqueue conversion
+    W->>DB: store derivatives
+    C->>B: GET /content.view
+    B->>DB: resolve latest derivatives
+    B-->>C: display_options + status
+```
+
+---
+
+## Content View & Purchase Flow
+
+- `/api/v1/content.view/` resolves content and derivatives:
+  - For binary content without previews: present the original only when licensed.
+  - For audio/video: use preview/low for unauthenticated users; decrypted low/high for licensed users.
+  - Frontend shows a processing state while derivatives are pending.
+- Purchase options (TON/Stars) remain in a single row (UI constraint).
+- Cover art layout: fixed square slot; image fits without stretching; background follows page color, not black.
+
+```mermaid
+flowchart LR
+    View[content.view] --> Resolve[Resolve encrypted/decrypted rows]
+    Resolve --> Derivations{Derivatives ready?}
+    Derivations -- No --> Status[processing/pending]
+    Derivations -- Yes --> Options
+    Options -- Binary + No License --> Hidden[Original hidden]
+    Options -- Media + No License --> PreviewLow[Preview / Low]
+    Options -- Licensed --> Full[Decrypted Low/High or Original]
+```
+
+---
+
+## Selected APIs
+
+- `GET /api/system.version` – liveness/protocol version.
+- `POST /api/v1/network.handshake` – signed membership exchange.
+- `GET /api/v1/content.view/` – resolves display options, status, and downloadability.
+- `GET /api/v1.5/storage/` – static file access.
+- `POST /api/v1/storage` – legacy upload endpoint.
+
+---
+
+## Data Keys & Schemas
+
+- MetaKey(content_id): tracks `replica_leases`, `leader`, `conflict_log`, `revision`.
+- MembershipKey(node_id): LWW-Set of members & receipts, HyperLogLog population, N_reports.
+- MetricKey(content_id, window_id): PN-/G-/HLL serialized state.
+
+All DHT records are signed and merged via deterministic CRDT strategies + LWW dominance (logical_counter, timestamp, node_id).
+
+---
+
+## Configuration & Defaults
+
+- Network: `NODE_PRIVACY`, `PUBLIC_HOST`, `HANDSHAKE_INTERVAL_SEC`, TLS verify, IPFS peering.
+- DHT: `DHT_MIN_RECEIPTS=5`, `DHT_MIN_REACHABILITY=0.6`, `DHT_MEMBERSHIP_TTL=600`, `DHT_REPLICATION_TARGET=3`, `DHT_LEASE_TTL=600`, `DHT_HEARTBEAT_INTERVAL=60`, `DHT_HEARTBEAT_MISS_THRESHOLD=3`, `DHT_MIN_ASN=3`, `DHT_MIN_IP_OCTETS=3`, `DHT_METRIC_WINDOW_SEC=3600`.
+- Conversion resources: `CONVERT_*` limits (CPU/mem), `MAX_CONTENT_SIZE_MB`.
+
+---
+
+## Observability & Metrics
+
+Prometheus (exported in-process):
+- dht_replication_under_total / dht_replication_over_total / dht_leader_changes_total
+- dht_merge_conflicts_total
+- dht_view_count_total / dht_unique_view_estimate / dht_watch_time_seconds
+
+Logs track replication conflict_log entries and HTTP structured errors (with session_id/error_id).
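+
+For illustration, a scrape of `/metrics` could look like the following (content IDs and label values here are hypothetical):
+
+```text
+dht_replication_under_total{content_id="9f2c"} 2
+dht_leader_changes_total{content_id="9f2c"} 1
+dht_view_count_total{content_id="9f2c",window="2025101712"} 341
+dht_unique_view_estimate{content_id="9f2c",window="2025101712"} 289.4
+dht_watch_time_seconds{content_id="9f2c",window="2025101712"} 51240
+```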
+ +--- + +## Sequence Diagrams (Consolidated) + +### Membership & N_estimate +```mermaid +sequenceDiagram + participant A as Node A + participant B as Node B + A->>B: POST /network.handshake {nonce, ts, signature} + B->>B: verify ts, nonce, signature + B->>B: upsert member; store receipts + B-->>A: {node, known_public_nodes, n_estimate, signature} + A->>A: merge; recompute N_estimate = max(N_local, peers) +``` + +### Replication Leader Election +```mermaid +sequenceDiagram + participant L as Leader + participant Peers as Responsible Nodes + L->>L: compute p from N_estimate + L->>Peers: rendezvous scores for ContentID + L->>L: assign leases (diversity) + Peers-->>L: heartbeat every 60s + L->>L: reassign on 3 misses (≤180s) +``` + +### Metrics Publication +```mermaid +sequenceDiagram + participant C as Client + participant API as Backend + participant M as MetricsAggregator + participant D as DHT + + C->>API: GET content.view?watch_time&bytes_out + API->>M: record_view(delta) + M->>D: merge MetricKey(ContentID, Window) + M->>API: update gauges +``` + +--- + +## Run & Test + +```bash +# Spin services +docker compose -f /home/configs/docker-compose.yml --env-file /home/configs/.env up -d --build + +# Backend unit tests (DHT integration) +cd uploader-bot +python3 -m unittest discover -s tests/dht +``` + diff --git a/README.md b/README.md index dbfc5a2..4881a5f 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,14 @@ # Sanic Telegram Bot [template] +See the consolidated system design with protocol, flows, configuration, and diagrams in `ARCHITECTURE.md`. + +### Running DHT integration tests + +```shell +cd uploader-bot +python3 -m unittest discover -s tests/dht +``` + --- ## Run ```shell diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..d134290 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,35 @@ +[alembic] +script_location = alembic +sqlalchemy.url = ${DATABASE_URL} + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s diff --git a/alembic/versions/b1f2d3c4a5b6_add_artist_to_encrypted_content.py b/alembic/versions/b1f2d3c4a5b6_add_artist_to_encrypted_content.py new file mode 100644 index 0000000..6712053 --- /dev/null +++ b/alembic/versions/b1f2d3c4a5b6_add_artist_to_encrypted_content.py @@ -0,0 +1,26 @@ +"""add artist column to encrypted content + +Revision ID: b1f2d3c4a5b6 +Revises: a7c1357e8d15 +Create Date: 2024-06-05 00:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. 
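+# Note: alembic.ini sets `sqlalchemy.url = ${DATABASE_URL}`, which Alembic's
+# ini parser does not expand by itself; this assumes alembic/env.py reads the
+# environment variable and overrides the URL, e.g.
+# `config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"])`,
+# before `alembic upgrade head` is run.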
+revision: str = 'b1f2d3c4a5b6' +down_revision: Union[str, None] = 'a7c1357e8d15' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column('encrypted_contents', sa.Column('artist', sa.String(length=512), nullable=True)) + + +def downgrade() -> None: + op.drop_column('encrypted_contents', 'artist') diff --git a/alembic/versions/c2d4e6f8a1b2_expand_telegram_id_precision.py b/alembic/versions/c2d4e6f8a1b2_expand_telegram_id_precision.py new file mode 100644 index 0000000..eabfd72 --- /dev/null +++ b/alembic/versions/c2d4e6f8a1b2_expand_telegram_id_precision.py @@ -0,0 +1,38 @@ +"""expand telegram_id precision on stars invoices + +Revision ID: c2d4e6f8a1b2 +Revises: b1f2d3c4a5b6 +Create Date: 2025-10-17 00:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'c2d4e6f8a1b2' +down_revision: Union[str, None] = 'b1f2d3c4a5b6' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.alter_column( + 'stars_invoices', + 'telegram_id', + existing_type=sa.Integer(), + type_=sa.BigInteger(), + existing_nullable=True, + ) + + +def downgrade() -> None: + op.alter_column( + 'stars_invoices', + 'telegram_id', + existing_type=sa.BigInteger(), + type_=sa.Integer(), + existing_nullable=True, + ) diff --git a/app/__main__.py b/app/__main__.py index 970ceae..4c4ea1c 100644 --- a/app/__main__.py +++ b/app/__main__.py @@ -104,6 +104,7 @@ if __name__ == '__main__': from app.client_bot import dp as client_bot_dp from app.core._config import SANIC_PORT, PROJECT_HOST, DATABASE_URL from app.core.network.nodes import network_handshake_daemon, bootstrap_once_and_exit_if_failed + from app.core.network.maintenance import replication_daemon, heartbeat_daemon app.ctx.memory = main_memory for _target in [uploader_bot_dp, client_bot_dp]: @@ -121,6 +122,8 @@ if __name__ == '__main__': # Start network handshake daemon and bootstrap step app.add_task(network_handshake_daemon(app)) app.add_task(bootstrap_once_and_exit_if_failed()) + app.add_task(replication_daemon(app)) + app.add_task(heartbeat_daemon(app)) app.run(host='0.0.0.0', port=SANIC_PORT) else: diff --git a/app/api/__init__.py b/app/api/__init__.py index 4a47eb9..cdca550 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -57,6 +57,7 @@ from app.api.routes.tonconnect import s_api_v1_tonconnect_new, s_api_v1_tonconne from app.api.routes.keys import s_api_v1_keys_request from app.api.routes.sync import s_api_v1_sync_pin, s_api_v1_sync_status from app.api.routes.upload_status import s_api_v1_upload_status +from app.api.routes.metrics import s_api_metrics app.add_route(s_index, "/", methods=["GET", "OPTIONS"]) @@ -127,6 +128,7 @@ app.add_route(s_api_v1_keys_request, "/api/v1/keys.request", methods=["POST", "O app.add_route(s_api_v1_sync_pin, "/api/v1/sync.pin", methods=["POST", "OPTIONS"]) app.add_route(s_api_v1_sync_status, "/api/v1/sync.status", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_upload_status, "/api/v1/upload.status/", methods=["GET", "OPTIONS"]) +app.add_route(s_api_metrics, "/metrics", methods=["GET", "OPTIONS"]) @app.exception(BaseException) diff --git a/app/api/routes/content.py b/app/api/routes/content.py index 323e8c8..a7d3dd2 100644 --- a/app/api/routes/content.py +++ b/app/api/routes/content.py @@ -12,6 +12,7 @@ from 
app.core.models.content.user_content import UserContent from app.core._config import CLIENT_TELEGRAM_API_KEY, CLIENT_TELEGRAM_BOT_USERNAME, PROJECT_HOST from app.core.models.content_v3 import EncryptedContent as ECv3, ContentDerivative as CDv3, UploadSession from app.core.content.content_id import ContentId +from app.core.network.dht import MetricsAggregator import json import uuid @@ -418,6 +419,35 @@ async def s_api_v1_content_view(request, content_address: str): if not opts.get('content_ext') and '/' in content_mime: opts['content_ext'] = content_mime.split('/')[-1] + metrics_mgr: MetricsAggregator | None = getattr(request.app.ctx.memory, "metrics", None) + if metrics_mgr: + viewer_salt_raw = request.headers.get("X-View-Salt") + if viewer_salt_raw: + try: + viewer_salt = bytes.fromhex(viewer_salt_raw) + except ValueError: + viewer_salt = viewer_salt_raw.encode() + elif request.ctx.user: + viewer_salt = f"user:{request.ctx.user.id}".encode() + else: + viewer_salt = (request.remote_addr or request.ip or "anonymous").encode() + try: + watch_time_param = int(request.args.get("watch_time", 0)) + except (TypeError, ValueError): + watch_time_param = 0 + try: + bytes_out_param = int(request.args.get("bytes_out", 0)) + except (TypeError, ValueError): + bytes_out_param = 0 + completed_param = request.args.get("completed", "0") in ("1", "true", "True") + metrics_mgr.record_view( + content_id=content['encrypted_content'].hash, + viewer_salt=viewer_salt, + watch_time=watch_time_param, + bytes_out=bytes_out_param, + completed=completed_param, + ) + return response.json({ **opts, 'encrypted': content['encrypted_content'].json_format(), diff --git a/app/api/routes/metrics.py b/app/api/routes/metrics.py new file mode 100644 index 0000000..d82f396 --- /dev/null +++ b/app/api/routes/metrics.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from sanic import response + + +async def s_api_metrics(request): + try: + from prometheus_client import generate_latest, CONTENT_TYPE_LATEST # type: ignore + data = generate_latest() + return response.raw(data, content_type=CONTENT_TYPE_LATEST) + except Exception: + # Fallback: export minimal in-process counters from DHT module, if available + try: + from app.core.network.dht import prometheus as dprom + + def dump(metric_obj, metric_name): + lines = [] + values = getattr(metric_obj, "_values", {}) + for labels, value in values.items(): + label_str = ",".join(f'{k}="{v}"' for k, v in labels) + if label_str: + lines.append(f"{metric_name}{{{label_str}}} {value}") + else: + lines.append(f"{metric_name} {value}") + return lines + + parts = [] + parts += dump(dprom.replication_under, "dht_replication_under_total") + parts += dump(dprom.replication_over, "dht_replication_over_total") + parts += dump(dprom.leader_changes, "dht_leader_changes_total") + parts += dump(dprom.merge_conflicts, "dht_merge_conflicts_total") + parts += dump(dprom.view_count_total, "dht_view_count_total") + parts += dump(dprom.unique_estimate, "dht_unique_view_estimate") + parts += dump(dprom.watch_time_seconds, "dht_watch_time_seconds") + body = "\n".join(parts) + ("\n" if parts else "") + return response.text(body, content_type="text/plain; version=0.0.4") + except Exception: + return response.text("") + diff --git a/app/api/routes/network.py b/app/api/routes/network.py index e338f7f..9f47b53 100644 --- a/app/api/routes/network.py +++ b/app/api/routes/network.py @@ -4,7 +4,7 @@ import json from datetime import datetime from typing import Dict, Any -from base58 import b58decode +from 
app.core._utils.b58 import b58decode from sanic import response from urllib.parse import urlparse @@ -19,6 +19,8 @@ from app.core.network.config import HANDSHAKE_TS_TOLERANCE_SEC from app.core.ipfs_client import swarm_connect from app.core._config import PROJECT_HOST from app.core.events.service import record_event +from app.core.network.asn import resolver as asn_resolver +from app.core.network.dht import compute_node_id, dht_config, ReachabilityReceipt def _port_from_public_host(public_host: str) -> int: @@ -91,7 +93,7 @@ async def s_api_v1_network_handshake(request): return response.json({"error": "RATE_LIMIT"}, status=429) data = request.json or {} - required = ["version", "public_key", "node_type", "metrics", "timestamp", "signature"] + required = ["version", "schema_version", "public_key", "node_id", "node_type", "metrics", "timestamp", "signature"] for f in required: if f not in data: return response.json({"error": f"Missing field {f}"}, status=400) @@ -137,22 +139,62 @@ async def s_api_v1_network_handshake(request): "peer": peer_version, }, status=409) - # Verify signature + # Verify signature (Ed25519). If libsodium not available, accept but log a warning. + signed_fields = {k: v for (k, v) in data.items() if k != "signature"} + blob = json.dumps(signed_fields, sort_keys=True, separators=(",", ":")).encode() + ok = False try: - # Verify signature over the entire payload except the signature itself - signed_fields = {k: v for (k, v) in data.items() if k != "signature"} - blob = json.dumps(signed_fields, sort_keys=True, separators=(",", ":")).encode() - import nacl.signing, nacl.encoding - vk = nacl.signing.VerifyKey(b58decode(data["public_key"])) - sig = b58decode(data["signature"]) + import nacl.signing, nacl.encoding # type: ignore + vk = nacl.signing.VerifyKey(b58decode(data.get("public_key", ""))) + sig = b58decode(data.get("signature", "")) vk.verify(blob, sig) ok = True - except Exception: + except Exception as e: ok = False if not ok: make_log("Handshake", f"Signature verification failed from {data.get('public_host')}", level='warning') return response.json({"error": "BAD_SIGNATURE"}, status=400) + # Update membership / reachability information + try: + membership_mgr = getattr(request.app.ctx.memory, "membership", None) + if membership_mgr: + remote_ip = (request.headers.get('X-Forwarded-For') or request.remote_addr or request.ip or '').split(',')[0].strip() or None + remote_asn = data.get("asn") + if remote_asn is None: + remote_asn = asn_resolver.resolve(remote_ip) + else: + if remote_ip: + asn_resolver.learn(remote_ip, int(remote_asn)) + membership_mgr.update_member( + node_id=data["node_id"], + public_key=data["public_key"], + ip=remote_ip, + asn=int(remote_asn) if remote_asn is not None else None, + metadata={ + "capabilities": data.get("capabilities", {}), + "metrics": data.get("metrics", {}), + "public_host": data.get("public_host"), + }, + ) + for receipt in data.get("reachability_receipts") or []: + if not receipt.get("target_id") or not receipt.get("issuer_id"): + continue + try: + membership_mgr.record_receipt( + ReachabilityReceipt( + target_id=str(receipt.get("target_id")), + issuer_id=str(receipt.get("issuer_id")), + asn=int(receipt["asn"]) if receipt.get("asn") is not None else None, + timestamp=float(receipt.get("timestamp", data.get("timestamp"))), + signature=str(receipt.get("signature", "")), + ) + ) + except Exception: + continue + except Exception as exc: + make_log("Handshake", f"Membership ingest failed: {exc}", level='warning') + # Upsert node and 
respond with our info + known public nodes
+    # Reject peers that speak a different DHT schema or claim a NodeID that
+    # does not match their public key, before persisting or replying.
+    if data.get("schema_version") != dht_config.schema_version:
+        return response.json({"error": "UNSUPPORTED_SCHEMA_VERSION"}, status=400)
+    expected_node_id = compute_node_id(b58decode(data["public_key"]))
+    if data.get("node_id") != expected_node_id:
+        return response.json({"error": "NODE_ID_MISMATCH"}, status=400)
+
     # Do not persist private peers (ephemeral)
     if data.get("node_type") != "private" and data.get("public_host"):
@@ -215,10 +257,13 @@
     node = await compute_node_info(request.ctx.db_session)
     known = await list_known_public_nodes(request.ctx.db_session)
+    membership_mgr = getattr(request.app.ctx.memory, "membership", None)
+    n_estimate = membership_mgr.n_estimate() if membership_mgr else 0
     resp = sign_response({
         "compatibility": comp,
         "node": node,
         "known_public_nodes": known,
+        "n_estimate": n_estimate,
     })
     make_log("Handshake", f"OK with {data.get('public_host')} compat={comp}")
     status = 200
@@ -226,3 +271,8 @@
         status = 200
         resp["warning"] = "MINOR version differs; proceed with caution"
     return response.json(resp, status=status)
diff --git a/app/core/__pycache__/__init__.cpython-310.pyc b/app/core/__pycache__/__init__.cpython-310.pyc
new file mode 100644
index 0000000..f2b0218
Binary files /dev/null and b/app/core/__pycache__/__init__.cpython-310.pyc differ
diff --git a/app/core/_crypto/__pycache__/__init__.cpython-310.pyc b/app/core/_crypto/__pycache__/__init__.cpython-310.pyc
new file mode 100644
index 0000000..71529a4
Binary files /dev/null and b/app/core/_crypto/__pycache__/__init__.cpython-310.pyc differ
diff --git a/app/core/_crypto/__pycache__/signer.cpython-310.pyc b/app/core/_crypto/__pycache__/signer.cpython-310.pyc
new file mode 100644
index 0000000..55f7076
Binary files /dev/null and b/app/core/_crypto/__pycache__/signer.cpython-310.pyc differ
diff --git a/app/core/_crypto/signer.py b/app/core/_crypto/signer.py
index f67e39e..3198d80 100644
--- a/app/core/_crypto/signer.py
+++ b/app/core/_crypto/signer.py
@@ -1,24 +1,58 @@
-import base58
-import nacl.encoding
-import nacl.signing
+from app.core._utils.b58 import b58encode, b58decode
+
+try:
+    import nacl.encoding
+    import nacl.signing
+    import nacl.exceptions
+    _HAS_NACL = True
+except Exception:  # pragma: no cover - fallback path
+    _HAS_NACL = False
+
+from app.core._utils.hash import blake3_digest
-class Signer:
-    def __init__(self, seed: bytes):
-        if len(seed) != 32:
-            raise ValueError("Seed must be 32 bytes")
-        self.signing_key = nacl.signing.SigningKey(seed)
-        self.verify_key = self.signing_key.verify_key
+if _HAS_NACL:
-    def sign(self, data_bytes: bytes) -> str:
-        signed_message = self.signing_key.sign(data_bytes)
-        signature = signed_message.signature
-        return base58.b58encode(signature).decode()
+    class Signer:
+        def __init__(self, seed: bytes):
+            if len(seed) != 32:
+                raise ValueError("Seed must be 32 bytes")
+            self.signing_key = nacl.signing.SigningKey(seed)
+            self.verify_key = self.signing_key.verify_key
-    def verify(self, data_bytes: bytes, signature: str) -> bool:
-        signature_bytes = base58.b58decode(signature)
-        try:
-            self.verify_key.verify(data_bytes, signature_bytes)
-            return True
-        except nacl.exceptions.BadSignatureError:
-            return False
+        def sign(self, data_bytes: bytes) -> str:
+            signed_message = self.signing_key.sign(data_bytes)
+            signature = signed_message.signature
+            return b58encode(signature).decode()
+
+        def verify(self, data_bytes: bytes, signature: str) -> 
bool: + signature_bytes = b58decode(signature) + try: + self.verify_key.verify(data_bytes, signature_bytes) + return True + except nacl.exceptions.BadSignatureError: + return False + +else: + + class _VerifyKey: + def __init__(self, key_bytes: bytes): + self._key_bytes = key_bytes + + def encode(self) -> bytes: + return self._key_bytes + + class Signer: + def __init__(self, seed: bytes): + if len(seed) != 32: + raise ValueError("Seed must be 32 bytes") + self.seed = seed + self.verify_key = _VerifyKey(seed) + + def sign(self, data_bytes: bytes) -> str: + digest = blake3_digest(self.seed + data_bytes) + return b58encode(digest).decode() + + def verify(self, data_bytes: bytes, signature: str) -> bool: + expected = self.sign(data_bytes) + return expected == signature diff --git a/app/core/_utils/__pycache__/__init__.cpython-310.pyc b/app/core/_utils/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..ab8c37b Binary files /dev/null and b/app/core/_utils/__pycache__/__init__.cpython-310.pyc differ diff --git a/app/core/_utils/__pycache__/b58.cpython-310.pyc b/app/core/_utils/__pycache__/b58.cpython-310.pyc new file mode 100644 index 0000000..0a8647c Binary files /dev/null and b/app/core/_utils/__pycache__/b58.cpython-310.pyc differ diff --git a/app/core/_utils/__pycache__/hash.cpython-310.pyc b/app/core/_utils/__pycache__/hash.cpython-310.pyc new file mode 100644 index 0000000..f1bead9 Binary files /dev/null and b/app/core/_utils/__pycache__/hash.cpython-310.pyc differ diff --git a/app/core/_utils/__pycache__/tg_process_template.cpython-310.pyc b/app/core/_utils/__pycache__/tg_process_template.cpython-310.pyc new file mode 100644 index 0000000..f730ca3 Binary files /dev/null and b/app/core/_utils/__pycache__/tg_process_template.cpython-310.pyc differ diff --git a/app/core/_utils/b58.py b/app/core/_utils/b58.py new file mode 100644 index 0000000..e49a4da --- /dev/null +++ b/app/core/_utils/b58.py @@ -0,0 +1,51 @@ +from __future__ import annotations + +try: + # Prefer external package if available + from base58 import b58encode, b58decode # type: ignore +except Exception: + # Minimal fallback (compatible subset) + ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" + ALPHABET_INDEX = {c: i for i, c in enumerate(ALPHABET)} + + def _to_bytes(value: bytes | bytearray | str) -> bytes: + if isinstance(value, (bytes, bytearray)): + return bytes(value) + if isinstance(value, str): + return value.encode() + raise TypeError("value must be bytes or str") + + def b58encode(data: bytes | bytearray | str) -> bytes: + data = _to_bytes(data) + if not data: + return b"" + n = int.from_bytes(data, "big") + out = [] + while n > 0: + n, rem = divmod(n, 58) + out.append(ALPHABET[rem]) + enc = "".join(reversed(out)) + leading = 0 + for b in data: + if b == 0: + leading += 1 + else: + break + return ("1" * leading + enc).encode() + + def b58decode(data: bytes | bytearray | str) -> bytes: + data_b = _to_bytes(data) + if not data_b: + return b"" + num = 0 + for ch in data_b.decode(): + num = num * 58 + ALPHABET_INDEX[ch] + full = num.to_bytes((num.bit_length() + 7) // 8, "big") + leading = 0 + for ch in data_b: + if ch == ord('1'): + leading += 1 + else: + break + return b"\x00" * leading + full + diff --git a/app/core/_utils/hash.py b/app/core/_utils/hash.py new file mode 100644 index 0000000..b3a95a3 --- /dev/null +++ b/app/core/_utils/hash.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +import hashlib +from typing import Iterable + + +def _to_bytes(data: 
Iterable[int] | bytes | bytearray | str) -> bytes: + if isinstance(data, (bytes, bytearray)): + return bytes(data) + if isinstance(data, str): + return data.encode() + return bytes(data) + + +def blake3_digest(data: Iterable[int] | bytes | bytearray | str) -> bytes: + try: + from blake3 import blake3 # type: ignore + return blake3(_to_bytes(data)).digest() + except Exception: + return hashlib.blake2s(_to_bytes(data)).digest() + + +def blake3_hex(data: Iterable[int] | bytes | bytearray | str) -> str: + try: + from blake3 import blake3 # type: ignore + return blake3(_to_bytes(data)).hexdigest() + except Exception: + return hashlib.blake2s(_to_bytes(data)).hexdigest() + diff --git a/app/core/models/memory.py b/app/core/models/memory.py index 949748e..55d44c7 100644 --- a/app/core/models/memory.py +++ b/app/core/models/memory.py @@ -4,9 +4,19 @@ from datetime import datetime from datetime import timedelta from aiogram import Bot +from app.core._utils.b58 import b58encode from app.core._config import TELEGRAM_API_KEY, CLIENT_TELEGRAM_API_KEY +from app.core._crypto.signer import Signer +from app.core._secrets import hot_pubkey, hot_seed from app.core.logger import make_log +from app.core.network.dht import ( + DHTStore, + MembershipManager, + ReplicationManager, + MetricsAggregator, + compute_node_id, +) class Memory: @@ -46,6 +56,15 @@ class Memory: self._handshake_rl = {"minute": 0, "counts": {}} self._handshake_nonces = {} + # Decentralised storage components + self.node_id = compute_node_id(hot_pubkey) + self.signer = Signer(hot_seed) + self.dht_store = DHTStore(self.node_id, self.signer) + self.membership = MembershipManager(self.node_id, self.signer, self.dht_store) + self.replication = ReplicationManager(self.node_id, self.signer, self.dht_store) + self.metrics = MetricsAggregator(self.node_id, self.signer, self.dht_store) + self.membership.register_local(public_key=b58encode(hot_pubkey).decode(), ip=None, asn=None) + @asynccontextmanager async def transaction(self, desc=""): make_log("Memory.transaction", f"Starting transaction; {desc}", level='debug') @@ -80,4 +99,3 @@ class Memory: make_log("Queue.add_task", f"Error when adding task to memory: {e}", level='error') self._execute_queue.append([_fn, args, kwargs]) - diff --git a/app/core/network/__pycache__/__init__.cpython-310.pyc b/app/core/network/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..f3f2ef8 Binary files /dev/null and b/app/core/network/__pycache__/__init__.cpython-310.pyc differ diff --git a/app/core/network/asn.py b/app/core/network/asn.py new file mode 100644 index 0000000..704ce7f --- /dev/null +++ b/app/core/network/asn.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +import ipaddress +from dataclasses import dataclass, field +from typing import Dict, Optional + +from app.core.logger import make_log + + +@dataclass +class ASNResolver: + cache: Dict[str, int] = field(default_factory=dict) + + def normalise(self, ip: str | None) -> Optional[str]: + if not ip: + return None + try: + return str(ipaddress.ip_address(ip)) + except Exception: + return None + + def resolve(self, ip: str | None) -> Optional[int]: + norm = self.normalise(ip) + if not norm: + return None + return self.cache.get(norm) + + def learn(self, ip: str, asn: int) -> None: + norm = self.normalise(ip) + if not norm: + make_log("ASNResolver", f"Invalid IP provided for learn: {ip}", level="warning") + return + self.cache[norm] = asn + + +resolver = ASNResolver() + diff --git a/app/core/network/dht/__init__.py 
b/app/core/network/dht/__init__.py new file mode 100644 index 0000000..0d8adb6 --- /dev/null +++ b/app/core/network/dht/__init__.py @@ -0,0 +1,35 @@ +""" +Decentralised storage, replication, and metrics layer. +""" + +from .config import dht_config, DHTConfig +from .crypto import compute_node_id, compute_content_id, compute_view_id, bits_from_hex, rendezvous_score +from .keys import MetaKey, MetricKey, MembershipKey +from .membership import MembershipManager, MembershipState, ReachabilityReceipt +from .replication import ReplicationManager, ReplicationState, ReplicaLease +from .metrics import MetricsAggregator, ContentMetricsState, MetricDelta +from .store import DHTStore + +__all__ = [ + "dht_config", + "DHTConfig", + "compute_node_id", + "compute_content_id", + "compute_view_id", + "bits_from_hex", + "rendezvous_score", + "MetaKey", + "MetricKey", + "MembershipKey", + "MembershipManager", + "MembershipState", + "ReachabilityReceipt", + "ReplicationManager", + "ReplicationState", + "ReplicaLease", + "MetricsAggregator", + "ContentMetricsState", + "MetricDelta", + "DHTStore", +] + diff --git a/app/core/network/dht/__pycache__/__init__.cpython-310.pyc b/app/core/network/dht/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..b688479 Binary files /dev/null and b/app/core/network/dht/__pycache__/__init__.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/config.cpython-310.pyc b/app/core/network/dht/__pycache__/config.cpython-310.pyc new file mode 100644 index 0000000..87c49bc Binary files /dev/null and b/app/core/network/dht/__pycache__/config.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/crdt.cpython-310.pyc b/app/core/network/dht/__pycache__/crdt.cpython-310.pyc new file mode 100644 index 0000000..da69d41 Binary files /dev/null and b/app/core/network/dht/__pycache__/crdt.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/crypto.cpython-310.pyc b/app/core/network/dht/__pycache__/crypto.cpython-310.pyc new file mode 100644 index 0000000..8e88e5f Binary files /dev/null and b/app/core/network/dht/__pycache__/crypto.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/keys.cpython-310.pyc b/app/core/network/dht/__pycache__/keys.cpython-310.pyc new file mode 100644 index 0000000..6bb6d72 Binary files /dev/null and b/app/core/network/dht/__pycache__/keys.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/membership.cpython-310.pyc b/app/core/network/dht/__pycache__/membership.cpython-310.pyc new file mode 100644 index 0000000..7e11208 Binary files /dev/null and b/app/core/network/dht/__pycache__/membership.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/metrics.cpython-310.pyc b/app/core/network/dht/__pycache__/metrics.cpython-310.pyc new file mode 100644 index 0000000..f115219 Binary files /dev/null and b/app/core/network/dht/__pycache__/metrics.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/prometheus.cpython-310.pyc b/app/core/network/dht/__pycache__/prometheus.cpython-310.pyc new file mode 100644 index 0000000..aab32c3 Binary files /dev/null and b/app/core/network/dht/__pycache__/prometheus.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/records.cpython-310.pyc b/app/core/network/dht/__pycache__/records.cpython-310.pyc new file mode 100644 index 0000000..2f2e39e Binary files /dev/null and b/app/core/network/dht/__pycache__/records.cpython-310.pyc differ diff --git 
a/app/core/network/dht/__pycache__/replication.cpython-310.pyc b/app/core/network/dht/__pycache__/replication.cpython-310.pyc new file mode 100644 index 0000000..f4924e0 Binary files /dev/null and b/app/core/network/dht/__pycache__/replication.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/store.cpython-310.pyc b/app/core/network/dht/__pycache__/store.cpython-310.pyc new file mode 100644 index 0000000..57a229b Binary files /dev/null and b/app/core/network/dht/__pycache__/store.cpython-310.pyc differ diff --git a/app/core/network/dht/config.py b/app/core/network/dht/config.py new file mode 100644 index 0000000..ac199a2 --- /dev/null +++ b/app/core/network/dht/config.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +import os +from dataclasses import dataclass +from functools import lru_cache + + +SCHEMA_VERSION = "v1" + + +def _env_int(name: str, default: int) -> int: + try: + return int(os.getenv(name, default)) + except Exception: + return default + + +def _env_float(name: str, default: float) -> float: + try: + return float(os.getenv(name, default)) + except Exception: + return default + + +@dataclass(frozen=True) +class DHTConfig: + """Runtime configuration for the decentralized storage layer.""" + + schema_version: str = SCHEMA_VERSION + min_receipts: int = _env_int("DHT_MIN_RECEIPTS", 5) + min_reachability_ratio: float = _env_float("DHT_MIN_REACHABILITY", 0.6) + membership_ttl: int = _env_int("DHT_MEMBERSHIP_TTL", 600) + replication_target: int = max(3, _env_int("DHT_REPLICATION_TARGET", 3)) + lease_ttl: int = _env_int("DHT_LEASE_TTL", 600) + heartbeat_interval: int = _env_int("DHT_HEARTBEAT_INTERVAL", 60) + heartbeat_miss_threshold: int = _env_int("DHT_HEARTBEAT_MISS_THRESHOLD", 3) + rendezvous_base: str = os.getenv("DHT_RENDEZVOUS_HASH", "blake3") + pow_difficulty: int = _env_int("DHT_POW_DIFFICULTY", 4) + min_asn_diversity: int = _env_int("DHT_MIN_ASN", 3) + min_ip_octet_diversity: int = _env_int("DHT_MIN_IP_OCTETS", 3) + window_size: int = _env_int("DHT_METRIC_WINDOW_SEC", 3600) + default_q: float = _env_float("DHT_MIN_Q", 0.6) + seed_refresh_interval: int = _env_int("DHT_SEED_REFRESH_INTERVAL", 30) + + +@lru_cache +def load_config() -> DHTConfig: + """Load configuration with process-wide memoisation.""" + + return DHTConfig() + + +dht_config = load_config() + diff --git a/app/core/network/dht/crdt.py b/app/core/network/dht/crdt.py new file mode 100644 index 0000000..de484ac --- /dev/null +++ b/app/core/network/dht/crdt.py @@ -0,0 +1,278 @@ +from __future__ import annotations + +import math +import time +from dataclasses import dataclass, field +from typing import Dict, Any, Iterable, Tuple + +from app.core._utils.hash import blake3_hex + + +class CRDTMergeError(RuntimeError): + pass + + +class CRDT: + def merge(self, other: "CRDT") -> "CRDT": + raise NotImplementedError + + def to_dict(self) -> Dict[str, Any]: + raise NotImplementedError + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "CRDT": + raise NotImplementedError + + +@dataclass +class LWWElement: + value: Any + logical_counter: int + timestamp: float + node_id: str + + def dominates(self, other: "LWWElement") -> bool: + if self.logical_counter > other.logical_counter: + return True + if self.logical_counter < other.logical_counter: + return False + if self.timestamp > other.timestamp: + return True + if self.timestamp < other.timestamp: + return False + # Break all ties by NodeID ordering to guarantee determinism + return self.node_id > other.node_id + + +class 
LWWRegister(CRDT): + def __init__(self, element: LWWElement | None = None): + self.element = element + + def assign(self, value: Any, logical_counter: int, node_id: str, timestamp: float | None = None) -> None: + new_el = LWWElement(value=value, logical_counter=logical_counter, timestamp=timestamp or time.time(), node_id=node_id) + if self.element is None or new_el.dominates(self.element): + self.element = new_el + + def merge(self, other: "LWWRegister") -> "LWWRegister": + if other.element and (self.element is None or other.element.dominates(self.element)): + self.element = other.element + return self + + def value(self) -> Any: + return self.element.value if self.element else None + + def to_dict(self) -> Dict[str, Any]: + if not self.element: + return {} + return { + "value": self.element.value, + "logical_counter": self.element.logical_counter, + "timestamp": self.element.timestamp, + "node_id": self.element.node_id, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "LWWRegister": + if not data: + return cls() + element = LWWElement( + value=data.get("value"), + logical_counter=int(data["logical_counter"]), + timestamp=float(data["timestamp"]), + node_id=str(data["node_id"]), + ) + return cls(element=element) + + +class LWWSet(CRDT): + def __init__(self, adds: Dict[str, LWWElement] | None = None, removes: Dict[str, LWWElement] | None = None): + self.adds: Dict[str, LWWElement] = adds or {} + self.removes: Dict[str, LWWElement] = removes or {} + + def add(self, element_id: str, value: Any, logical_counter: int, node_id: str, timestamp: float | None = None) -> None: + elem = LWWElement(value=value, logical_counter=logical_counter, timestamp=timestamp or time.time(), node_id=node_id) + existing = self.adds.get(element_id) + if not existing or elem.dominates(existing): + self.adds[element_id] = elem + + def remove(self, element_id: str, logical_counter: int, node_id: str, timestamp: float | None = None) -> None: + elem = LWWElement(value=None, logical_counter=logical_counter, timestamp=timestamp or time.time(), node_id=node_id) + existing = self.removes.get(element_id) + if not existing or elem.dominates(existing): + self.removes[element_id] = elem + + def lookup(self, element_id: str) -> Any | None: + add = self.adds.get(element_id) + remove = self.removes.get(element_id) + if add and (not remove or add.dominates(remove)): + return add.value + return None + + def elements(self) -> Dict[str, Any]: + return {eid: elem.value for eid, elem in self.adds.items() if self.lookup(eid) is not None} + + def merge(self, other: "LWWSet") -> "LWWSet": + for eid, elem in other.adds.items(): + current = self.adds.get(eid) + if not current or elem.dominates(current): + self.adds[eid] = elem + for eid, elem in other.removes.items(): + current = self.removes.get(eid) + if not current or elem.dominates(current): + self.removes[eid] = elem + return self + + def to_dict(self) -> Dict[str, Any]: + def serialize_map(source: Dict[str, LWWElement]) -> Dict[str, Dict[str, Any]]: + return { + eid: { + "value": elem.value, + "logical_counter": elem.logical_counter, + "timestamp": elem.timestamp, + "node_id": elem.node_id, + } + for eid, elem in source.items() + } + + return {"adds": serialize_map(self.adds), "removes": serialize_map(self.removes)} + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "LWWSet": + adds = { + eid: LWWElement( + value=elem.get("value"), + logical_counter=int(elem["logical_counter"]), + timestamp=float(elem["timestamp"]), + node_id=str(elem["node_id"]), + ) + for 
eid, elem in (data.get("adds") or {}).items() + } + removes = { + eid: LWWElement( + value=elem.get("value"), + logical_counter=int(elem["logical_counter"]), + timestamp=float(elem["timestamp"]), + node_id=str(elem["node_id"]), + ) + for eid, elem in (data.get("removes") or {}).items() + } + return cls(adds=adds, removes=removes) + + +class PNCounter(CRDT): + def __init__(self, increments: Dict[str, int] | None = None, decrements: Dict[str, int] | None = None): + self.increments = increments or {} + self.decrements = decrements or {} + + def increment(self, node_id: str, value: int = 1) -> None: + if value < 0: + raise ValueError("value must be non-negative for increment") + self.increments[node_id] = self.increments.get(node_id, 0) + value + + def decrement(self, node_id: str, value: int = 1) -> None: + if value < 0: + raise ValueError("value must be non-negative for decrement") + self.decrements[node_id] = self.decrements.get(node_id, 0) + value + + def value(self) -> int: + return sum(self.increments.values()) - sum(self.decrements.values()) + + def merge(self, other: "PNCounter") -> "PNCounter": + for nid, val in other.increments.items(): + self.increments[nid] = max(self.increments.get(nid, 0), val) + for nid, val in other.decrements.items(): + self.decrements[nid] = max(self.decrements.get(nid, 0), val) + return self + + def to_dict(self) -> Dict[str, Any]: + return {"inc": dict(self.increments), "dec": dict(self.decrements)} + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "PNCounter": + return cls(increments=dict(data.get("inc") or {}), decrements=dict(data.get("dec") or {})) + + +class GCounter(CRDT): + def __init__(self, counters: Dict[str, int] | None = None): + self.counters = counters or {} + + def increment(self, node_id: str, value: int = 1) -> None: + if value < 0: + raise ValueError("value must be non-negative") + self.counters[node_id] = self.counters.get(node_id, 0) + value + + def value(self) -> int: + return sum(self.counters.values()) + + def merge(self, other: "GCounter") -> "GCounter": + for nid, val in other.counters.items(): + self.counters[nid] = max(self.counters.get(nid, 0), val) + return self + + def to_dict(self) -> Dict[str, Any]: + return dict(self.counters) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "GCounter": + return cls(counters=dict(data or {})) + + +def _leading_zeros(value: int, width: int) -> int: + if value == 0: + return width + return width - value.bit_length() + + +@dataclass +class HyperLogLog(CRDT): + precision: int = 12 + registers: Tuple[int, ...] 
= field(default_factory=tuple) + + def __post_init__(self) -> None: + if not self.registers: + self.registers = tuple([0] * (1 << self.precision)) + else: + self.registers = tuple(self.registers) + + @property + def m(self) -> int: + return len(self.registers) + + def add(self, value: Any) -> None: + if value is None: + return + hashed = int(blake3_hex(str(value).encode()), 16) + index = hashed & (self.m - 1) + w = hashed >> self.precision + rank = _leading_zeros(w, 256 - self.precision) + 1 + current = self.registers[index] + if rank > current: + regs = list(self.registers) + regs[index] = rank + self.registers = tuple(regs) + + def estimate(self) -> float: + alpha = 0.7213 / (1 + 1.079 / self.m) + indicator = sum(2.0 ** (-r) for r in self.registers) + raw = alpha * (self.m ** 2) / indicator + if raw <= 2.5 * self.m: + zeros = self.registers.count(0) + if zeros: + return self.m * math.log(self.m / zeros) + return raw + + def merge(self, other: "HyperLogLog") -> "HyperLogLog": + if self.m != other.m: + raise CRDTMergeError("Cannot merge HyperLogLog instances with different precision") + merged = [max(a, b) for a, b in zip(self.registers, other.registers)] + self.registers = tuple(merged) + return self + + def to_dict(self) -> Dict[str, Any]: + return {"precision": self.precision, "registers": list(self.registers)} + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "HyperLogLog": + if not data: + return cls() + return cls(precision=int(data.get("precision", 12)), registers=tuple(int(x) for x in data.get("registers", []))) diff --git a/app/core/network/dht/crypto.py b/app/core/network/dht/crypto.py new file mode 100644 index 0000000..503db99 --- /dev/null +++ b/app/core/network/dht/crypto.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable, Tuple + +from app.core._utils.hash import blake3_hex + + +BLAKE3_DIGEST_SIZE = 32 + + +def _ensure_bytes(data: Iterable[int] | bytes | bytearray) -> bytes: + if isinstance(data, (bytes, bytearray)): + return bytes(data) + if isinstance(data, str): + return data.encode() + return bytes(data) + + +def digest_hex(data: Iterable[int] | bytes | bytearray | str) -> str: + return blake3_hex(_ensure_bytes(data)) + + +def compute_node_id(public_key: bytes) -> str: + """NodeID = blake3(pubkey).""" + + if not isinstance(public_key, (bytes, bytearray)): + raise TypeError("public_key must be bytes") + return digest_hex(public_key) + + +def compute_content_id(encrypted_blob: bytes) -> str: + """ContentID = blake3(encrypted_blob).""" + + return digest_hex(encrypted_blob) + + +def compute_view_id(content_id: str, viewer_salt: bytes) -> str: + """ViewID = blake3(ContentID||viewer_salt).""" + + if not viewer_salt: + raise ValueError("viewer_salt must not be empty") + return digest_hex(content_id.encode() + viewer_salt) + + +def bits_from_hex(hex_digest: str, prefix_bits: int) -> Tuple[int, int]: + """Extract first prefix_bits from a hex digest. 
Returns (prefix, total_bits).""" + + if prefix_bits < 0: + raise ValueError("prefix_bits must be >= 0") + bitstring = bin(int(hex_digest, 16))[2:].zfill(len(hex_digest) * 4) + if prefix_bits == 0: + return 0, len(bitstring) + return int(bitstring[:prefix_bits], 2), len(bitstring) + + +def rendezvous_score(content_id: str, node_id: str) -> int: + """Return rendezvous score via blake3(ContentID||NodeID).""" + return int(blake3_hex(f"{content_id}:{node_id}".encode()), 16) + + +@dataclass(frozen=True) +class ContentFingerprint: + content_id: str + node_id_prefix: int + prefix_bits: int + + def matches(self, node_id: str) -> bool: + prefix, total = bits_from_hex(node_id, self.prefix_bits) + return prefix == self.node_id_prefix and total >= self.prefix_bits diff --git a/app/core/network/dht/keys.py b/app/core/network/dht/keys.py new file mode 100644 index 0000000..f8bcfbd --- /dev/null +++ b/app/core/network/dht/keys.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import json +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Dict, Any + +from .config import dht_config +from .crypto import digest_hex + + +def _json_dumps(data: Dict[str, Any]) -> bytes: + return json.dumps(data, sort_keys=True, separators=(",", ":")).encode() + + +@dataclass(frozen=True) +class MetaKey: + content_id: str + schema_version: str = dht_config.schema_version + + def fingerprint(self) -> str: + return digest_hex(self.serialize()) + + def serialize(self) -> bytes: + return _json_dumps({"schema_version": self.schema_version, "content_id": self.content_id, "type": "meta"}) + + def __str__(self) -> str: + return f"meta:{self.schema_version}:{self.content_id}" + + +@dataclass(frozen=True) +class MembershipKey: + node_id: str + schema_version: str = dht_config.schema_version + + def fingerprint(self) -> str: + return digest_hex(self.serialize()) + + def serialize(self) -> bytes: + return _json_dumps({"schema_version": self.schema_version, "node_id": self.node_id, "type": "membership"}) + + def __str__(self) -> str: + return f"membership:{self.schema_version}:{self.node_id}" + + +@dataclass(frozen=True) +class MetricKey: + content_id: str + window_id: str + schema_version: str = dht_config.schema_version + + @classmethod + def window_for(cls, timestamp: float, window_size: int | None = None) -> str: + win = int(timestamp // (window_size or dht_config.window_size)) + return datetime.fromtimestamp(win * (window_size or dht_config.window_size), tz=timezone.utc).strftime("%Y%m%d%H") + + def fingerprint(self) -> str: + return digest_hex(self.serialize()) + + def serialize(self) -> bytes: + return _json_dumps( + { + "schema_version": self.schema_version, + "content_id": self.content_id, + "window_id": self.window_id, + "type": "metric", + } + ) + + def __str__(self) -> str: + return f"metric:{self.schema_version}:{self.content_id}:{self.window_id}" + diff --git a/app/core/network/dht/membership.py b/app/core/network/dht/membership.py new file mode 100644 index 0000000..12412d0 --- /dev/null +++ b/app/core/network/dht/membership.py @@ -0,0 +1,219 @@ +from __future__ import annotations + +import ipaddress +import time +from dataclasses import dataclass +from typing import Dict, Any, Iterable, List, Optional, Tuple + +from app.core._crypto.signer import Signer +from .config import dht_config +from .crdt import LWWSet, HyperLogLog +from .keys import MembershipKey +from .store import DHTStore + + +@dataclass +class ReachabilityReceipt: + target_id: str + issuer_id: str + asn: 
Optional[int] + timestamp: float + signature: str + + def as_dict(self) -> Dict[str, Any]: + return { + "target_id": self.target_id, + "issuer_id": self.issuer_id, + "asn": self.asn, + "timestamp": self.timestamp, + "signature": self.signature, + } + + +def _ip_first_octet(host: str | None) -> Optional[int]: + if not host: + return None + try: + ip = ipaddress.ip_address(host) + return int(str(ip).split(".")[0]) + except Exception: + return None + + +class MembershipState: + def __init__(self, node_id: str, signer: Signer): + self.node_id = node_id + self.signer = signer + self.members = LWWSet() + self.receipts = LWWSet() + self.hll = HyperLogLog() + self.n_reports: Dict[str, float] = {} + self.logical_counter = 0 + + def _bump_counter(self) -> int: + self.logical_counter += 1 + return self.logical_counter + + def register_member( + self, + node_id: str, + public_key: str, + ip: str | None, + asn: Optional[int], + metadata: Dict[str, Any] | None = None, + timestamp: Optional[float] = None, + ) -> None: + payload = { + "node_id": node_id, + "public_key": public_key, + "ip": ip, + "asn": asn, + "ip_first_octet": _ip_first_octet(ip), + "meta": metadata or {}, + "last_update": timestamp or time.time(), + } + self.members.add(node_id, payload, logical_counter=self._bump_counter(), node_id=self.node_id, timestamp=timestamp) + self.hll.add(node_id) + + def forget_member(self, node_id: str) -> None: + self.members.remove(node_id, logical_counter=self._bump_counter(), node_id=self.node_id) + + def record_receipt(self, receipt: ReachabilityReceipt) -> None: + element_id = f"{receipt.target_id}:{receipt.issuer_id}" + self.receipts.add( + element_id, + receipt.as_dict(), + logical_counter=self._bump_counter(), + node_id=self.node_id, + timestamp=receipt.timestamp, + ) + + def report_local_population(self) -> None: + self.n_reports[self.node_id] = float(self.hll.estimate()) + + def merge(self, other: "MembershipState") -> "MembershipState": + self.members.merge(other.members) + self.receipts.merge(other.receipts) + self.hll.merge(other.hll) + for node_id, value in other.n_reports.items(): + self.n_reports[node_id] = max(self.n_reports.get(node_id, 0.0), value) + self.logical_counter = max(self.logical_counter, other.logical_counter) + return self + + def _unique_asn_for(self, node_id: str) -> Tuple[int, Iterable[int]]: + receipts = [ + entry + for rid, entry in self.receipts.elements().items() + if entry.get("target_id") == node_id + ] + unique_asn = {entry.get("asn") for entry in receipts if entry.get("asn") is not None} + return len(unique_asn), unique_asn + + def reachability_ratio(self, node_id: str) -> float: + unique_count, _ = self._unique_asn_for(node_id) + if dht_config.min_receipts <= 0: + return 1.0 + return min(1.0, unique_count / dht_config.min_receipts) + + def active_members(self, include_islands: bool = False) -> List[Dict[str, Any]]: + now = time.time() + result = [] + for node_id, data in self.members.elements().items(): + last_update = data.get("last_update") or 0 + if now - last_update > dht_config.membership_ttl: + continue + reachability = self.reachability_ratio(node_id) + if not include_islands and reachability < dht_config.default_q: + continue + enriched = dict(data) + enriched["reachability_ratio"] = reachability + result.append(enriched) + return result + + def n_estimate(self) -> float: + self.report_local_population() + active_ids = {m["node_id"] for m in self.active_members(include_islands=True)} + filtered_reports = [ + value for node_id, value in self.n_reports.items() 
if node_id in active_ids and self.reachability_ratio(node_id) >= dht_config.default_q + ] + local_estimate = float(self.hll.estimate()) + if filtered_reports: + return max(max(filtered_reports), local_estimate) + return local_estimate + + def to_dict(self) -> Dict[str, Any]: + return { + "members": self.members.to_dict(), + "receipts": self.receipts.to_dict(), + "hll": self.hll.to_dict(), + "reports": dict(self.n_reports), + "logical_counter": self.logical_counter, + } + + @classmethod + def from_dict(cls, node_id: str, signer: Signer, data: Dict[str, Any]) -> "MembershipState": + inst = cls(node_id=node_id, signer=signer) + if data: + inst.members = LWWSet.from_dict(data.get("members") or {}) + inst.receipts = LWWSet.from_dict(data.get("receipts") or {}) + inst.hll = HyperLogLog.from_dict(data.get("hll") or {}) + inst.n_reports = {str(k): float(v) for k, v in (data.get("reports") or {}).items()} + inst.logical_counter = int(data.get("logical_counter") or 0) + return inst + + +class MembershipManager: + def __init__(self, node_id: str, signer: Signer, store: DHTStore): + self.node_id = node_id + self.signer = signer + self.store = store + self.state = MembershipState(node_id=node_id, signer=signer) + + def _merge_remote(self, data: Dict[str, Any]) -> None: + remote_state = MembershipState.from_dict(self.node_id, self.signer, data) + self.state.merge(remote_state) + + def ingest_snapshot(self, payload: Dict[str, Any]) -> None: + self._merge_remote(payload) + + def register_local(self, public_key: str, ip: str | None, asn: Optional[int], metadata: Dict[str, Any] | None = None) -> None: + self.state.register_member(self.node_id, public_key=public_key, ip=ip, asn=asn, metadata=metadata) + self._persist() + + def update_member(self, node_id: str, **kwargs) -> None: + meta = kwargs.get("metadata") or {} + self.state.register_member( + node_id, + public_key=kwargs.get("public_key", meta.get("public_key")), + ip=kwargs.get("ip"), + asn=kwargs.get("asn"), + metadata=meta, + ) + self._persist() + + def remove_member(self, node_id: str) -> None: + self.state.forget_member(node_id) + self._persist() + + def record_receipt(self, receipt: ReachabilityReceipt) -> None: + self.state.record_receipt(receipt) + self._persist() + + def _persist(self) -> None: + key = MembershipKey(node_id=self.node_id) + self.store.put( + key=str(key), + fingerprint=key.fingerprint(), + value=self.state.to_dict(), + logical_counter=self.state.logical_counter, + merge_strategy=lambda a, b: MembershipState.from_dict(self.node_id, self.signer, a) + .merge(MembershipState.from_dict(self.node_id, self.signer, b)) + .to_dict(), + ) + + def n_estimate(self) -> float: + return self.state.n_estimate() + + def active_members(self) -> List[Dict[str, Any]]: + return self.state.active_members() + diff --git a/app/core/network/dht/metrics.py b/app/core/network/dht/metrics.py new file mode 100644 index 0000000..4dbeae3 --- /dev/null +++ b/app/core/network/dht/metrics.py @@ -0,0 +1,144 @@ +from __future__ import annotations + +import time +from dataclasses import dataclass +from typing import Dict, Any, Optional + +from app.core._crypto.signer import Signer +from .config import dht_config +from .crdt import PNCounter, GCounter, HyperLogLog +from .crypto import compute_view_id +from .keys import MetricKey +from .store import DHTStore +from .prometheus import update_view_metrics + + +@dataclass +class MetricDelta: + content_id: str + view_id: str + watch_time: int + bytes_out: int + completed: bool + timestamp: float + + def as_dict(self) -> 
Dict[str, Any]: + return { + "content_id": self.content_id, + "view_id": self.view_id, + "watch_time": self.watch_time, + "bytes_out": self.bytes_out, + "completed": self.completed, + "timestamp": self.timestamp, + } + + +class ContentMetricsState: + def __init__(self, node_id: str): + self.node_id = node_id + self.views = PNCounter() + self.unique = HyperLogLog() + self.watch_time = GCounter() + self.bytes_out = GCounter() + self.completions = GCounter() + self.logical_counter = 0 + + def apply(self, delta: MetricDelta) -> None: + self.logical_counter += 1 + self.views.increment(self.node_id, 1) + self.unique.add(delta.view_id) + if delta.watch_time: + self.watch_time.increment(self.node_id, delta.watch_time) + if delta.bytes_out: + self.bytes_out.increment(self.node_id, delta.bytes_out) + if delta.completed: + self.completions.increment(self.node_id, 1) + + def merge(self, other: "ContentMetricsState") -> "ContentMetricsState": + self.views.merge(other.views) + self.unique.merge(other.unique) + self.watch_time.merge(other.watch_time) + self.bytes_out.merge(other.bytes_out) + self.completions.merge(other.completions) + self.logical_counter = max(self.logical_counter, other.logical_counter) + return self + + def to_dict(self) -> Dict[str, Any]: + return { + "views": self.views.to_dict(), + "unique": self.unique.to_dict(), + "watch_time": self.watch_time.to_dict(), + "bytes_out": self.bytes_out.to_dict(), + "completions": self.completions.to_dict(), + "logical_counter": self.logical_counter, + } + + @classmethod + def from_dict(cls, node_id: str, data: Dict[str, Any]) -> "ContentMetricsState": + inst = cls(node_id=node_id) + if data: + inst.views = PNCounter.from_dict(data.get("views") or {}) + inst.unique = HyperLogLog.from_dict(data.get("unique") or {}) + inst.watch_time = GCounter.from_dict(data.get("watch_time") or {}) + inst.bytes_out = GCounter.from_dict(data.get("bytes_out") or {}) + inst.completions = GCounter.from_dict(data.get("completions") or {}) + inst.logical_counter = int(data.get("logical_counter") or 0) + return inst + + +class MetricsAggregator: + def __init__(self, node_id: str, signer: Signer, store: DHTStore): + self.node_id = node_id + self.signer = signer + self.store = store + + def _load(self, content_id: str, window_id: str) -> ContentMetricsState: + key = MetricKey(content_id=content_id, window_id=window_id) + record = self.store.get(key.fingerprint()) + if record: + return ContentMetricsState.from_dict(self.node_id, record.value) + return ContentMetricsState(node_id=self.node_id) + + def _persist(self, content_id: str, window_id: str, state: ContentMetricsState) -> None: + key = MetricKey(content_id=content_id, window_id=window_id) + self.store.put( + key=str(key), + fingerprint=key.fingerprint(), + value=state.to_dict(), + logical_counter=state.logical_counter, + merge_strategy=lambda a, b: ContentMetricsState.from_dict(self.node_id, a) + .merge(ContentMetricsState.from_dict(self.node_id, b)) + .to_dict(), + ) + update_view_metrics( + content_id=content_id, + window_id=window_id, + views=state.views.value(), + unique=state.unique.estimate(), + watch_time=state.watch_time.value(), + ) + + def record_view( + self, + content_id: str, + viewer_salt: bytes, + watch_time: int, + bytes_out: int, + completed: bool, + timestamp: Optional[float] = None, + ) -> MetricDelta: + ts = time.time() if timestamp is None else timestamp + window_id = MetricKey.window_for(ts) + view_id = compute_view_id(content_id, viewer_salt) + state = self._load(content_id, window_id) + delta = 
MetricDelta( + content_id=content_id, + view_id=view_id, + watch_time=watch_time, + bytes_out=bytes_out, + completed=completed, + timestamp=ts, + ) + state.apply(delta) + self._persist(content_id, window_id, state) + return delta diff --git a/app/core/network/dht/prometheus.py b/app/core/network/dht/prometheus.py new file mode 100644 index 0000000..d12f34f --- /dev/null +++ b/app/core/network/dht/prometheus.py @@ -0,0 +1,53 @@ +from __future__ import annotations + +try: + from prometheus_client import Counter, Gauge # type: ignore +except Exception: + class _Metric: + def __init__(self, *_, **__): + self._values = {} + def labels(self, **kwargs): + key = tuple(sorted(kwargs.items())) + class H: + def __init__(self, parent, k): + self._p = parent; self._k = k + def inc(self, v: float = 1.0): + self._p._values[self._k] = self._p._values.get(self._k, 0.0) + v + def set(self, v: float): + self._p._values[self._k] = v + return H(self, key) + class Counter(_Metric): + pass + class Gauge(_Metric): + pass + + +replication_under = Counter("dht_replication_under", "Times replication fell below target", ["content_id"]) +replication_over = Counter("dht_replication_over", "Times replication exceeded target", ["content_id"]) +leader_changes = Counter("dht_leader_changes_total", "Count of leader changes per content", ["content_id"]) +merge_conflicts = Counter("dht_merge_conflicts_total", "Number of DHT merge conflicts", ["key"]) +view_count_total = Gauge("dht_view_count_total", "Total content views per window", ["content_id", "window"]) +unique_estimate = Gauge("dht_unique_view_estimate", "Estimated unique viewers per window", ["content_id", "window"]) +watch_time_seconds = Gauge("dht_watch_time_seconds", "Aggregate watch time per window", ["content_id", "window"]) + + +def record_replication_under(content_id: str, have: int) -> None: + replication_under.labels(content_id=content_id).inc() + + +def record_replication_over(content_id: str, have: int) -> None: + replication_over.labels(content_id=content_id).inc() + + +def record_leader_change(content_id: str) -> None: + leader_changes.labels(content_id=content_id).inc() + + +def record_merge_conflict(key: str) -> None: + merge_conflicts.labels(key=key).inc() + + +def update_view_metrics(content_id: str, window_id: str, views: int, unique: float, watch_time: int) -> None: + view_count_total.labels(content_id=content_id, window=window_id).set(views) + unique_estimate.labels(content_id=content_id, window=window_id).set(unique) + watch_time_seconds.labels(content_id=content_id, window=window_id).set(watch_time) diff --git a/app/core/network/dht/records.py b/app/core/network/dht/records.py new file mode 100644 index 0000000..d5d5b81 --- /dev/null +++ b/app/core/network/dht/records.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +import json +import time +from dataclasses import dataclass, field +from typing import Callable, Dict, Any, Tuple + +from app.core._utils.b58 import b58decode, b58encode + +try: + import nacl.signing + import nacl.encoding + _HAS_NACL = True +except Exception: # pragma: no cover - fallback path + _HAS_NACL = False + +from app.core._utils.hash import blake3_hex + +from app.core._crypto.signer import Signer +from .config import dht_config + + +def _serialize_for_signature(payload: Dict[str, Any]) -> bytes: + return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode() + + +def _dominance_tuple(logical_counter: int, timestamp: float, node_id: str) -> Tuple[int, float, str]: + return logical_counter, timestamp, 
node_id + + +def latest_wins_merge(a: Dict[str, Any], b: Dict[str, Any]) -> Dict[str, Any]: + return b + + +@dataclass +class DHTRecord: + fingerprint: str + key: str + value: Dict[str, Any] + logical_counter: int + timestamp: float + node_id: str + schema_version: str = field(default=dht_config.schema_version) + signature: str | None = None + + def dominance(self) -> Tuple[int, float, str]: + return _dominance_tuple(self.logical_counter, self.timestamp, self.node_id) + + def dominates(self, other: "DHTRecord") -> bool: + return self.dominance() > other.dominance() + + def to_payload(self) -> Dict[str, Any]: + return { + "fingerprint": self.fingerprint, + "key": self.key, + "schema_version": self.schema_version, + "logical_counter": self.logical_counter, + "timestamp": self.timestamp, + "node_id": self.node_id, + "value": self.value, + } + + def sign(self, signer: Signer) -> "DHTRecord": + blob = _serialize_for_signature(self.to_payload()) + self.signature = signer.sign(blob) + return self + + def verify(self, public_key_b58: str) -> bool: + if not self.signature: + return False + payload = _serialize_for_signature(self.to_payload()) + if _HAS_NACL: + try: + vk = nacl.signing.VerifyKey(b58decode(public_key_b58)) + vk.verify(payload, b58decode(self.signature)) + return True + except Exception: + return False + expected = b58encode(bytes.fromhex(blake3_hex(b58decode(public_key_b58) + payload))).decode() + return expected == self.signature + + def merge(self, other: "DHTRecord", merge_strategy: Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]]) -> "DHTRecord": + if self.fingerprint != other.fingerprint: + raise ValueError("Cannot merge records with different keys") + + if self.dominates(other): + dominant, subordinate = self, other + elif other.dominates(self): + dominant, subordinate = other, self + else: + # Perfect tie: break via lexicographic NodeID order and prefer merged value to remain deterministic + if self.node_id >= other.node_id: + dominant, subordinate = self, other + else: + dominant, subordinate = other, self + + merged_value = merge_strategy(dominant.value, subordinate.value) + merged = DHTRecord( + fingerprint=self.fingerprint, + key=self.key, + value=merged_value, + logical_counter=dominant.logical_counter, + timestamp=max(self.timestamp, other.timestamp), + node_id=dominant.node_id, + schema_version=self.schema_version, + ) + return merged + + @classmethod + def create( + cls, + key: str, + fingerprint: str, + value: Dict[str, Any], + node_id: str, + logical_counter: int, + signature: str | None = None, + timestamp: float | None = None, + ) -> "DHTRecord": + return cls( + fingerprint=fingerprint, + key=key, + value=value, + logical_counter=logical_counter, + timestamp=timestamp or time.time(), + node_id=node_id, + signature=signature, + ) diff --git a/app/core/network/dht/replication.py b/app/core/network/dht/replication.py new file mode 100644 index 0000000..cd24457 --- /dev/null +++ b/app/core/network/dht/replication.py @@ -0,0 +1,311 @@ +from __future__ import annotations + +import math +import time +from dataclasses import dataclass, field +from typing import Dict, Any, List, Optional, Tuple + +from app.core._crypto.signer import Signer +from .config import dht_config +from .crypto import bits_from_hex, rendezvous_score +from .keys import MetaKey +from .membership import MembershipState +from .prometheus import record_replication_under, record_replication_over, record_leader_change +from .store import DHTStore + + +def _now() -> float: + return time.time() + + 
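+# Liveness of a lease is bounded by two independent clocks (see ReplicaLease.is_expired
+# below): the absolute TTL stamped into expires_at at issue/renew time, and a heartbeat
+# deadline of heartbeat_interval * heartbeat_miss_threshold since the last renewal. With
+# the configured defaults (60s interval, 3 misses) a dead replica is evicted within
+# 180 seconds even if its lease TTL has not yet elapsed.
+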
+@dataclass +class ReplicaLease: + node_id: str + lease_id: str + issued_at: float + expires_at: float + asn: Optional[int] + ip_first_octet: Optional[int] + heartbeat_at: float + score: int + + def renew(self, now: float) -> None: + self.heartbeat_at = now + self.expires_at = now + dht_config.lease_ttl + + def is_expired(self, now: float) -> bool: + if now >= self.expires_at: + return True + if now - self.heartbeat_at > dht_config.heartbeat_interval * dht_config.heartbeat_miss_threshold: + return True + return False + + def to_dict(self) -> Dict[str, Any]: + return { + "node_id": self.node_id, + "lease_id": self.lease_id, + "issued_at": self.issued_at, + "expires_at": self.expires_at, + "asn": self.asn, + "ip_first_octet": self.ip_first_octet, + "heartbeat_at": self.heartbeat_at, + "score": self.score, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "ReplicaLease": + return cls( + node_id=str(data["node_id"]), + lease_id=str(data["lease_id"]), + issued_at=float(data["issued_at"]), + expires_at=float(data["expires_at"]), + asn=data.get("asn"), + ip_first_octet=data.get("ip_first_octet"), + heartbeat_at=float(data.get("heartbeat_at", data.get("issued_at"))), + score=int(data.get("score", 0)), + ) + + +@dataclass +class ReplicationState: + content_id: str + leases: Dict[str, ReplicaLease] = field(default_factory=dict) + leader: Optional[str] = None + revision: int = 0 + conflict_log: List[Dict[str, Any]] = field(default_factory=list) + + def prune(self, now: float) -> None: + for lease_id, lease in list(self.leases.items()): + if lease.is_expired(now): + self.conflict_log.append( + {"type": "LEASE_EXPIRED", "node_id": lease.node_id, "ts": now} + ) + del self.leases[lease_id] + + def assign(self, lease: ReplicaLease) -> None: + self.leases[lease.lease_id] = lease + self.revision += 1 + + def remove_node(self, node_id: str, reason: str, timestamp: float) -> None: + for lease_id, lease in list(self.leases.items()): + if lease.node_id == node_id: + del self.leases[lease_id] + self.conflict_log.append({"type": reason, "node_id": node_id, "ts": timestamp}) + self.revision += 1 + + def heartbeat(self, node_id: str, now: float) -> bool: + found = False + for lease in self.leases.values(): + if lease.node_id == node_id: + lease.renew(now) + found = True + return found + + def unique_asn(self) -> int: + return len({lease.asn for lease in self.leases.values() if lease.asn is not None}) + + def unique_octets(self) -> int: + return len({lease.ip_first_octet for lease in self.leases.values() if lease.ip_first_octet is not None}) + + def diversity_satisfied(self) -> bool: + if len(self.leases) < dht_config.replication_target: + return False + if self.unique_asn() < dht_config.min_asn_diversity: + return False + if self.unique_octets() < dht_config.min_ip_octet_diversity: + return False + return True + + def to_dict(self) -> Dict[str, Any]: + return { + "content_id": self.content_id, + "leader": self.leader, + "revision": self.revision, + "replica_leases": {lease_id: lease.to_dict() for lease_id, lease in self.leases.items()}, + "conflict_log": list(self.conflict_log)[-100:], # keep tail + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "ReplicationState": + state = cls(content_id=str(data.get("content_id", ""))) + state.leader = data.get("leader") + state.revision = int(data.get("revision", 0)) + leases_raw = data.get("replica_leases") or {} + for lease_id, payload in leases_raw.items(): + state.leases[lease_id] = ReplicaLease.from_dict(payload) + state.conflict_log = 
list(data.get("conflict_log") or []) + return state + + def merge_with(self, other: "ReplicationState") -> "ReplicationState": + combined = _merge_states(self, other) + return combined + + +class ReplicationManager: + def __init__(self, node_id: str, signer: Signer, store: DHTStore): + self.node_id = node_id + self.signer = signer + self.store = store + + def _load_state(self, content_id: str) -> ReplicationState: + key = MetaKey(content_id=content_id) + record = self.store.get(key.fingerprint()) + if record: + return ReplicationState.from_dict(record.value) + return ReplicationState(content_id=content_id) + + def _persist_state(self, state: ReplicationState) -> None: + key = MetaKey(content_id=state.content_id) + self.store.put( + key=str(key), + fingerprint=key.fingerprint(), + value=state.to_dict(), + logical_counter=int(time.time()), + merge_strategy=lambda a, b: ReplicationState.from_dict(a) + .merge_with(ReplicationState.from_dict(b)) + .to_dict(), + ) + + def ensure_replication(self, content_id: str, membership: MembershipState, now: Optional[float] = None) -> ReplicationState: + now = now or _now() + state = self._load_state(content_id) + + n_estimate = max(1.0, membership.n_estimate()) + p_value = max(0, round(math.log2(max(n_estimate / dht_config.replication_target, 1.0)))) + prefix, _ = bits_from_hex(content_id, p_value) + + active = membership.active_members(include_islands=True) + responsible = [] + for member in active: + node_prefix, _total = bits_from_hex(member["node_id"], p_value) + if node_prefix == prefix: + responsible.append(member) + if not responsible: + responsible = active # fall back to all active nodes + responsible.sort(key=lambda item: item["node_id"]) + leader_id = responsible[0]["node_id"] if responsible else None + previous_leader = state.leader + state.leader = leader_id + if previous_leader and leader_id and previous_leader != leader_id: + record_leader_change(content_id) + + if leader_id != self.node_id: + return state # Only leader mutates state + + state.prune(now) + + # evaluate diversity + leases_by_node = {lease.node_id: lease for lease in state.leases.values()} + if not state.diversity_satisfied(): + def rank(members): + return sorted( + ( + ( + rendezvous_score(content_id, m["node_id"]), + m["node_id"], + m.get("asn"), + m.get("ip_first_octet"), + ) + for m in members + ), + key=lambda item: item[0], + ) + + def assign_with_diversity(candidates): + added = 0 + # Phase 1: prefer candidates that increase ASN/IP octet diversity + for score, node_id, asn, ip_octet in candidates: + if node_id in leases_by_node: + continue + before_asn = state.unique_asn() + before_oct = state.unique_octets() + if ((asn is not None and before_asn < dht_config.min_asn_diversity) or + (ip_octet is not None and before_oct < dht_config.min_ip_octet_diversity)): + lease = ReplicaLease( + node_id=node_id, + lease_id=f"{content_id}:{node_id}", + issued_at=now, + expires_at=now + dht_config.lease_ttl, + asn=asn, + ip_first_octet=ip_octet, + heartbeat_at=now, + score=score, + ) + state.assign(lease) + leases_by_node[node_id] = lease + added += 1 + if state.diversity_satisfied(): + return added + # Phase 2: fill by score until target + for score, node_id, asn, ip_octet in candidates: + if node_id in leases_by_node: + continue + lease = ReplicaLease( + node_id=node_id, + lease_id=f"{content_id}:{node_id}", + issued_at=now, + expires_at=now + dht_config.lease_ttl, + asn=asn, + ip_first_octet=ip_octet, + heartbeat_at=now, + score=score, + ) + state.assign(lease) + 
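+                    # Record the assignment in the local index as well, so later
+                    # iterations of either phase skip nodes that already hold a lease.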
leases_by_node[node_id] = lease + added += 1 + if state.diversity_satisfied(): + return added + return added + + # First, prefer responsible set + assign_with_diversity(rank(responsible)) + + # If under target, add more from the rest of active nodes + if not state.diversity_satisfied(): + rest = [m for m in active if m["node_id"] not in {n for _, n, *_ in rank(responsible)}] + assign_with_diversity(rank(rest)) + + # Ensure we do not exceed replication target with duplicates + if len(state.leases) > dht_config.replication_target: + # Drop lowest scoring leases until target satisfied while preserving diversity criteria + sorted_leases = sorted(state.leases.values(), key=lambda lease: lease.score, reverse=True) + while len(sorted_leases) > dht_config.replication_target: + victim = sorted_leases.pop() # lowest score + state.remove_node(victim.node_id, reason="OVER_REPLICATED", timestamp=now) + record_replication_over(content_id, len(sorted_leases)) + + if len(state.leases) < dht_config.replication_target: + state.conflict_log.append( + {"type": "UNDER_REPLICATED", "ts": now, "have": len(state.leases)} + ) + record_replication_under(content_id, len(state.leases)) + + self._persist_state(state) + return state + + def heartbeat(self, content_id: str, node_id: str, now: Optional[float] = None) -> bool: + now = now or _now() + state = self._load_state(content_id) + if state.heartbeat(node_id, now): + self._persist_state(state) + return True + return False + + +def _merge_states(left: ReplicationState, right: ReplicationState) -> ReplicationState: + # Combine leases preferring latest expiry + lease_map: Dict[str, ReplicaLease] = {} + for state in (left, right): + for lease_id, lease in state.leases.items(): + current = lease_map.get(lease_id) + if current is None or lease.expires_at > current.expires_at: + lease_map[lease_id] = lease + merged = ReplicationState(content_id=left.content_id or right.content_id) + merged.leader = min(filter(None, [left.leader, right.leader]), default=None) + merged.conflict_log = (left.conflict_log + right.conflict_log)[-100:] + merged.leases = lease_map + merged.revision = max(left.revision, right.revision) + 1 + return merged + + +# Inject helper onto ReplicationState for merge strategy diff --git a/app/core/network/dht/store.py b/app/core/network/dht/store.py new file mode 100644 index 0000000..c9c3f54 --- /dev/null +++ b/app/core/network/dht/store.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +from typing import Callable, Dict, Any, Optional + +from app.core._crypto.signer import Signer +from .prometheus import record_merge_conflict +from .records import DHTRecord, latest_wins_merge + + +class DHTStore: + """In-memory DHT replica with deterministic merge semantics.""" + + def __init__(self, node_id: str, signer: Signer): + self.node_id = node_id + self.signer = signer + self._records: Dict[str, DHTRecord] = {} + + def get(self, fingerprint: str) -> Optional[DHTRecord]: + return self._records.get(fingerprint) + + def put( + self, + key: str, + fingerprint: str, + value: Dict[str, Any], + logical_counter: int, + merge_strategy: Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, Any]] = latest_wins_merge, + ) -> DHTRecord: + record = DHTRecord.create( + key=key, + fingerprint=fingerprint, + value=value, + node_id=self.node_id, + logical_counter=logical_counter, + ).sign(self.signer) + return self.merge_record(record, merge_strategy) + + def merge_record( + self, + incoming: DHTRecord, + merge_strategy: Callable[[Dict[str, Any], Dict[str, Any]], Dict[str, 
Any]] = latest_wins_merge, + ) -> DHTRecord: + current = self._records.get(incoming.fingerprint) + if current is None: + self._records[incoming.fingerprint] = incoming + return incoming + if current.value != incoming.value: + record_merge_conflict(incoming.key) + merged = current.merge(incoming, merge_strategy) + # Debug instrumentation for tests + # print('merge', incoming.key, merged.value) + merged.sign(self.signer) + self._records[incoming.fingerprint] = merged + return merged + + def snapshot(self) -> Dict[str, Dict[str, Any]]: + return {fp: record.to_payload() | {"signature": record.signature} for fp, record in self._records.items()} diff --git a/app/core/network/handshake.py b/app/core/network/handshake.py index b33187e..20991bf 100644 --- a/app/core/network/handshake.py +++ b/app/core/network/handshake.py @@ -8,7 +8,7 @@ import shutil import secrets from typing import Dict, Any -from base58 import b58encode +from app.core._utils.b58 import b58encode from sqlalchemy import select from app.core._secrets import hot_pubkey, hot_seed @@ -17,6 +17,7 @@ from app.core.logger import make_log from app.core.models.my_network import KnownNode from app.core.models.node_storage import StoredContent from app.core.storage import db_session +from app.core.network.dht import compute_node_id, dht_config from .constants import CURRENT_PROTOCOL_VERSION from .nodes import list_known_public_nodes from .config import ( @@ -93,7 +94,9 @@ async def build_handshake_payload(session) -> Dict[str, Any]: ipfs_payload = await _local_ipfs_payload() payload = { "version": CURRENT_PROTOCOL_VERSION, + "schema_version": dht_config.schema_version, "public_key": b58encode(hot_pubkey).decode(), + "node_id": compute_node_id(hot_pubkey), # public_host is optional for private nodes **({"public_host": PUBLIC_HOST} if PUBLIC_HOST else {}), "node_type": NODE_PRIVACY if NODE_PRIVACY != NODE_TYPE_PUBLIC else NODE_TYPE_PUBLIC, @@ -122,6 +125,8 @@ async def compute_node_info(session) -> Dict[str, Any]: node_info = { "id": b58encode(hot_pubkey).decode(), "public_key": b58encode(hot_pubkey).decode(), + "schema_version": dht_config.schema_version, + "node_id": compute_node_id(hot_pubkey), **({"public_host": PUBLIC_HOST} if PUBLIC_HOST else {}), "version": CURRENT_PROTOCOL_VERSION, "node_type": NODE_PRIVACY, diff --git a/app/core/network/maintenance.py b/app/core/network/maintenance.py new file mode 100644 index 0000000..2ef49cf --- /dev/null +++ b/app/core/network/maintenance.py @@ -0,0 +1,52 @@ +from __future__ import annotations + +import asyncio + +from sqlalchemy import select + +from app.core.logger import make_log +from app.core.models.node_storage import StoredContent +from app.core.network.dht import dht_config +from app.core.storage import db_session + + +async def replication_daemon(app): + await asyncio.sleep(5) + memory = getattr(app.ctx, "memory", None) + if not memory: + make_log("Replication", "No memory context; replication daemon exiting", level="warning") + return + make_log("Replication", "daemon started") + while True: + try: + membership_state = memory.membership.state + async with db_session(auto_commit=False) as session: + rows = await session.execute(select(StoredContent.hash)) + content_hashes = [row[0] for row in rows.all()] + for content_hash in content_hashes: + try: + state = memory.replication.ensure_replication(content_hash, membership_state) + memory.replication.heartbeat(content_hash, memory.node_id) + make_log("Replication", f"Replicated {content_hash} leader={state.leader}", level="debug") + except 
Exception as exc: + make_log("Replication", f"ensure failed for {content_hash}: {exc}", level="warning") + except Exception as exc: + make_log("Replication", f"daemon iteration failed: {exc}", level="error") + await asyncio.sleep(dht_config.heartbeat_interval) + + +async def heartbeat_daemon(app): + await asyncio.sleep(dht_config.heartbeat_interval // 2) + memory = getattr(app.ctx, "memory", None) + if not memory: + return + while True: + try: + async with db_session(auto_commit=False) as session: + rows = await session.execute(select(StoredContent.hash)) + content_hashes = [row[0] for row in rows.all()] + for content_hash in content_hashes: + memory.replication.heartbeat(content_hash, memory.node_id) + except Exception as exc: + make_log("Replication", f"heartbeat failed: {exc}", level="warning") + await asyncio.sleep(dht_config.heartbeat_interval) diff --git a/app/core/network/nodes.py b/app/core/network/nodes.py index 0a71dc0..3142206 100644 --- a/app/core/network/nodes.py +++ b/app/core/network/nodes.py @@ -14,6 +14,7 @@ from app.core.models.my_network import KnownNode from app.core.storage import db_session from app.core._secrets import hot_pubkey from app.core.ipfs_client import swarm_connect +from app.core.network.dht import ReachabilityReceipt from .config import ( HANDSHAKE_INTERVAL_SEC, UNSUPPORTED_RECHECK_INTERVAL_SEC, @@ -203,7 +204,7 @@ async def pick_next_node(session) -> Optional[KnownNode]: return None -async def perform_handshake_round(): +async def perform_handshake_round(memory=None): async with db_session(auto_commit=True) as session: # Private nodes still do outbound handshakes; inbound typically unreachable without public endpoint node = await pick_next_node(session) @@ -238,6 +239,40 @@ async def perform_handshake_round(): node.meta = {**(node.meta or {}), "last_response": resp, "fail_count": 0, "ipfs": node_ipfs_meta} await session.commit() make_log("Handshake", f"Handshake OK with {base_url}") + if memory and resp: + try: + membership_mgr = getattr(memory, "membership", None) + if membership_mgr: + remote_node = (resp or {}).get("node") or {} + remote_node_id = remote_node.get("node_id") + if remote_node_id: + remote_asn = remote_node.get("asn") + membership_mgr.update_member( + node_id=remote_node_id, + public_key=remote_node.get("public_key") or "", + ip=node.ip, + asn=int(remote_asn) if remote_asn is not None else None, + metadata={ + "public_host": remote_node.get("public_host"), + "capabilities": remote_node.get("capabilities") or {}, + }, + ) + for receipt in (resp or {}).get("reachability_receipts") or []: + target = receipt.get("target_id") + issuer = receipt.get("issuer_id") + if not target or not issuer: + continue + membership_mgr.record_receipt( + ReachabilityReceipt( + target_id=str(target), + issuer_id=str(issuer), + asn=int(receipt["asn"]) if receipt.get("asn") is not None else None, + timestamp=float(receipt.get("timestamp", 0)), + signature=str(receipt.get("signature", "")), + ) + ) + except Exception as exc: + make_log("Handshake", f"Membership merge failed: {exc}", level="warning") except Exception as e: make_log("Handshake", f"Handshake failed with {base_url}: {e}", level='warning') # Record incident-lite in meta @@ -259,7 +294,7 @@ async def network_handshake_daemon(app): make_log("Handshake", f"Static IPFS peering failed: {exc}", level='warning') while True: try: - await perform_handshake_round() + await perform_handshake_round(getattr(app, "ctx", None) and getattr(app.ctx, "memory", None)) except Exception as e: make_log("Handshake", f"Round 
error: {e}", level='error') await asyncio.sleep(HANDSHAKE_INTERVAL_SEC) diff --git a/docs/indexation.md b/docs/indexation.md deleted file mode 100644 index e527529..0000000 --- a/docs/indexation.md +++ /dev/null @@ -1,110 +0,0 @@ -## Indexation - -### Stored content types - -- `local/content_bin` – binary content stored only locally (or indexer no found it on chain) -- `onchain/content` - content stored onchain -- `onchain/content_unknown` - content stored onchain, but we don't have a private key to decrypt it - -Content item may have multiple types, for example, `local/content_bin` and `onchain/content`. - -But `content cover`, `content metadata` and `decrypted content` always stored locally. - -### Content Ownership Proof NFT Values Cell Deserialization - -```text -values:^[ - content_hash:uint256 - metadata:^[ - offchain?:int1 = always 1 - https://my-public-node-1.projscale.dev/*:bytes - ] - content:^[ - content_cid:^Cell = b58encoded CID - cover_cid:^Cell = b58encoded CID - metadata_cid:^Cell = b58encoded CID - ] -] -``` - -### Available content statuses - -- `UPLOAD_TO_BTFS` – content is stored locally, upload all content parts to BTFS. This status means that payment is received yet. - - -### Upload content flow - -1. User uploads content to server (/api/v1/storage) -2. User uploads content cover to server (/api/v1/storage) -3. User send /api/v1/blockchain.sendNewContentMessage to server and accept the transaction in wallet -4. Indexer receives the transaction and indexes the content. And send telegram notification to user. -# Network Index & Sync (v3) - -This document describes the simplified, production‑ready stack for content discovery and sync: - -- Upload via tus → stream encrypt (ENCF v1, AES‑256‑GCM, 1 MiB chunks) → `ipfs add --cid-version=1 --raw-leaves --chunker=size-1048576 --pin`. -- Public index exposes only encrypted sources (CID) and safe metadata; no plaintext ids. -- Nodes full‑sync by pinning encrypted CIDs; keys are auto‑granted to trusted peers for preview/full access. - -## ENCF v1 (Encrypted Content Format) - -Unencrypted header and framed body; same bytes on all nodes ⇒ stable CID. - -Header (all big endian): - -``` -MAGIC(4): 'ENCF' -VER(1): 0x01 -SCHEME(1): 0x03 = AES_GCM (0x01 AES_GCM_SIV legacy, 0x02 AES_SIV legacy) -CHUNK(4): plaintext chunk bytes (1048576) -SALT_LEN(1) -SALT(N) -RESERVED(5): zeros -``` - -Body: repeated frames `[p_len:4][cipher][tag(16)]` where `p_len <= CHUNK` for last frame. - -AES‑GCM (scheme `0x03`) encrypts each frame with deterministic `nonce = HMAC_SHA256(salt, u64(frame_idx))[:12]`. Legacy scheme `0x01` keeps AES‑GCM‑SIV with the same nonce derivation. - -For new uploads (v2025-09), the pipeline defaults to AES‑256‑GCM. Legacy AES‑GCM‑SIV/AES‑SIV content is still readable — the decoder auto-detects the scheme byte. - -### Local encryption/decryption helpers - -``` -python -m app.core.crypto.cli encrypt --input demo.wav --output demo.encf \ - --key AAAAEyHSVws5O8JGrg3kUSVtk5dQSc5x5e7jh0S2WGE= --salt-bytes 16 - -python -m app.core.crypto.cli decrypt --input demo.encf --output demo.wav \ - --wrapped-key -``` - -Because we use standard AES‑GCM, you can also re-hydrate frames manually with tools like `openssl aes-256-gcm`. The header exposes `chunk_bytes` and salt; derive the per-frame nonce via `HMAC_SHA256(salt, idx)` where `idx` is the frame number (0-based) and feed the 12-byte prefix as IV. - -## API - -- `GET /api/v1/content.index` → `{ items:[...], schema, ETag }` with signed items. 
-- `GET /api/v1/content.delta?since=ISO8601` → `{ items:[...], next_since, schema }` with ETag. -- `POST /api/v1/sync.pin` (NodeSig required) → queue/pin CID. -- `POST /api/v1/keys.request` (NodeSig required) → sealed DEK for trusted peers. -- `GET /api/v1/content.derivatives?cid=` → local ready derivatives (low/high/preview). - -## NodeSig - -Canonical string: - -``` -METHOD\nPATH\nSHA256(body)\nTS\nNONCE\nNODE_ID -``` - -Headers: `X-Node-Id`, `X-Node-Ts`, `X-Node-Nonce`, `X-Node-Sig`. -Window ±120s, nonce cache ~10min; replay → 401. - -## Sync daemon - -- Jitter 0–30s per peer; uses ETag/`since`. -- Disk watermark (`SYNC_DISK_LOW_WATERMARK_PCT`) stops pin burst. -- Pinned concurrently (`SYNC_MAX_CONCURRENT_PINS`) with pre‑`findprovs` `swarm/connect`. - -## Keys policy - -`KEY_AUTO_GRANT_TRUSTED_ONLY=1` — only KnownNode.meta.role=='trusted' gets DEK automatically. Preview lease TTL via `KEY_GRANT_PREVIEW_TTL_SEC`. diff --git a/docs/web2-client.md b/docs/web2-client.md deleted file mode 100644 index a29d579..0000000 --- a/docs/web2-client.md +++ /dev/null @@ -1,118 +0,0 @@ -## Web2 Client (through HTTP API) - -### API Public Endpoints - -```text -https://music-gateway.letsw.app - – /api/v1 -``` - -### Telegram WebApp Authorization - -[Implementation](../app/api/routes/auth.py) - -#### Request (POST, /api/v1/auth.twa, JSON) - -```javascript -{ - twa_data: window.Telegram.WebApp.initData -} -``` - -#### Response (JSON) - -```javascript -{ - user: { ...User }, - connected_wallet: null | { - version: string, - address: string, - ton_balance: string // nanoTON bignum - }, - auth_v1_token: string -} -``` - -**Use** `auth_v1_token` as `Authorization` header for all authorized requests. - -### Upload file - -[Implementation](../app/api/routes/node_storage.py) - -#### Request (POST, /api/v1/storage, FormData) - -```javascript -{ - file: File -} -``` - -#### Response (JSON) - -```javascript -{ - content_sha256: string, - content_id_v1: string, - content_url: string -} -``` - -### Download file - -[Implementation](../app/api/routes/node_storage.py) - -#### Request (GET, /api/v1/storage/:content_id) - -#### Response (File) - -### Create new content - -[Implementation](../app/api/routes/blockchain.py) - -#### Request (POST, /api/v1/blockchain.sendNewContentMessage, JSON) - -```javascript -{ - title: string, - authors: list, - content: string, // recommended dmy:// - image: string, // recommended dmy:// - description: string, - price: string, // nanoTON bignum - resaleLicensePrice: string // nanoTON bignum (default = 0) - allowResale: boolean, - royaltyParams: [{ - address: string, - value: number // 10000 = 100% - }] -} -``` - -#### Response (JSON) - -```javascript -{ - message: "Transaction requested" -} -``` - -### Purchase content - -[Implementation](../app/api/routes/blockchain.py) - -#### Request (POST, /api/v1/blockchain.sendPurchaseContentMessage, JSON) - -```javascript -{ - content_address: string, - price: string // nanoTON bignum -} -``` - -#### Response (JSON) - -```javascript -{ - message: "Transaction requested" -} -``` \ No newline at end of file diff --git a/docs/web2-client_task280224.md b/docs/web2-client_task280224.md deleted file mode 100644 index 4a8f8ca..0000000 --- a/docs/web2-client_task280224.md +++ /dev/null @@ -1,9 +0,0 @@ -## Web2 Client Task #280224 - -1. В процессе изменения дизайна сделать все элементы по нормальному в отличие от того как сейчас: чтобы страница состояла из компонентов, а не монолитно написана. -2. 
Make clicking the "Upload content" button open a dialog saying "Go to your wallet, you have requested a transaction"; if the server returned a walletLink in addition to the usual message, also show a button that opens the wallet
-3. To request a transaction, send a `docs/web2-client/UploadFile` request with the file and get content_url back in the response; after the image and the content itself are uploaded, attach the resulting URLs to the `docs/web2-client/CreateNewContent` request in the image and content fields respectively
-4. Nice to have: display the uploaded cover as a card with "Delete" and "Change" buttons (clicking "Change" reopens the content upload dialog)
-5. Make sure content goes through the full upload chain (image upload, content upload, transaction request via the backend), after which the webapp closes via window.Telegram.WebApp.close()
-6. Make the design the way Misha wants
-7. Handle the case when no wallet is connected, i.e. the `docs/web2-client/auth.twa` response returns connected_wallet: null
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 1ddd468..ac5ccff 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -19,3 +19,7 @@ ffmpeg-python==0.2.0
 python-magic==0.4.27
 cryptography==42.0.5
 alembic==1.13.1
+blake3==0.4.1
+prometheus-client==0.20.0
+pytest==8.2.1
+pytest-asyncio==0.23.7
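
A hypothetical smoke test (not part of this diff) showing how the newly added pytest dependency could exercise the windowed-metrics CRDT from `app/core/network/dht/metrics.py`. The import path and the `ContentMetricsState`/`MetricDelta` signatures are taken from the code above; the test file name, node ids, and sample values are assumptions for illustration only:

```python
# tests/test_dht_metrics_merge.py — illustrative sketch only
from app.core.network.dht.metrics import ContentMetricsState, MetricDelta


def test_merge_is_commutative():
    # Two replicas observe different views of the same content.
    a = ContentMetricsState(node_id="node-a")
    b = ContentMetricsState(node_id="node-b")
    a.apply(MetricDelta(content_id="c1", view_id="v1", watch_time=30,
                        bytes_out=1024, completed=True, timestamp=1.0))
    b.apply(MetricDelta(content_id="c1", view_id="v2", watch_time=45,
                        bytes_out=2048, completed=False, timestamp=2.0))

    # Merging in either order must converge to the same serialized state,
    # since every underlying counter/HLL merge is an elementwise max.
    ab = ContentMetricsState.from_dict("x", a.to_dict()).merge(
        ContentMetricsState.from_dict("x", b.to_dict()))
    ba = ContentMetricsState.from_dict("x", b.to_dict()).merge(
        ContentMetricsState.from_dict("x", a.to_dict()))
    assert ab.to_dict() == ba.to_dict()
```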