diff --git a/.gitignore b/.gitignore index 3b1fb24..231f844 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ playground .DS_Store messages.pot activeConfig +__pycache__ diff --git a/alembic/versions/d3e5f7a9c0d1_create_dht_and_rdap_tables.py b/alembic/versions/d3e5f7a9c0d1_create_dht_and_rdap_tables.py new file mode 100644 index 0000000..f528e6e --- /dev/null +++ b/alembic/versions/d3e5f7a9c0d1_create_dht_and_rdap_tables.py @@ -0,0 +1,70 @@ +"""create dht_records and rdap_cache tables + +Revision ID: d3e5f7a9c0d1 +Revises: c2d4e6f8a1b2 +Create Date: 2025-10-22 00:00:00.000000 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'd3e5f7a9c0d1' +down_revision: Union[str, None] = 'c2d4e6f8a1b2' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + bind = op.get_bind() + inspector = sa.inspect(bind) + + # dht_records + if not inspector.has_table('dht_records'): + op.create_table( + 'dht_records', + sa.Column('fingerprint', sa.String(length=128), primary_key=True), + sa.Column('key', sa.String(length=512), nullable=False), + sa.Column('schema_version', sa.String(length=16), nullable=False, server_default='v1'), + sa.Column('logical_counter', sa.Integer(), nullable=False, server_default='0'), + sa.Column('timestamp', sa.Float(), nullable=False, server_default='0'), + sa.Column('node_id', sa.String(length=128), nullable=False), + sa.Column('signature', sa.String(length=512), nullable=True), + sa.Column('value', sa.JSON(), nullable=False, server_default=sa.text("'{}'::jsonb")), + sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.text('CURRENT_TIMESTAMP')), + ) + # ensure index exists (but don't fail if it already exists) + try: + existing_indexes = {idx['name'] for idx in inspector.get_indexes('dht_records')} + except Exception: + existing_indexes = set() + if 'ix_dht_records_key' not in existing_indexes: + op.create_index('ix_dht_records_key', 'dht_records', ['key']) + + # rdap_cache + if not inspector.has_table('rdap_cache'): + op.create_table( + 'rdap_cache', + sa.Column('ip', sa.String(length=64), primary_key=True), + sa.Column('asn', sa.Integer(), nullable=True), + sa.Column('source', sa.String(length=64), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=False, server_default=sa.text('CURRENT_TIMESTAMP')), + ) + + +def downgrade() -> None: + try: + op.drop_table('rdap_cache') + except Exception: + pass + try: + op.drop_index('ix_dht_records_key', table_name='dht_records') + except Exception: + pass + try: + op.drop_table('dht_records') + except Exception: + pass diff --git a/app/__main__.py b/app/__main__.py index 4c4ea1c..af746e4 100644 --- a/app/__main__.py +++ b/app/__main__.py @@ -100,16 +100,12 @@ if __name__ == '__main__': except Exception as e: make_log('Startup', f'DB sync init failed: {e}', level='error') from app.api import app - from app.bot import dp as uploader_bot_dp - from app.client_bot import dp as client_bot_dp + # Delay aiogram dispatcher creation until loop is running from app.core._config import SANIC_PORT, PROJECT_HOST, DATABASE_URL from app.core.network.nodes import network_handshake_daemon, bootstrap_once_and_exit_if_failed - from app.core.network.maintenance import replication_daemon, heartbeat_daemon + from app.core.network.maintenance import replication_daemon, heartbeat_daemon, dht_gossip_daemon app.ctx.memory = main_memory - for 
_target in [uploader_bot_dp, client_bot_dp]: - _target._s_memory = app.ctx.memory - app.ctx.memory._app = app # Ensure DB schema exists using the same event loop as Sanic (idempotent) @@ -117,13 +113,28 @@ if __name__ == '__main__': app.add_task(execute_queue(app)) app.add_task(queue_daemon(app)) - app.add_task(uploader_bot_dp.start_polling(app.ctx.memory._telegram_bot)) - app.add_task(client_bot_dp.start_polling(app.ctx.memory._client_telegram_bot)) + # Start bots after loop is ready + async def _start_bots(): + try: + from app.bot import create_dispatcher as create_uploader_dp + from app.client_bot import create_dispatcher as create_client_dp + uploader_bot_dp = create_uploader_dp() + client_bot_dp = create_client_dp() + for _target in [uploader_bot_dp, client_bot_dp]: + _target._s_memory = app.ctx.memory + await asyncio.gather( + uploader_bot_dp.start_polling(app.ctx.memory._telegram_bot), + client_bot_dp.start_polling(app.ctx.memory._client_telegram_bot), + ) + except Exception as e: + make_log('Bots', f'Failed to start bots: {e}', level='error') + app.add_task(_start_bots()) # Start network handshake daemon and bootstrap step app.add_task(network_handshake_daemon(app)) app.add_task(bootstrap_once_and_exit_if_failed()) app.add_task(replication_daemon(app)) app.add_task(heartbeat_daemon(app)) + app.add_task(dht_gossip_daemon(app)) app.run(host='0.0.0.0', port=SANIC_PORT) else: diff --git a/app/api/__init__.py b/app/api/__init__.py index cdca550..2e2e219 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -25,7 +25,7 @@ from app.api.routes.auth import s_api_v1_auth_twa, s_api_v1_auth_select_wallet, from app.api.routes.statics import s_api_tonconnect_manifest, s_api_platform_metadata from app.api.routes.node_storage import s_api_v1_storage_post, s_api_v1_storage_get, \ s_api_v1_storage_decode_cid -from app.api.routes.progressive_storage import s_api_v1_5_storage_get, s_api_v1_5_storage_post +from app.api.routes.progressive_storage import s_api_v1_5_storage_get, s_api_v1_5_storage_post, s_api_v1_storage_fetch, s_api_v1_storage_proxy from app.api.routes.upload_tus import s_api_v1_upload_tus_hook from app.api.routes.account import s_api_v1_account_get from app.api.routes._blockchain import s_api_v1_blockchain_send_new_content_message, \ @@ -52,12 +52,16 @@ from app.api.routes.admin import ( s_api_v1_admin_system, s_api_v1_admin_uploads, s_api_v1_admin_users, + s_api_v1_admin_network, + s_api_v1_admin_network_config, + s_api_v1_admin_network_config_set, ) from app.api.routes.tonconnect import s_api_v1_tonconnect_new, s_api_v1_tonconnect_logout from app.api.routes.keys import s_api_v1_keys_request from app.api.routes.sync import s_api_v1_sync_pin, s_api_v1_sync_status from app.api.routes.upload_status import s_api_v1_upload_status from app.api.routes.metrics import s_api_metrics +from app.api.routes.dht import s_api_v1_dht_get, s_api_v1_dht_put app.add_route(s_index, "/", methods=["GET", "OPTIONS"]) @@ -84,6 +88,8 @@ app.add_route(s_api_v1_tonconnect_logout, "/api/v1/tonconnect.logout", methods=[ app.add_route(s_api_v1_5_storage_post, "/api/v1.5/storage", methods=["POST", "OPTIONS"]) app.add_route(s_api_v1_5_storage_get, "/api/v1.5/storage/", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_storage_fetch, "/api/v1/storage.fetch/", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_storage_proxy, "/api/v1/storage.proxy/", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_storage_post, "/api/v1/storage", methods=["POST", "OPTIONS"]) app.add_route(s_api_v1_storage_get, 
"/api/v1/storage/", methods=["GET", "OPTIONS"]) @@ -119,6 +125,9 @@ app.add_route(s_api_v1_admin_status, "/api/v1/admin.status", methods=["GET", "OP app.add_route(s_api_v1_admin_cache_setlimits, "/api/v1/admin.cache.setLimits", methods=["POST", "OPTIONS"]) app.add_route(s_api_v1_admin_cache_cleanup, "/api/v1/admin.cache.cleanup", methods=["POST", "OPTIONS"]) app.add_route(s_api_v1_admin_sync_setlimits, "/api/v1/admin.sync.setLimits", methods=["POST", "OPTIONS"]) +app.add_route(s_api_v1_admin_network, "/api/v1/admin.network", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_network_config, "/api/v1/admin.network.config", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_network_config_set, "/api/v1/admin.network.config.set", methods=["POST", "OPTIONS"]) # tusd HTTP hooks app.add_route(s_api_v1_upload_tus_hook, "/api/v1/upload.tus-hook", methods=["POST", "OPTIONS"]) @@ -129,6 +138,8 @@ app.add_route(s_api_v1_sync_pin, "/api/v1/sync.pin", methods=["POST", "OPTIONS"] app.add_route(s_api_v1_sync_status, "/api/v1/sync.status", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_upload_status, "/api/v1/upload.status/", methods=["GET", "OPTIONS"]) app.add_route(s_api_metrics, "/metrics", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_dht_get, "/api/v1/dht.get", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_dht_put, "/api/v1/dht.put", methods=["POST", "OPTIONS"]) @app.exception(BaseException) diff --git a/app/api/__pycache__/__init__.cpython-310.pyc b/app/api/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..a2d1ae0 Binary files /dev/null and b/app/api/__pycache__/__init__.cpython-310.pyc differ diff --git a/app/api/__pycache__/middleware.cpython-310.pyc b/app/api/__pycache__/middleware.cpython-310.pyc new file mode 100644 index 0000000..3b4822d Binary files /dev/null and b/app/api/__pycache__/middleware.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/_blockchain.cpython-310.pyc b/app/api/routes/__pycache__/_blockchain.cpython-310.pyc new file mode 100644 index 0000000..2d8ba21 Binary files /dev/null and b/app/api/routes/__pycache__/_blockchain.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/_index.cpython-310.pyc b/app/api/routes/__pycache__/_index.cpython-310.pyc new file mode 100644 index 0000000..6312067 Binary files /dev/null and b/app/api/routes/__pycache__/_index.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/_system.cpython-310.pyc b/app/api/routes/__pycache__/_system.cpython-310.pyc new file mode 100644 index 0000000..d9a0559 Binary files /dev/null and b/app/api/routes/__pycache__/_system.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/account.cpython-310.pyc b/app/api/routes/__pycache__/account.cpython-310.pyc new file mode 100644 index 0000000..dcd9932 Binary files /dev/null and b/app/api/routes/__pycache__/account.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/admin.cpython-310.pyc b/app/api/routes/__pycache__/admin.cpython-310.pyc new file mode 100644 index 0000000..dfcef36 Binary files /dev/null and b/app/api/routes/__pycache__/admin.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/auth.cpython-310.pyc b/app/api/routes/__pycache__/auth.cpython-310.pyc new file mode 100644 index 0000000..944b115 Binary files /dev/null and b/app/api/routes/__pycache__/auth.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/content.cpython-310.pyc b/app/api/routes/__pycache__/content.cpython-310.pyc new file mode 100644 index 0000000..7c12731 Binary 
files /dev/null and b/app/api/routes/__pycache__/content.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/content_index.cpython-310.pyc b/app/api/routes/__pycache__/content_index.cpython-310.pyc new file mode 100644 index 0000000..4e7e216 Binary files /dev/null and b/app/api/routes/__pycache__/content_index.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/derivatives.cpython-310.pyc b/app/api/routes/__pycache__/derivatives.cpython-310.pyc new file mode 100644 index 0000000..6e54a63 Binary files /dev/null and b/app/api/routes/__pycache__/derivatives.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/dht.cpython-310.pyc b/app/api/routes/__pycache__/dht.cpython-310.pyc new file mode 100644 index 0000000..efdc2a3 Binary files /dev/null and b/app/api/routes/__pycache__/dht.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/keys.cpython-310.pyc b/app/api/routes/__pycache__/keys.cpython-310.pyc new file mode 100644 index 0000000..c2bf9a9 Binary files /dev/null and b/app/api/routes/__pycache__/keys.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/metrics.cpython-310.pyc b/app/api/routes/__pycache__/metrics.cpython-310.pyc new file mode 100644 index 0000000..91bc842 Binary files /dev/null and b/app/api/routes/__pycache__/metrics.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/network.cpython-310.pyc b/app/api/routes/__pycache__/network.cpython-310.pyc new file mode 100644 index 0000000..e80eefd Binary files /dev/null and b/app/api/routes/__pycache__/network.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/network_events.cpython-310.pyc b/app/api/routes/__pycache__/network_events.cpython-310.pyc new file mode 100644 index 0000000..28f3571 Binary files /dev/null and b/app/api/routes/__pycache__/network_events.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/node_storage.cpython-310.pyc b/app/api/routes/__pycache__/node_storage.cpython-310.pyc new file mode 100644 index 0000000..195a363 Binary files /dev/null and b/app/api/routes/__pycache__/node_storage.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/progressive_storage.cpython-310.pyc b/app/api/routes/__pycache__/progressive_storage.cpython-310.pyc new file mode 100644 index 0000000..05b5f80 Binary files /dev/null and b/app/api/routes/__pycache__/progressive_storage.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/statics.cpython-310.pyc b/app/api/routes/__pycache__/statics.cpython-310.pyc new file mode 100644 index 0000000..68ebc13 Binary files /dev/null and b/app/api/routes/__pycache__/statics.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/sync.cpython-310.pyc b/app/api/routes/__pycache__/sync.cpython-310.pyc new file mode 100644 index 0000000..4b3b884 Binary files /dev/null and b/app/api/routes/__pycache__/sync.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/tonconnect.cpython-310.pyc b/app/api/routes/__pycache__/tonconnect.cpython-310.pyc new file mode 100644 index 0000000..0c47f13 Binary files /dev/null and b/app/api/routes/__pycache__/tonconnect.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/upload_status.cpython-310.pyc b/app/api/routes/__pycache__/upload_status.cpython-310.pyc new file mode 100644 index 0000000..6a91568 Binary files /dev/null and b/app/api/routes/__pycache__/upload_status.cpython-310.pyc differ diff --git a/app/api/routes/__pycache__/upload_tus.cpython-310.pyc b/app/api/routes/__pycache__/upload_tus.cpython-310.pyc new file mode 100644 index 
0000000..4b08749 Binary files /dev/null and b/app/api/routes/__pycache__/upload_tus.cpython-310.pyc differ diff --git a/app/api/routes/admin.py b/app/api/routes/admin.py index 35b6394..e8c9c55 100644 --- a/app/api/routes/admin.py +++ b/app/api/routes/admin.py @@ -10,7 +10,7 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from urllib.parse import urlparse -from base58 import b58encode +from app.core._utils.b58 import b58encode from sanic import response from sqlalchemy import Integer, String, and_, case, cast, func, or_, select, Text @@ -48,6 +48,9 @@ from app.core.models.user_activity import UserActivity from app.core._utils.share_links import build_content_links from app.core.content.content_id import ContentId from app.core.events.service import record_event +from app.core.network.dht import MetricKey # type stub; used in typing only +from app.core.network.dht import dht_config +from app.core.models._config import ServiceConfig MIN_ONCHAIN_INDEX = int(os.getenv("MIN_ONCHAIN_INDEX", "8")) @@ -2219,6 +2222,272 @@ async def s_api_v1_admin_cache_cleanup(request): return response.json({"ok": True, "removed": removed}) +async def s_api_v1_admin_network(request): + """Summary of the decentralised network state for the "Network status" tab. + + Returns: + - summary: n_estimate, member count, number of islands, replication conflict summary + - members: list of nodes with roles/versions/reachability and attributes + - per_node_replication: how many leases each node holds and how many times it acts as leader + """ + if (unauth := _ensure_admin(request)): + return unauth + + mem = getattr(request.app.ctx, 'memory', None) + if not mem: + return response.json({"error": "MEMORY_NOT_READY"}, status=503) + + membership = mem.membership.state + n_est = membership.n_estimate() + active_all = membership.active_members(include_islands=True) + active_filtered = membership.active_members(include_islands=False) + islands = [m for m in active_all if membership.reachability_ratio(m['node_id']) < dht_config.default_q] + + # Enrich from the DB (versions, roles, public_host) + db = request.ctx.db_session + known = (await db.execute(select(KnownNode))).scalars().all() + meta_by_pub = {r.public_key: (r, r.meta or {}) for r in known} + meta_by_host = {r.ip: (r, r.meta or {}) for r in known} + + # Precompute receipts stats per node + receipts_elements = membership.receipts.elements() if hasattr(membership, 'receipts') else {} + receipts_by_target: Dict[str, Dict[str, Any]] = {} + for _rid, rec in receipts_elements.items(): + tid = str(rec.get('target_id')) + if not tid: + continue + bucket = receipts_by_target.setdefault(tid, { 'total': 0, 'asn_set': set() }) + bucket['total'] += 1 + if rec.get('asn') is not None: + try: + bucket['asn_set'].add(int(rec.get('asn'))) + except Exception: + pass + + def _enrich(member: dict) -> dict: + pub = str(member.get('public_key') or '') + host = str(member.get('ip') or '') + row_meta = (meta_by_pub.get(pub) or meta_by_host.get(host) or (None, {}))[1] + caps = (member.get('meta') or {}).get('capabilities') or {} + rec_stat = receipts_by_target.get(member.get('node_id') or '', {'total': 0, 'asn_set': set()}) + return { + 'node_id': member.get('node_id'), + 'public_key': pub or None, + 'public_host': row_meta.get('public_host'), + 'version': row_meta.get('version'), + 'role': row_meta.get('role') or 'read-only', + 'ip': host or None, + 'asn': member.get('asn'), + 'ip_first_octet': member.get('ip_first_octet'), + 'reachability_ratio':
membership.reachability_ratio(member.get('node_id')), + 'last_update': member.get('last_update'), + 'accepts_inbound': bool(caps.get('accepts_inbound')), + 'is_bootstrap': bool(caps.get('is_bootstrap')), + 'receipts_total': int(rec_stat.get('total') or 0), + 'receipts_asn_unique': len(rec_stat.get('asn_set') or ()), + } + + members_payload = [_enrich(m) for m in active_all] + # Server-side pagination + try: + page = max(1, int(request.args.get('page') or 1)) + except Exception: + page = 1 + try: + page_size = max(1, min(500, int(request.args.get('page_size') or 100))) + except Exception: + page_size = 100 + total_members = len(members_payload) + start = (page - 1) * page_size + end = start + page_size + members_page = members_payload[start:end] + + # Aggregate replication state from the DHT snapshot + snapshot = mem.dht_store.snapshot() if hasattr(mem, 'dht_store') else {} + per_node = {} + conflict_under = 0 + conflict_over = 0 + for fp, rec in snapshot.items(): + key = rec.get('key') or '' + if not key.startswith('meta:'): + continue + value = rec.get('value') or {} + content_id = value.get('content_id') + leases = (value.get('replica_leases') or {}).values() + leader = value.get('leader') + # Conflicts + for ev in value.get('conflict_log') or []: + t = (ev.get('type') or '').upper() + if t == 'UNDER_REPLICATED': + conflict_under += 1 + elif t == 'OVER_REPLICATED': + conflict_over += 1 + # Per-node conflicts + nid = ev.get('node_id') + if nid: + p = per_node.setdefault(nid, {'leases_held': 0, 'leaderships': 0, 'sample_contents': [], 'conflicts': {'over': 0, 'lease_expired': 0}, 'conflict_samples': []}) + if t == 'OVER_REPLICATED': + p['conflicts']['over'] = p['conflicts'].get('over', 0) + 1 + elif t == 'LEASE_EXPIRED': + p['conflicts']['lease_expired'] = p['conflicts'].get('lease_expired', 0) + 1 + if content_id and len(p['conflict_samples']) < 10: + p['conflict_samples'].append({'content_id': content_id, 'type': t, 'ts': ev.get('ts')}) + # Leases + for l in leases: + nid = l.get('node_id') + if not nid: + continue + p = per_node.setdefault(nid, {'leases_held': 0, 'leaderships': 0, 'sample_contents': [], 'conflicts': {'over': 0, 'lease_expired': 0}, 'conflict_samples': []}) + p['leases_held'] += 1 + if content_id and len(p['sample_contents']) < 5: + p['sample_contents'].append(content_id) + if leader: + p = per_node.setdefault(leader, {'leases_held': 0, 'leaderships': 0, 'sample_contents': [], 'conflicts': {'over': 0, 'lease_expired': 0}, 'conflict_samples': []}) + p['leaderships'] += 1 + + # Add a trusted-only n_estimate and activity figures to the summary + # Build allowed_nodes the same way replication does + from app.core._utils.b58 import b58decode + from app.core.network.dht.crypto import compute_node_id + allowed_nodes = set() + for row, meta in meta_by_pub.values(): + try: + if (meta or {}).get('role') == 'trusted' and row.public_key: + allowed_nodes.add(compute_node_id(b58decode(row.public_key))) + except Exception: + pass + allowed_nodes.add(mem.node_id) + n_est_trusted = membership.n_estimate_trusted(allowed_nodes) if hasattr(membership, 'n_estimate_trusted') else n_est + # Active trusted: nodes in allowed_nodes that also pass the TTL/Q checks + active_trusted = [m for m in active_filtered if m.get('node_id') in allowed_nodes] + # Expose the interval configuration + from app.core.network.dht import dht_config + + # Build receipts report with validation status + receipts_raw = (membership.receipts.elements() if hasattr(membership, 'receipts') else {}) or {} + receipts: List[Dict[str, Any]] = [] + members_map
= membership.members.elements() if hasattr(membership, 'members') else {} + for _rid, entry in receipts_raw.items(): + target_id = str(entry.get('target_id')) + issuer_id = str(entry.get('issuer_id')) + asn = entry.get('asn') + timestamp = entry.get('timestamp') + signature = str(entry.get('signature') or '') + status = 'unknown' + # verify if possible + issuer_pub = None + for mid, mdata in members_map.items(): + if mid == issuer_id: + issuer_pub = mdata.get('public_key') + break + if issuer_pub: + try: + from app.core._utils.b58 import b58decode as _b58d + from app.core.network.dht.crypto import compute_node_id + import nacl.signing # type: ignore + # node_id/pubkey match + if compute_node_id(_b58d(issuer_pub)) != issuer_id: + status = 'mismatch_node_id' + else: + payload = { + 'schema_version': dht_config.schema_version, + 'target_id': target_id, + 'issuer_id': issuer_id, + 'asn': int(asn) if asn is not None else None, + 'timestamp': float(timestamp or 0), + } + blob = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode() + vk = nacl.signing.VerifyKey(_b58d(issuer_pub)) + vk.verify(blob, _b58d(signature)) + status = 'valid' + except Exception: + status = 'bad_signature' + else: + status = 'unknown_issuer' + receipts.append({ + 'target_id': target_id, + 'issuer_id': issuer_id, + 'asn': asn, + 'timestamp': timestamp, + 'status': status, + }) + + return response.json({ + 'summary': { + 'n_estimate': n_est, + 'n_estimate_trusted': n_est_trusted, + 'active_trusted': len(active_trusted), + 'members_total': len(active_all), + 'active': len(active_filtered), + 'islands': len(islands), + 'replication_conflicts': { + 'under': conflict_under, + 'over': conflict_over, + }, + 'config': { + 'heartbeat_interval': dht_config.heartbeat_interval, + 'lease_ttl': dht_config.lease_ttl, + 'gossip_interval_sec': dht_config.gossip_interval_sec, + 'gossip_backoff_base_sec': dht_config.gossip_backoff_base_sec, + 'gossip_backoff_cap_sec': dht_config.gossip_backoff_cap_sec, + } + }, + 'members': members_page, + 'per_node_replication': per_node, + 'receipts': receipts, + 'paging': { 'page': page, 'page_size': page_size, 'total': total_members }, + }) + + +async def s_api_v1_admin_network_config(request): + if (unauth := _ensure_admin(request)): + return unauth + cfg = dht_config + async with request.ctx.db_session() as session: + sc = ServiceConfig(session) + out = { + 'heartbeat_interval': cfg.heartbeat_interval, + 'lease_ttl': cfg.lease_ttl, + 'gossip_interval_sec': cfg.gossip_interval_sec, + 'gossip_backoff_base_sec': cfg.gossip_backoff_base_sec, + 'gossip_backoff_cap_sec': cfg.gossip_backoff_cap_sec, + } + # include overrides if present + for k in list(out.keys()): + ov = await sc.get(f'DHT_{k.upper()}', None) + if ov is not None: + out[k] = int(ov) + return response.json({'ok': True, 'config': out}) + + +async def s_api_v1_admin_network_config_set(request): + if (unauth := _ensure_admin(request)): + return unauth + data = request.json or {} + allowed = { + 'heartbeat_interval': (5, 3600), + 'lease_ttl': (60, 86400), + 'gossip_interval_sec': (5, 600), + 'gossip_backoff_base_sec': (1, 300), + 'gossip_backoff_cap_sec': (10, 7200), + } + updates = {} + for key, (lo, hi) in allowed.items(): + if key in data: + try: + val = int(data[key]) + except Exception: + return response.json({'error': f'BAD_{key.upper()}'}, status=400) + if val < lo or val > hi: + return response.json({'error': f'RANGE_{key.upper()}', 'min': lo, 'max': hi}, status=400) + updates[key] = val + async with request.ctx.db_session() as 
session: + sc = ServiceConfig(session) + for key, val in updates.items(): + await sc.set(f'DHT_{key.upper()}', val) + return response.json({'ok': True, 'updated': updates}) + + async def s_api_v1_admin_sync_setlimits(request): if (unauth := _ensure_admin(request)): return unauth diff --git a/app/api/routes/content.py b/app/api/routes/content.py index a7d3dd2..a0ed089 100644 --- a/app/api/routes/content.py +++ b/app/api/routes/content.py @@ -1,3 +1,4 @@ +from __future__ import annotations from datetime import datetime, timedelta from sanic import response from sqlalchemy import select, and_, func, or_ @@ -86,7 +87,24 @@ async def s_api_v1_content_view(request, content_address: str): 'content_type': content_type, 'content_mime': ctype, } - content = await open_content_async(request.ctx.db_session, r_content) + try: + content = await open_content_async(request.ctx.db_session, r_content) + except AssertionError: + # Fallback: handle plain stored content without encrypted/decrypted pairing + sc = r_content + from mimetypes import guess_type as _guess + _mime, _ = _guess(sc.filename or '') + _mime = _mime or 'application/octet-stream' + try: + _ctype = _mime.split('/')[0] + except Exception: + _ctype = 'application' + content = { + 'encrypted_content': sc, + 'decrypted_content': sc, + 'content_type': _ctype, + 'content_mime': _mime, + } master_address = content['encrypted_content'].meta.get('item_address', '') opts = { @@ -222,6 +240,16 @@ async def s_api_v1_content_view(request, content_address: str): or opts.get('content_mime') or 'application/octet-stream' ) + # Fallback: if stored content reports generic application/*, try guess by filename + try: + if content_mime.startswith('application/'): + from mimetypes import guess_type as _guess + _fn = decrypted_json.get('filename') or encrypted_json.get('filename') or '' + _gm, _ = _guess(_fn) + if _gm: + content_mime = _gm + except Exception: + pass opts['content_mime'] = content_mime try: opts['content_type'] = content_mime.split('/')[0] @@ -249,7 +277,7 @@ async def s_api_v1_content_view(request, content_address: str): if not row or not row.local_path: return None, None file_hash = row.local_path.split('/')[-1] - return file_hash, f"{PROJECT_HOST}/api/v1.5/storage/{file_hash}" + return file_hash, f"{PROJECT_HOST}/api/v1/storage.proxy/{file_hash}" has_preview = bool(derivative_latest.get('decrypted_preview') or converted_meta_map.get('low_preview')) display_options['has_preview'] = has_preview @@ -270,10 +298,28 @@ async def s_api_v1_content_view(request, content_address: str): chosen_row = derivative_latest[key] break + def _make_token_for(hash_value: str, scope: str, user_id: int | None) -> str: + try: + from app.core._crypto.signer import Signer + from app.core._secrets import hot_seed, hot_pubkey + from app.core._utils.b58 import b58encode as _b58e + import time, json + signer = Signer(hot_seed) + exp = int(time.time()) + 600 + uid = int(user_id or 0) + payload = {'hash': hash_value, 'scope': scope, 'exp': exp, 'uid': uid} + blob = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode() + sig = signer.sign(blob) + pub = _b58e(hot_pubkey).decode() + return f"pub={pub}&exp={exp}&scope={scope}&uid={uid}&sig={sig}" + except Exception: + return "" + if chosen_row: file_hash, url = _row_to_hash_and_url(chosen_row) if url: - display_options['content_url'] = url + token = _make_token_for(file_hash or '', 'full' if have_access else 'preview', getattr(request.ctx.user, 'id', None)) + display_options['content_url'] = f"{url}?{token}" if 
token else url ext_candidate = None if chosen_row.content_type: ext_candidate = chosen_row.content_type.split('/')[-1] @@ -298,17 +344,31 @@ async def s_api_v1_content_view(request, content_address: str): hash_value = converted_meta_map.get(key) if not hash_value: continue - stored = (await request.ctx.db_session.execute(select(StoredContent).where(StoredContent.hash == hash_value))).scalars().first() - if stored: - display_options['content_url'] = stored.web_url - filename = stored.filename or '' - if '.' in filename: - opts['content_ext'] = filename.split('.')[-1] - elif '/' in content_mime: - opts['content_ext'] = content_mime.split('/')[-1] - if content_kind == 'binary': - display_options['original_available'] = True - break + # Try the proxy right away (even if there is no local record) + token = _make_token_for(hash_value, 'full' if have_access else 'preview', getattr(request.ctx.user, 'id', None)) + display_options['content_url'] = f"{PROJECT_HOST}/api/v1/storage.proxy/{hash_value}?{token}" if token else f"{PROJECT_HOST}/api/v1/storage.proxy/{hash_value}" + if '/' in content_mime: + opts['content_ext'] = content_mime.split('/')[-1] + if content_kind == 'binary': + display_options['original_available'] = True + break + + # Final fallback: no derivatives known — serve stored content directly for AV + if not display_options['content_url'] and content_kind in ('audio', 'video'): + from app.core._utils.b58 import b58encode as _b58e + scid = decrypted_json.get('cid') or encrypted_json.get('cid') + try: + from app.core.content.content_id import ContentId as _CID + if scid: + _cid = _CID.deserialize(scid) + h = _cid.content_hash_b58 + else: + h = decrypted_json.get('hash') + except Exception: + h = decrypted_json.get('hash') + if h: + token = _make_token_for(h, 'preview' if not have_access else 'full', getattr(request.ctx.user, 'id', None)) + display_options['content_url'] = f"{PROJECT_HOST}/api/v1/storage.proxy/{h}?{token}" if token else f"{PROJECT_HOST}/api/v1/storage.proxy/{h}" # Metadata fallback content_meta = encrypted_json @@ -334,7 +394,8 @@ async def s_api_v1_content_view(request, content_address: str): } cover_cid = content_meta.get('cover_cid') if cover_cid: - content_metadata_json.setdefault('image', f"{PROJECT_HOST}/api/v1.5/storage/{cover_cid}") + token = _make_token_for(cover_cid, 'preview', getattr(request.ctx.user, 'id', None)) + content_metadata_json.setdefault('image', f"{PROJECT_HOST}/api/v1/storage.proxy/{cover_cid}?{token}" if token else f"{PROJECT_HOST}/api/v1/storage.proxy/{cover_cid}") display_options['metadata'] = content_metadata_json diff --git a/app/api/routes/dht.py b/app/api/routes/dht.py new file mode 100644 index 0000000..931b357 --- /dev/null +++ b/app/api/routes/dht.py @@ -0,0 +1,125 @@ +from __future__ import annotations + +import json +from typing import Any, Dict, List + +from sanic import response + +from app.core.logger import make_log +from app.core._utils.b58 import b58decode +from app.core.network.dht.records import DHTRecord +from app.core.network.dht.store import DHTStore +from app.core.network.dht.crypto import compute_node_id +from app.core.network.dht.keys import MetaKey, MembershipKey, MetricKey +from sqlalchemy import select +from app.core.models.my_network import KnownNode + + +def _merge_strategy_for(key: str): + # Pick the correct merge strategy based on the key prefix + from app.core.network.dht.replication import ReplicationState + from app.core.network.dht.membership import MembershipState + from app.core.network.dht.metrics import
ContentMetricsState + if key.startswith('meta:'): + return lambda a, b: ReplicationState.from_dict(a).merge_with(ReplicationState.from_dict(b)).to_dict() + if key.startswith('membership:'): + # membership merges need a node_id only for local state; a plain CRDT merge is sufficient here + return lambda a, b: MembershipState.from_dict('remote', None, a).merge(MembershipState.from_dict('remote', None, b)).to_dict() + if key.startswith('metric:'): + return lambda a, b: ContentMetricsState.from_dict('remote', a).merge(ContentMetricsState.from_dict('remote', b)).to_dict() + return lambda a, b: b + + +async def s_api_v1_dht_get(request): + """Return a DHT record by fingerprint or key.""" + store: DHTStore = request.app.ctx.memory.dht_store + fp = request.args.get('fingerprint') + key = request.args.get('key') + if fp: + rec = store.get(fp) + if not rec: + return response.json({'error': 'NOT_FOUND'}, status=404) + return response.json({**rec.to_payload(), 'signature': rec.signature}) + if key: + snap = store.snapshot() + for _fp, payload in snap.items(): + if payload.get('key') == key: + return response.json(payload) + return response.json({'error': 'NOT_FOUND'}, status=404) + return response.json({'error': 'BAD_REQUEST'}, status=400) + + +def _verify_publisher(node_id: str, public_key_b58: str) -> bool: + try: + derived = compute_node_id(b58decode(public_key_b58)) + return derived == node_id + except Exception: + return False + + +async def s_api_v1_dht_put(request): + """Accept DHT record(s), verify their signatures and merge/persist them. + + Supports a single record (record: {...}) and a batch (records: [{...}]). + Requires the sender's public_key field and a matching node_id. + """ + mem = request.app.ctx.memory + store: DHTStore = mem.dht_store + data = request.json or {} + public_key = data.get('public_key') + if not public_key: + return response.json({'error': 'MISSING_PUBLIC_KEY'}, status=400) + + # Determine publisher role (trusted/read-only/deny) + role = None + try: + session = request.ctx.db_session + kn = (await session.execute(select(KnownNode).where(KnownNode.public_key == public_key))).scalars().first() + role = (kn.meta or {}).get('role') if kn and kn.meta else None + except Exception: + role = None + + def _process_one(payload: Dict[str, Any]) -> Dict[str, Any]: + try: + rec = DHTRecord.create( + key=payload['key'], + fingerprint=payload['fingerprint'], + value=payload['value'], + node_id=payload['node_id'], + logical_counter=int(payload['logical_counter']), + signature=payload.get('signature'), + timestamp=float(payload.get('timestamp') or 0), + ) + except Exception as e: + return {'error': f'BAD_RECORD: {e}'} + if not _verify_publisher(rec.node_id, public_key): + return {'error': 'NODE_ID_MISMATCH'} + # Verify the record signature + if not rec.verify(public_key): + return {'error': 'BAD_SIGNATURE'} + # Enforce ACL: untrusted nodes may not mutate meta/metric records + if role != 'trusted': + if rec.key.startswith('meta:') or rec.key.startswith('metric:'): + return {'error': 'FORBIDDEN_NOT_TRUSTED'} + merge_fn = _merge_strategy_for(rec.key) + try: + merged = store.merge_record(rec, merge_fn) + return {'ok': True, 'fingerprint': merged.fingerprint} + except Exception as e: + make_log('DHT.put', f'merge failed: {e}', level='warning') + return {'error': 'MERGE_FAILED'} + + if 'record' in data: + result = _process_one(data['record']) + status = 200 if 'ok' in result else 400 + return response.json(result, status=status) + elif 'records' in data and
isinstance(data['records'], list): + results: List[Dict[str, Any]] = [] + ok = True + for item in data['records']: + res = _process_one(item) + if 'error' in res: + ok = False + results.append(res) + return response.json({'ok': ok, 'results': results}, status=200 if ok else 207) + return response.json({'error': 'BAD_REQUEST'}, status=400) diff --git a/app/api/routes/network.py b/app/api/routes/network.py index 9f47b53..73d2d28 100644 --- a/app/api/routes/network.py +++ b/app/api/routes/network.py @@ -109,6 +109,17 @@ async def s_api_v1_network_handshake(request): if not data.get("nonce") or not check_and_remember_nonce(request.app.ctx.memory, data.get("public_key"), data.get("nonce")): return response.json({"error": "NONCE_REPLAY"}, status=400) + # Base schema and identity checks + if data.get("schema_version") != dht_config.schema_version: + return response.json({"error": "UNSUPPORTED_SCHEMA_VERSION"}, status=400) + + try: + expected_node_id = compute_node_id(b58decode(data["public_key"])) + except Exception: + return response.json({"error": "BAD_PUBLIC_KEY"}, status=400) + if data.get("node_id") != expected_node_id: + return response.json({"error": "NODE_ID_MISMATCH"}, status=400) + peer_version = str(data.get("version")) ipfs_meta = _extract_ipfs_meta(data.get("ipfs") or {}) comp = compatibility(peer_version, CURRENT_PROTOCOL_VERSION) @@ -160,9 +171,10 @@ async def s_api_v1_network_handshake(request): membership_mgr = getattr(request.app.ctx.memory, "membership", None) if membership_mgr: remote_ip = (request.headers.get('X-Forwarded-For') or request.remote_addr or request.ip or '').split(',')[0].strip() or None + # Determine caller ASN using advertised value or resolver remote_asn = data.get("asn") if remote_asn is None: - remote_asn = asn_resolver.resolve(remote_ip) + remote_asn = await asn_resolver.resolve_async(remote_ip, request.ctx.db_session) else: if remote_ip: asn_resolver.learn(remote_ip, int(remote_asn)) @@ -181,15 +193,42 @@ async def s_api_v1_network_handshake(request): if not receipt.get("target_id") or not receipt.get("issuer_id"): continue try: - membership_mgr.record_receipt( - ReachabilityReceipt( - target_id=str(receipt.get("target_id")), - issuer_id=str(receipt.get("issuer_id")), - asn=int(receipt["asn"]) if receipt.get("asn") is not None else None, - timestamp=float(receipt.get("timestamp", data.get("timestamp"))), - signature=str(receipt.get("signature", "")), + # Only accept receipts issued by the caller + issuer_id = str(receipt.get("issuer_id")) + if issuer_id != data["node_id"]: + continue + # Canonical message for receipt verification + # schema_version is embedded to avoid replay across versions + rec_asn = receipt.get("asn") + if rec_asn is None: + rec_asn = remote_asn + payload = { + "schema_version": dht_config.schema_version, + "target_id": str(receipt.get("target_id")), + "issuer_id": issuer_id, + "asn": int(rec_asn) if rec_asn is not None else None, + "timestamp": float(receipt.get("timestamp", data.get("timestamp"))), + } + blob = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode() + try: + import nacl.signing # type: ignore + from app.core._utils.b58 import b58decode as _b58d + vk = nacl.signing.VerifyKey(_b58d(data["public_key"])) + sig_b = _b58d(str(receipt.get("signature", ""))) + vk.verify(blob, sig_b) + # Accept and persist + membership_mgr.record_receipt( + ReachabilityReceipt( + target_id=payload["target_id"], + issuer_id=payload["issuer_id"], + asn=payload["asn"], + timestamp=payload["timestamp"], + 
signature=str(receipt.get("signature", "")), + ) ) - ) + except Exception: + # Ignore invalid receipts + continue except Exception: continue except Exception as exc: @@ -271,8 +310,3 @@ async def s_api_v1_network_handshake(request): status = 200 resp["warning"] = "MINOR version differs; proceed with caution" return response.json(resp, status=status) - if data.get("schema_version") != dht_config.schema_version: - return response.json({"error": "UNSUPPORTED_SCHEMA_VERSION"}, status=400) - expected_node_id = compute_node_id(b58decode(data["public_key"])) - if data.get("node_id") != expected_node_id: - return response.json({"error": "NODE_ID_MISMATCH"}, status=400) diff --git a/app/api/routes/progressive_storage.py b/app/api/routes/progressive_storage.py index 91ca50e..12e19bd 100644 --- a/app/api/routes/progressive_storage.py +++ b/app/api/routes/progressive_storage.py @@ -16,6 +16,14 @@ from app.core.models.node_storage import StoredContent from app.core._config import UPLOADS_DIR from app.core.models.content_v3 import ContentDerivative from app.core._utils.resolve_content import resolve_content +from app.core.network.nodesig import verify_request +from app.core.models.my_network import KnownNode +from sqlalchemy import select as sa_select +import httpx +from app.core._crypto.signer import Signer +from app.core._secrets import hot_seed +from app.core._utils.b58 import b58encode as _b58e, b58decode as _b58d +import json, time # POST /api/v1.5/storage @@ -305,3 +313,125 @@ async def s_api_v1_5_storage_get(request, file_hash): else: make_log("uploader_v1.5", f"Returning full file for video/audio: {final_path}", level="INFO") return await response.file(final_path, mime_type=mime_type) + + +# GET /api/v1/storage.fetch/ +# Internal endpoint for node-to-node requests (NodeSig). Returns the file if it is available locally.
+async def s_api_v1_storage_fetch(request, file_hash): + ok, node_id, reason = verify_request(request, request.app.ctx.memory) + if not ok: + return response.json({"error": reason or "UNAUTHORIZED"}, status=401) + # Trusted nodes only + try: + session = request.ctx.db_session + row = (await session.execute(sa_select(KnownNode).where(KnownNode.public_key == node_id))).scalars().first() + role = (row.meta or {}).get('role') if row and row.meta else None + if role != 'trusted': + return response.json({"error": "DENIED_NOT_TRUSTED"}, status=403) + except Exception: + pass + # Reuse the v1.5 implementation + return await s_api_v1_5_storage_get(request, file_hash) + + +# GET /api/v1/storage.proxy/ +# Proxying for the web client: if the file is not available locally, try to fetch it from trusted nodes via NodeSig +async def s_api_v1_storage_proxy(request, file_hash): + # Require either valid NodeSig (unlikely for public clients) or a signed access token + # Token fields: pub, exp, scope, uid, sig over json {hash,scope,exp,uid} + def _verify_access_token() -> bool: + try: + pub = (request.args.get('pub') or '').strip() + exp = int(request.args.get('exp') or '0') + scope = (request.args.get('scope') or '').strip() + uid = int(request.args.get('uid') or '0') + sig = (request.args.get('sig') or '').strip() + if not pub or not exp or not scope or not sig: + return False + if exp < int(time.time()): + return False + payload = { + 'hash': file_hash, + 'scope': scope, + 'exp': exp, + 'uid': uid, + } + blob = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode() + import nacl.signing + vk = nacl.signing.VerifyKey(_b58d(pub)) + vk.verify(blob, _b58d(sig)) + # Note: we do not require a session-bound user for media fetches, + # the short-lived signature itself is sufficient.
+ return True + except Exception: + return False + + ok_nodesig, _nid, _reason = verify_request(request, request.app.ctx.memory) + if not ok_nodesig and not _verify_access_token(): + return response.json({'error': 'UNAUTHORIZED'}, status=401) + # Try the local copy first, without returning a 404 + try: + from base58 import b58encode as _b58e + try: + # Accept either a raw hash or a CID + from app.core._utils.resolve_content import resolve_content as _res + cid, _ = _res(file_hash) + file_hash = _b58e(cid.content_hash).decode() + except Exception: + pass + final_path = os.path.join(UPLOADS_DIR, f"{file_hash}") + if os.path.exists(final_path): + return await s_api_v1_5_storage_get(request, file_hash) + except Exception: + pass + # Not available locally: try trusted nodes + try: + async with request.app.ctx.memory.transaction("storage.proxy"): + # Build the list of trusted nodes + session = request.ctx.db_session + nodes = (await session.execute(sa_select(KnownNode))).scalars().all() + candidates = [] + for n in nodes: + role = (n.meta or {}).get('role') if n.meta else None + if role != 'trusted': + continue + host = (n.meta or {}).get('public_host') or (n.ip or '') + if not host: + continue + base = host.rstrip('/') + if not base.startswith('http'): + base = f"http://{base}:{n.port or 80}" + candidates.append(base) + # Proxy the request, forwarding Range and streaming the body + range_header = request.headers.get("Range") + timeout = httpx.Timeout(10.0, read=60.0) + for base in candidates: + url = f"{base}/api/v1/storage.fetch/{file_hash}" + try: + # Sign the request with NodeSig + from app.core._secrets import hot_seed, hot_pubkey + from app.core.network.nodesig import sign_headers + from app.core._utils.b58 import b58encode as _b58e + pk_b58 = _b58e(hot_pubkey).decode() + headers = sign_headers('GET', f"/api/v1/storage.fetch/{file_hash}", b"", hot_seed, pk_b58) + if range_header: + headers['Range'] = range_header + async with httpx.AsyncClient(timeout=timeout) as client: + r = await client.get(url, headers=headers) + if r.status_code == 404: + continue + if r.status_code not in (200, 206): + continue + # Forward the content headers + resp = await request.respond(status=r.status_code, headers={ + k: v for k, v in r.headers.items() if k.lower() in ("content-type", "content-length", "content-range", "accept-ranges") + }) + async for chunk in r.aiter_bytes(chunk_size=1024*1024): + await resp.send(chunk) + await resp.eof() + return resp + except Exception as e: + continue + except Exception: + pass + return response.json({"error": "File not found"}, status=404) diff --git a/app/bot/__init__.py b/app/bot/__init__.py index e677de1..9e79e37 100644 --- a/app/bot/__init__.py +++ b/app/bot/__init__.py @@ -7,6 +7,9 @@ from app.bot.middleware import UserDataMiddleware from app.bot.routers.index import main_router -dp = Dispatcher(storage=MemoryStorage()) -dp.update.outer_middleware(UserDataMiddleware()) -dp.include_router(main_router) +def create_dispatcher() -> Dispatcher: + """Create aiogram Dispatcher lazily to avoid event loop issues at import time.""" + dp = Dispatcher(storage=MemoryStorage()) + dp.update.outer_middleware(UserDataMiddleware()) + dp.include_router(main_router) + return dp diff --git a/app/client_bot/__init__.py b/app/client_bot/__init__.py index e7dbdae..602eced 100644 --- a/app/client_bot/__init__.py +++ b/app/client_bot/__init__.py @@ -6,6 +6,9 @@ from aiogram.fsm.storage.memory import MemoryStorage from app.bot.middleware import UserDataMiddleware from app.client_bot.routers.index import main_router -dp =
Dispatcher(storage=MemoryStorage()) -dp.update.outer_middleware(UserDataMiddleware()) -dp.include_router(main_router) + +def create_dispatcher() -> Dispatcher: + dp = Dispatcher(storage=MemoryStorage()) + dp.update.outer_middleware(UserDataMiddleware()) + dp.include_router(main_router) + return dp diff --git a/app/core/models/dht.py b/app/core/models/dht.py new file mode 100644 index 0000000..d537fea --- /dev/null +++ b/app/core/models/dht.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from sqlalchemy import Column, String, Integer, Float, JSON, DateTime +from datetime import datetime + +from .base import AlchemyBase + + +class DHTRecordRow(AlchemyBase): + __tablename__ = 'dht_records' + + # fingerprint = blake3(serialized key) + fingerprint = Column(String(128), primary_key=True) + key = Column(String(512), nullable=False, index=True) + schema_version = Column(String(16), nullable=False, default='v1') + logical_counter = Column(Integer, nullable=False, default=0) + timestamp = Column(Float, nullable=False, default=0.0) + node_id = Column(String(128), nullable=False) + signature = Column(String(512), nullable=True) + value = Column(JSON, nullable=False, default=dict) + updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow) + diff --git a/app/core/models/memory.py b/app/core/models/memory.py index 55d44c7..84dc197 100644 --- a/app/core/models/memory.py +++ b/app/core/models/memory.py @@ -11,12 +11,12 @@ from app.core._crypto.signer import Signer from app.core._secrets import hot_pubkey, hot_seed from app.core.logger import make_log from app.core.network.dht import ( - DHTStore, MembershipManager, ReplicationManager, MetricsAggregator, compute_node_id, ) +from app.core.network.dht.store import PersistentDHTStore class Memory: @@ -59,7 +59,7 @@ class Memory: # Decentralised storage components self.node_id = compute_node_id(hot_pubkey) self.signer = Signer(hot_seed) - self.dht_store = DHTStore(self.node_id, self.signer) + self.dht_store = PersistentDHTStore(self.node_id, self.signer) self.membership = MembershipManager(self.node_id, self.signer, self.dht_store) self.replication = ReplicationManager(self.node_id, self.signer, self.dht_store) self.metrics = MetricsAggregator(self.node_id, self.signer, self.dht_store) diff --git a/app/core/models/rdap.py b/app/core/models/rdap.py new file mode 100644 index 0000000..3f94dae --- /dev/null +++ b/app/core/models/rdap.py @@ -0,0 +1,16 @@ +from __future__ import annotations + +from datetime import datetime +from sqlalchemy import Column, String, Integer, DateTime + +from .base import AlchemyBase + + +class RdapCache(AlchemyBase): + __tablename__ = 'rdap_cache' + + ip = Column(String(64), primary_key=True) + asn = Column(Integer, nullable=True) + source = Column(String(64), nullable=True) + updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow) + diff --git a/app/core/network/asn.py b/app/core/network/asn.py index 704ce7f..5a1b388 100644 --- a/app/core/network/asn.py +++ b/app/core/network/asn.py @@ -32,6 +32,63 @@ class ASNResolver: return self.cache[norm] = asn + async def resolve_async(self, ip: str | None, db_session=None) -> Optional[int]: + """Resolve ASN via persistent cache; fallback to RDAP API; store result. + + - Checks in-memory cache first. + - If not found, checks DB table rdap_cache when available. + - If still not found, queries a public API and persists. 
+ """ + norm = self.normalise(ip) + if not norm: + return None + # In-memory cache first + if norm in self.cache: + return self.cache[norm] + # DB lookup if possible + try: + if db_session is not None: + from sqlalchemy import select + from app.core.models.rdap import RdapCache + row = (await db_session.execute(select(RdapCache).where(RdapCache.ip == norm))).scalars().first() + if row and row.asn is not None: + self.cache[norm] = int(row.asn) + return int(row.asn) + except Exception as e: + make_log("ASNResolver", f"DB lookup failed for {norm}: {e}", level="warning") + + # Remote lookup (best-effort) + asn: Optional[int] = None + try: + import httpx + url = f"https://api.iptoasn.com/v1/as/ip/{norm}" + async with httpx.AsyncClient(timeout=5.0) as client: + r = await client.get(url) + if r.status_code == 200: + j = r.json() + num = j.get("as_number") + if isinstance(num, int) and num > 0: + asn = num + except Exception as e: + make_log("ASNResolver", f"RDAP lookup failed for {norm}: {e}", level="warning") + + if asn is not None: + self.cache[norm] = asn + # Persist to DB if possible + try: + if db_session is not None: + from app.core.models.rdap import RdapCache + row = await db_session.get(RdapCache, norm) + if row is None: + row = RdapCache(ip=norm, asn=asn, source="iptoasn") + db_session.add(row) + else: + row.asn = asn + row.source = "iptoasn" + await db_session.commit() + except Exception as e: + make_log("ASNResolver", f"DB persist failed for {norm}: {e}", level="warning") + return asn + resolver = ASNResolver() - diff --git a/app/core/network/dht/__pycache__/config.cpython-310.pyc b/app/core/network/dht/__pycache__/config.cpython-310.pyc index 87c49bc..1267287 100644 Binary files a/app/core/network/dht/__pycache__/config.cpython-310.pyc and b/app/core/network/dht/__pycache__/config.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/membership.cpython-310.pyc b/app/core/network/dht/__pycache__/membership.cpython-310.pyc index 7e11208..3acb021 100644 Binary files a/app/core/network/dht/__pycache__/membership.cpython-310.pyc and b/app/core/network/dht/__pycache__/membership.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/prometheus.cpython-310.pyc b/app/core/network/dht/__pycache__/prometheus.cpython-310.pyc index aab32c3..94fd0c7 100644 Binary files a/app/core/network/dht/__pycache__/prometheus.cpython-310.pyc and b/app/core/network/dht/__pycache__/prometheus.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/replication.cpython-310.pyc b/app/core/network/dht/__pycache__/replication.cpython-310.pyc index f4924e0..667191c 100644 Binary files a/app/core/network/dht/__pycache__/replication.cpython-310.pyc and b/app/core/network/dht/__pycache__/replication.cpython-310.pyc differ diff --git a/app/core/network/dht/__pycache__/store.cpython-310.pyc b/app/core/network/dht/__pycache__/store.cpython-310.pyc index 57a229b..060c3b7 100644 Binary files a/app/core/network/dht/__pycache__/store.cpython-310.pyc and b/app/core/network/dht/__pycache__/store.cpython-310.pyc differ diff --git a/app/core/network/dht/config.py b/app/core/network/dht/config.py index ac199a2..ee90d63 100644 --- a/app/core/network/dht/config.py +++ b/app/core/network/dht/config.py @@ -41,6 +41,10 @@ class DHTConfig: window_size: int = _env_int("DHT_METRIC_WINDOW_SEC", 3600) default_q: float = _env_float("DHT_MIN_Q", 0.6) seed_refresh_interval: int = _env_int("DHT_SEED_REFRESH_INTERVAL", 30) + # Gossip / backoff tuning + gossip_interval_sec: int = 
_env_int("DHT_GOSSIP_INTERVAL_SEC", 30) + gossip_backoff_base_sec: int = _env_int("DHT_GOSSIP_BACKOFF_BASE_SEC", 5) + gossip_backoff_cap_sec: int = _env_int("DHT_GOSSIP_BACKOFF_CAP_SEC", 600) @lru_cache @@ -51,4 +55,3 @@ def load_config() -> DHTConfig: dht_config = load_config() - diff --git a/app/core/network/dht/membership.py b/app/core/network/dht/membership.py index 12412d0..bfb5edb 100644 --- a/app/core/network/dht/membership.py +++ b/app/core/network/dht/membership.py @@ -141,6 +141,23 @@ class MembershipState: return max(max(filtered_reports), local_estimate) return local_estimate + def n_estimate_trusted(self, allowed_ids: set[str]) -> float: + """Оценка размера сети только по trusted узлам. + Берём активных участников, пересекаем с allowed_ids и оцениваем по их числу + и по их N_local репортам (если доступны). + """ + self.report_local_population() + active_trusted = {m["node_id"] for m in self.active_members(include_islands=True) if m.get("node_id") in allowed_ids} + filtered_reports = [ + value for node_id, value in self.n_reports.items() + if node_id in active_trusted and self.reachability_ratio(node_id) >= dht_config.default_q + ] + # Для доверенных полагаемся на фактическое количество активных Trusted + local_estimate = float(len(active_trusted)) + if filtered_reports: + return max(max(filtered_reports), local_estimate) + return local_estimate + def to_dict(self) -> Dict[str, Any]: return { "members": self.members.to_dict(), @@ -216,4 +233,3 @@ class MembershipManager: def active_members(self) -> List[Dict[str, Any]]: return self.state.active_members() - diff --git a/app/core/network/dht/prometheus.py b/app/core/network/dht/prometheus.py index d12f34f..ff973bc 100644 --- a/app/core/network/dht/prometheus.py +++ b/app/core/network/dht/prometheus.py @@ -29,6 +29,10 @@ merge_conflicts = Counter("dht_merge_conflicts_total", "Number of DHT merge conf view_count_total = Gauge("dht_view_count_total", "Total content views per window", ["content_id", "window"]) unique_estimate = Gauge("dht_unique_view_estimate", "Estimated unique viewers per window", ["content_id", "window"]) watch_time_seconds = Gauge("dht_watch_time_seconds", "Aggregate watch time per window", ["content_id", "window"]) +gossip_success = Counter("dht_gossip_success_total", "Successful gossip posts", ["peer"]) +gossip_failure = Counter("dht_gossip_failure_total", "Failed gossip posts", ["peer"]) +gossip_skipped = Counter("dht_gossip_skipped_total", "Skipped gossip posts due to backoff", ["peer", "reason"]) +gossip_backoff = Gauge("dht_gossip_backoff_seconds", "Gossip backoff seconds remaining", ["peer"]) def record_replication_under(content_id: str, have: int) -> None: @@ -51,3 +55,17 @@ def update_view_metrics(content_id: str, window_id: str, views: int, unique: flo view_count_total.labels(content_id=content_id, window=window_id).set(views) unique_estimate.labels(content_id=content_id, window=window_id).set(unique) watch_time_seconds.labels(content_id=content_id, window=window_id).set(watch_time) + + +def record_gossip_success(peer: str) -> None: + gossip_success.labels(peer=peer).inc() + gossip_backoff.labels(peer=peer).set(0) + + +def record_gossip_failure(peer: str, backoff_sec: float) -> None: + gossip_failure.labels(peer=peer).inc() + gossip_backoff.labels(peer=peer).set(backoff_sec) + + +def record_gossip_skipped(peer: str, reason: str) -> None: + gossip_skipped.labels(peer=peer, reason=reason).inc() diff --git a/app/core/network/dht/replication.py b/app/core/network/dht/replication.py index cd24457..61b5487 
100644 --- a/app/core/network/dht/replication.py +++ b/app/core/network/dht/replication.py @@ -166,15 +166,20 @@ class ReplicationManager: .to_dict(), ) - def ensure_replication(self, content_id: str, membership: MembershipState, now: Optional[float] = None) -> ReplicationState: + def ensure_replication(self, content_id: str, membership: MembershipState, now: Optional[float] = None, allowed_nodes: Optional[set[str]] = None) -> ReplicationState: now = now or _now() state = self._load_state(content_id) - n_estimate = max(1.0, membership.n_estimate()) + if allowed_nodes is not None and len(allowed_nodes) > 0: + n_estimate = max(1.0, membership.n_estimate_trusted(allowed_nodes)) + else: + n_estimate = max(1.0, membership.n_estimate()) p_value = max(0, round(math.log2(max(n_estimate / dht_config.replication_target, 1.0)))) prefix, _ = bits_from_hex(content_id, p_value) active = membership.active_members(include_islands=True) + if allowed_nodes is not None: + active = [m for m in active if m.get("node_id") in allowed_nodes] responsible = [] for member in active: node_prefix, _total = bits_from_hex(member["node_id"], p_value) @@ -265,6 +270,44 @@ class ReplicationManager: rest = [m for m in active if m["node_id"] not in {n for _, n, *_ in rank(responsible)}] assign_with_diversity(rank(rest)) + # Final top-up by ASN if diversity has still not been reached + if not state.diversity_satisfied(): + current_asn = {lease.asn for lease in state.leases.values() if lease.asn is not None} + by_asn: Dict[int, List[dict]] = {} + for m in active: + a = m.get('asn') + if a is None: + continue + by_asn.setdefault(int(a), []).append(m) + for a, group in by_asn.items(): + if a in current_asn: + continue + # take the best candidate from this ASN + score, node_id, asn, ip_octet = min( + ( + (rendezvous_score(content_id, g["node_id"]), g["node_id"], g.get("asn"), g.get("ip_first_octet")) + for g in group + ), + key=lambda item: item[0], + ) + if node_id in leases_by_node: + continue + lease = ReplicaLease( + node_id=node_id, + lease_id=f"{content_id}:{node_id}", + issued_at=now, + expires_at=now + dht_config.lease_ttl, + asn=asn, + ip_first_octet=ip_octet, + heartbeat_at=now, + score=score, + ) + state.assign(lease) + leases_by_node[node_id] = lease + current_asn.add(int(a)) + if state.diversity_satisfied(): + break + # Ensure we do not exceed replication target with duplicates if len(state.leases) > dht_config.replication_target: # Drop lowest scoring leases until target satisfied while preserving diversity criteria diff --git a/app/core/network/dht/store.py b/app/core/network/dht/store.py index c9c3f54..9dc4991 100644 --- a/app/core/network/dht/store.py +++ b/app/core/network/dht/store.py @@ -55,3 +55,86 @@ class DHTStore: def snapshot(self) -> Dict[str, Dict[str, Any]]: return {fp: record.to_payload() | {"signature": record.signature} for fp, record in self._records.items()} + + +# ---- Persistent adapter (DB-backed) ---- +try: + from sqlalchemy import create_engine + from sqlalchemy.orm import sessionmaker + from app.core._config import DATABASE_URL + from app.core.models.dht import DHTRecordRow + + def _sync_engine_url(url: str) -> str: + return url.replace('+asyncpg', '+psycopg2') if '+asyncpg' in url else url + + class PersistentDHTStore(DHTStore): + """DHT store with simple synchronous persistence to the database.""" + + def __init__(self, node_id: str, signer: Signer, db_url: str | None = None): + super().__init__(node_id, signer) + self._engine = create_engine(_sync_engine_url(db_url or DATABASE_URL),
diff --git a/app/core/network/dht/store.py b/app/core/network/dht/store.py
index c9c3f54..9dc4991 100644
--- a/app/core/network/dht/store.py
+++ b/app/core/network/dht/store.py
@@ -55,3 +55,86 @@ class DHTStore:
 
     def snapshot(self) -> Dict[str, Dict[str, Any]]:
         return {fp: record.to_payload() | {"signature": record.signature} for fp, record in self._records.items()}
+
+
+# ---- Persistent adapter (DB-backed) ----
+try:
+    from sqlalchemy import create_engine
+    from sqlalchemy.orm import sessionmaker
+    from app.core._config import DATABASE_URL
+    from app.core.models.dht import DHTRecordRow
+
+    def _sync_engine_url(url: str) -> str:
+        return url.replace('+asyncpg', '+psycopg2') if '+asyncpg' in url else url
+
+    class PersistentDHTStore(DHTStore):
+        """DHT store with simple synchronous persistence to the database."""
+
+        def __init__(self, node_id: str, signer: Signer, db_url: str | None = None):
+            super().__init__(node_id, signer)
+            self._engine = create_engine(_sync_engine_url(db_url or DATABASE_URL), pool_pre_ping=True)
+            self._Session = sessionmaker(bind=self._engine)
+
+        def _db_get(self, fingerprint: str) -> DHTRecord | None:
+            with self._Session() as s:
+                row = s.get(DHTRecordRow, fingerprint)
+                if not row:
+                    return None
+                rec = DHTRecord.create(
+                    key=row.key,
+                    fingerprint=row.fingerprint,
+                    value=row.value or {},
+                    node_id=row.node_id,
+                    logical_counter=row.logical_counter,
+                    signature=row.signature,
+                    timestamp=row.timestamp,
+                )
+                return rec
+
+        def _db_put(self, record: DHTRecord) -> None:
+            with self._Session() as s:
+                row = s.get(DHTRecordRow, record.fingerprint)
+                if not row:
+                    row = DHTRecordRow(
+                        fingerprint=record.fingerprint,
+                        key=record.key,
+                        schema_version=record.schema_version,
+                        logical_counter=record.logical_counter,
+                        timestamp=record.timestamp,
+                        node_id=record.node_id,
+                        signature=record.signature,
+                        value=record.value,
+                    )
+                    s.add(row)
+                else:
+                    row.key = record.key
+                    row.schema_version = record.schema_version
+                    row.logical_counter = record.logical_counter
+                    row.timestamp = record.timestamp
+                    row.node_id = record.node_id
+                    row.signature = record.signature
+                    row.value = record.value
+                s.commit()
+
+        def get(self, fingerprint: str) -> DHTRecord | None:
+            rec = super().get(fingerprint)
+            if rec:
+                return rec
+            rec = self._db_get(fingerprint)
+            if rec:
+                self._records[fingerprint] = rec
+            return rec
+
+        def put(self, key: str, fingerprint: str, value: Dict[str, Any], logical_counter: int, merge_strategy=latest_wins_merge) -> DHTRecord:
+            rec = super().put(key, fingerprint, value, logical_counter, merge_strategy)
+            self._db_put(rec)
+            return rec
+
+        def merge_record(self, incoming: DHTRecord, merge_strategy=latest_wins_merge) -> DHTRecord:
+            merged = super().merge_record(incoming, merge_strategy)
+            self._db_put(merged)
+            return merged
+except Exception:
+    # Fallback: without SQLAlchemy, use the purely in-memory store (for tests / minimal environments)
+    class PersistentDHTStore(DHTStore):
+        pass
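# --- Illustrative sketch (not part of the patch) ----------------------------
# PersistentDHTStore above is a read-through / write-through overlay: reads
# try memory first and fall back to the database (caching the result), writes
# update memory and are immediately mirrored to the database.  A minimal
# stand-alone version of that pattern, with a plain dict playing the role of
# the dht_records table:
class WriteThroughStore:
    def __init__(self, db: dict):
        self._mem: dict = {}   # hot in-memory copy (analogue of DHTStore._records)
        self._db = db          # stand-in for the persistent table

    def put(self, fingerprint: str, value: dict) -> None:
        self._mem[fingerprint] = value   # update memory first
        self._db[fingerprint] = value    # ...then mirror to the "database"

    def get(self, fingerprint: str):
        if fingerprint in self._mem:     # read-through: memory wins
            return self._mem[fingerprint]
        value = self._db.get(fingerprint)
        if value is not None:            # warm the cache on a DB hit
            self._mem[fingerprint] = value
        return value

db = {}
WriteThroughStore(db).put("abc", {"count": 1})
assert WriteThroughStore(db).get("abc") == {"count": 1}  # survives a "restart"
# --- end sketch --------------------------------------------------------------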
f"Replicated {content_hash} leader={state.leader}", level="debug") except Exception as exc: @@ -50,3 +68,74 @@ async def heartbeat_daemon(app): except Exception as exc: make_log("Replication", f"heartbeat failed: {exc}", level="warning") await asyncio.sleep(dht_config.heartbeat_interval) + + +async def dht_gossip_daemon(app): + # Периодически публикуем снимок DHT на известные публичные ноды + await asyncio.sleep(7) + memory = getattr(app.ctx, "memory", None) + if not memory: + return + while True: + try: + # собираем список публичных хостов доверенных узлов + async with db_session(auto_commit=True) as session: + from sqlalchemy import select + from app.core.models.my_network import KnownNode + nodes = (await session.execute(select(KnownNode))).scalars().all() + urls = [] + peers = [] + for n in nodes: + role = (n.meta or {}).get('role') if n.meta else None + if role != 'trusted': + continue + pub = (n.meta or {}).get('public_host') or n.ip + if not pub: + continue + base = pub.rstrip('/') + if not base.startswith('http'): + base = f"http://{base}:{n.port or 80}" + url = base + "/api/v1/dht.put" + urls.append(url) + peers.append(base) + if urls: + snapshot = memory.dht_store.snapshot() + records = list(snapshot.values()) + payload = { + 'public_key': None, # получатель обязует себя проверять подпись записи, не отправителя тут + 'records': records, + } + # public_key не обязателен в пакетном режиме, записи содержат собственные подписи + timeout = httpx.Timeout(5.0, read=10.0) + async with httpx.AsyncClient(timeout=timeout) as client: + # Throttle/backoff per peer + from time import time as _t + from app.core.network.dht.prometheus import record_gossip_success, record_gossip_failure, record_gossip_skipped + if not hasattr(dht_gossip_daemon, '_backoff'): + dht_gossip_daemon._backoff = {} + backoff = dht_gossip_daemon._backoff # type: ignore + for url, peer in zip(urls, peers): + now = _t() + st = backoff.get(peer) or { 'fail': 0, 'next': 0.0 } + if now < st['next']: + record_gossip_skipped(peer, 'backoff') + continue + try: + r = await client.post(url, json=payload) + if 200 <= r.status_code < 300: + backoff[peer] = { 'fail': 0, 'next': 0.0 } + record_gossip_success(peer) + else: + raise Exception(f"HTTP {r.status_code}") + except Exception as exc: + st['fail'] = st.get('fail', 0) + 1 + base = max(1, int(dht_config.gossip_backoff_base_sec)) + cap = max(base, int(dht_config.gossip_backoff_cap_sec)) + wait = min(cap, base * (2 ** max(0, st['fail'] - 1))) + st['next'] = now + wait + backoff[peer] = st + record_gossip_failure(peer, wait) + make_log('DHT.gossip', f'gossip failed {url}: {exc}; backoff {wait}s', level='debug') + except Exception as exc: + make_log('DHT.gossip', f'iteration error: {exc}', level='warning') + await asyncio.sleep(dht_config.gossip_interval_sec)