diff --git a/ENDPOINTS.md b/ENDPOINTS.md
index dd8463c..30552b7 100644
--- a/ENDPOINTS.md
+++ b/ENDPOINTS.md
@@ -48,6 +48,18 @@ TODO: implement file lookup on other nodes
 16. GET /api/v1/content.view
+17. GET /api/v1/network.info
+Returns information about this node: id/public_key, version, node_type, metrics, capabilities.
+
+18. GET /api/v1/network.nodes
+Returns the list of known public nodes with compatibility status and metadata.
+
+19. POST /api/v1/network.handshake
+Handshake between nodes. The request body is signed with the node's private key; the response is signed with the server's private key.
+Request fields: version, public_key (base58), node_type, metrics, capabilities, timestamp, nonce, signature.
+The public_host field is required for public nodes and optional/empty for private nodes.
+Response fields: compatibility, node, known_public_nodes, timestamp, server_public_key, server_signature (plus a warning when MINOR versions differ).
+Private nodes are not persisted on the receiving side (no peer record is kept), but they receive the list of public nodes and can sync through them.
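For illustration, a minimal client-side handshake sketch (assumptions: PyNaCl and httpx are available; the exact version string and the unix-seconds timestamp format are guesses — the field set and the canonical-JSON signing rule mirror the endpoint description above and the verifier in app/api/routes/network.py further down in this diff):

    import json, time, uuid
    import httpx
    import nacl.signing
    from base58 import b58encode

    def do_handshake(peer_base_url: str, signing_key: nacl.signing.SigningKey, public_host: str | None) -> dict:
        # The signature covers every field except "signature" itself,
        # serialized as canonical JSON (sorted keys, compact separators).
        payload = {
            "version": "1.0.0",  # assumption: this node's CURRENT_PROTOCOL_VERSION
            "public_key": b58encode(bytes(signing_key.verify_key)).decode(),
            "node_type": "public" if public_host else "private",
            "metrics": {},
            "capabilities": {},
            "timestamp": int(time.time()),  # assumption: freshness is checked in unix seconds
            "nonce": uuid.uuid4().hex,
        }
        if public_host:
            payload["public_host"] = public_host
        blob = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
        payload["signature"] = b58encode(signing_key.sign(blob).signature).decode()
        r = httpx.post(f"{peer_base_url}/api/v1/network.handshake", json=payload, timeout=20)
        r.raise_for_status()
        return r.json()  # compatibility, node, known_public_nodes, server_signature, ...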
diff --git a/app/__main__.py b/app/__main__.py
index aa5960c..6d12449 100644
--- a/app/__main__.py
+++ b/app/__main__.py
@@ -103,6 +103,7 @@ if __name__ == '__main__':
         from app.bot import dp as uploader_bot_dp
         from app.client_bot import dp as client_bot_dp
         from app.core._config import SANIC_PORT, PROJECT_HOST, DATABASE_URL
+        from app.core.network.nodes import network_handshake_daemon, bootstrap_once_and_exit_if_failed
 
         app.ctx.memory = main_memory
         for _target in [uploader_bot_dp, client_bot_dp]:
@@ -117,6 +118,9 @@
         app.add_task(queue_daemon(app))
         app.add_task(uploader_bot_dp.start_polling(app.ctx.memory._telegram_bot))
         app.add_task(client_bot_dp.start_polling(app.ctx.memory._client_telegram_bot))
+        # Start network handshake daemon and bootstrap step
+        app.add_task(network_handshake_daemon(app))
+        app.add_task(bootstrap_once_and_exit_if_failed())
 
         app.run(host='0.0.0.0', port=SANIC_PORT)
     else:
@@ -138,6 +142,15 @@ if __name__ == '__main__':
         elif startup_target == 'convert_process':
             from app.core.background.convert_service import main_fn as target_fn
             time.sleep(9)
+        elif startup_target == 'convert_v3':
+            from app.core.background.convert_v3_service import main_fn as target_fn
+            time.sleep(9)
+        elif startup_target == 'index_scout_v3':
+            from app.core.background.index_scout_v3 import main_fn as target_fn
+            time.sleep(7)
+        elif startup_target == 'derivative_janitor':
+            from app.core.background.derivative_cache_janitor import main_fn as target_fn
+            time.sleep(5)
 
         startup_fn = startup_fn or target_fn
         assert startup_fn
diff --git a/app/api/__init__.py b/app/api/__init__.py
index 64750c5..9c277e0 100644
--- a/app/api/__init__.py
+++ b/app/api/__init__.py
@@ -15,16 +15,28 @@ app.register_middleware(close_db_session, "response")
 
 from app.api.routes._index import s_index, s_favicon
 from app.api.routes._system import s_api_v1_node, s_api_system_version, s_api_system_send_status, s_api_v1_node_friendly
+from app.api.routes.network import (
+    s_api_v1_network_info,
+    s_api_v1_network_nodes,
+    s_api_v1_network_handshake,
+)
 from app.api.routes.auth import s_api_v1_auth_twa, s_api_v1_auth_select_wallet, s_api_v1_auth_me
 from app.api.routes.statics import s_api_tonconnect_manifest, s_api_platform_metadata
 from app.api.routes.node_storage import s_api_v1_storage_post, s_api_v1_storage_get, \
     s_api_v1_storage_decode_cid
 from app.api.routes.progressive_storage import s_api_v1_5_storage_get, s_api_v1_5_storage_post
+from app.api.routes.upload_tus import s_api_v1_upload_tus_hook
 from app.api.routes.account import s_api_v1_account_get
 from app.api.routes._blockchain import s_api_v1_blockchain_send_new_content_message, \
     s_api_v1_blockchain_send_purchase_content_message
 from app.api.routes.content import s_api_v1_content_list, s_api_v1_content_view, s_api_v1_content_friendly_list, s_api_v1_5_content_list
+from app.api.routes.content_index import s_api_v1_content_index, s_api_v1_content_delta
+from app.api.routes.derivatives import s_api_v1_content_derivatives
+from app.api.routes.admin import s_api_v1_admin_node_setrole
 from app.api.routes.tonconnect import s_api_v1_tonconnect_new, s_api_v1_tonconnect_logout
+from app.api.routes.keys import s_api_v1_keys_request
+from app.api.routes.sync import s_api_v1_sync_pin, s_api_v1_sync_status
+from app.api.routes.upload_status import s_api_v1_upload_status
 
 
 app.add_route(s_index, "/", methods=["GET", "OPTIONS"])
@@ -34,6 +46,9 @@ app.add_route(s_api_v1_node, "/api/v1/node", methods=["GET", "OPTIONS"])
 app.add_route(s_api_v1_node_friendly, "/api/v1/nodeFriendly", methods=["GET", "OPTIONS"])
 app.add_route(s_api_system_version, "/api/system.version", methods=["GET", "OPTIONS"])
 app.add_route(s_api_system_send_status, "/api/system.sendStatus", methods=["POST", "OPTIONS"])
+app.add_route(s_api_v1_network_info, "/api/v1/network.info", methods=["GET", "OPTIONS"])
+app.add_route(s_api_v1_network_nodes, "/api/v1/network.nodes", methods=["GET", "OPTIONS"])
+app.add_route(s_api_v1_network_handshake, "/api/v1/network.handshake", methods=["POST", "OPTIONS"])
 
 app.add_route(s_api_tonconnect_manifest, "/api/tonconnect-manifest.json", methods=["GET", "OPTIONS"])
 app.add_route(s_api_platform_metadata, "/api/platform-metadata.json", methods=["GET", "OPTIONS"])
@@ -61,6 +76,19 @@ app.add_route(s_api_v1_content_list, "/api/v1/content.list", methods=["GET", "OPTIONS"])
 app.add_route(s_api_v1_content_view, "/api/v1/content.view/<content_address>", methods=["GET", "OPTIONS"])
 app.add_route(s_api_v1_content_friendly_list, "/api/v1/content.friendlyList", methods=["GET", "OPTIONS"])
 app.add_route(s_api_v1_5_content_list, "/api/v1.5/content.list", methods=["GET", "OPTIONS"])
+app.add_route(s_api_v1_content_index, "/api/v1/content.index", methods=["GET", "OPTIONS"])
+app.add_route(s_api_v1_content_delta, "/api/v1/content.delta", methods=["GET", "OPTIONS"])
+app.add_route(s_api_v1_content_derivatives, "/api/v1/content.derivatives", methods=["GET", "OPTIONS"])
+app.add_route(s_api_v1_admin_node_setrole, "/api/v1/admin.node.setRole", methods=["POST", "OPTIONS"])
+
+# tusd HTTP hooks
+app.add_route(s_api_v1_upload_tus_hook, "/api/v1/upload.tus-hook", methods=["POST", "OPTIONS"])
+
+# Keys auto-grant
+app.add_route(s_api_v1_keys_request, "/api/v1/keys.request", methods=["POST", "OPTIONS"])
+app.add_route(s_api_v1_sync_pin, "/api/v1/sync.pin", methods=["POST", "OPTIONS"])
+app.add_route(s_api_v1_sync_status, "/api/v1/sync.status", methods=["GET", "OPTIONS"])
+app.add_route(s_api_v1_upload_status, "/api/v1/upload.status/<upload_id>", methods=["GET", "OPTIONS"])
 
 
 @app.exception(BaseException)
diff --git a/app/api/routes/_blockchain.py b/app/api/routes/_blockchain.py
index dbff5dc..5c41eaf 100644
--- a/app/api/routes/_blockchain.py
+++ b/app/api/routes/_blockchain.py
@@ -46,7 +46,7 @@ async def s_api_v1_blockchain_send_new_content_message(request):
     for field_key, field_value in {
         'title': lambda x: isinstance(x, str),
         'authors': lambda x: isinstance(x, list),
-        'content': lambda x: isinstance(x, str),
+        'content': lambda x: isinstance(x, 
str),  # may be plaintext CID (legacy) or encrypted IPFS CID (bafy...)
         'image': lambda x: isinstance(x, str),
         'description': lambda x: isinstance(x, str),
         'price': lambda x: (isinstance(x, str) and x.isdigit()),
@@ -57,19 +57,27 @@ async def s_api_v1_blockchain_send_new_content_message(request):
         assert field_key in request.json, f"No {field_key} provided"
         assert field_value(request.json[field_key]), f"Invalid {field_key} provided"
 
-    decrypted_content_cid, err = resolve_content(request.json['content'])
-    assert not err, f"Invalid content CID"
-
-    # Look up the original uploaded file
-    decrypted_content = (await request.ctx.db_session.execute(
-        select(StoredContent).where(StoredContent.hash == decrypted_content_cid.content_hash_b58)
-    )).scalars().first()
-    assert decrypted_content, "No content locally found"
-    assert decrypted_content.type == "local/content_bin", "Invalid content type"
-
-    # Create a stub encrypted_content. We skip real encryption for performance, since the encrypted copy is never used downstream
-    encrypted_content = await create_encrypted_content(request.ctx.db_session, decrypted_content)
-    encrypted_content_cid = encrypted_content.cid
+    # Support legacy: 'content' as decrypted ContentId; and new: 'content' as encrypted IPFS CID
+    encrypted_content_cid = None
+    try:
+        # Legacy path
+        decrypted_content_cid, err = resolve_content(request.json['content'])
+        assert not err
+        decrypted_content = (await request.ctx.db_session.execute(
+            select(StoredContent).where(StoredContent.hash == decrypted_content_cid.content_hash_b58)
+        )).scalars().first()
+        assert decrypted_content and decrypted_content.type == "local/content_bin"
+        encrypted_content = await create_encrypted_content(request.ctx.db_session, decrypted_content)
+        encrypted_content_cid = encrypted_content.cid
+    except BaseException:
+        # New path: treat provided string as encrypted IPFS CID (ENCF v1)
+        encrypted_ipfs_cid = request.json['content']
+
+        class _EC:  # tiny adapter to mimic .serialize_v2()
+            def __init__(self, s: str):
+                self._s = s
+
+            def serialize_v2(self, include_accept_type: bool = False):
+                return self._s
+
+        encrypted_content_cid = _EC(encrypted_ipfs_cid)
 
     if request.json['image']:
         image_content_cid, err = resolve_content(request.json['image'])
@@ -94,6 +102,19 @@ async def s_api_v1_blockchain_send_new_content_message(request):
         downloadable=request.json['downloadable'] if 'downloadable' in request.json else False,
     )
 
+    # Try to update ContentIndexItem with cover_url for this encrypted content
+    try:
+        from app.core.models.content_v3 import ContentIndexItem
+        ecid_str = encrypted_content_cid.serialize_v2()
+        row = (await request.ctx.db_session.execute(select(ContentIndexItem).where(ContentIndexItem.encrypted_cid == ecid_str))).scalars().first()
+        if row:
+            payload = row.payload or {}
+            payload['cover_url'] = f"{PROJECT_HOST}/api/v1.5/storage/{image_content_cid.serialize_v2()}" if image_content_cid else None
+            row.payload = payload
+            await request.ctx.db_session.commit()
+    except Exception as _e:
+        make_log("Blockchain", f"index cover update failed: {_e}", level='warning')
+
     royalties_dict = begin_dict(8)
     i = 0
     for royalty_param in request.json['royaltyParams']:
diff --git a/app/api/routes/admin.py b/app/api/routes/admin.py
new file mode 100644
index 0000000..863046b
--- /dev/null
+++ b/app/api/routes/admin.py
@@ -0,0 +1,42 @@
+from __future__ import annotations
+
+import os
+from sanic import response
+from sqlalchemy import select
+
+from app.core.models.my_network import KnownNode
+
+
+def _auth_ok(request) -> bool:
+    token = os.getenv('ADMIN_API_TOKEN')
+    if not token:
+        return False
+    auth = request.headers.get('Authorization', '')
+    return auth.strip() == f"Bearer {token}"
+
+
+async def s_api_v1_admin_node_setrole(request):
+    if not _auth_ok(request):
+        return response.json({"error": "UNAUTHORIZED"}, status=401)
+    data = request.json or {}
+    role = (data.get('role') or '').strip()
+    if role not in ('trusted', 'read-only', 'deny'):
+        return response.json({"error": "BAD_ROLE"}, status=400)
+    pub = (data.get('public_key') or '').strip()
+    host = (data.get('host') or '').strip()
+    if not pub and not host:
+        return response.json({"error": "MISSING_TARGET"}, status=400)
+    session = request.ctx.db_session
+    row = None
+    if pub:
+        row = (await session.execute(select(KnownNode).where(KnownNode.public_key == pub))).scalars().first()
+    if not row and host:
+        row = (await session.execute(select(KnownNode).where(KnownNode.ip == host))).scalars().first()
+    if not row:
+        return response.json({"error": "NOT_FOUND"}, status=404)
+    meta = row.meta or {}
+    meta['role'] = role
+    row.meta = meta
+    await session.commit()
+    return response.json({"ok": True, "node": {"ip": row.ip, "public_key": row.public_key, "role": role}})
diff --git a/app/api/routes/content.py b/app/api/routes/content.py
index 24b056d..d758e02 100644
--- a/app/api/routes/content.py
+++ b/app/api/routes/content.py
@@ -10,6 +10,7 @@ from app.core.models.keys import KnownKey
 from app.core.models import StarsInvoice
 from app.core.models.content.user_content import UserContent
 from app.core._config import CLIENT_TELEGRAM_API_KEY, PROJECT_HOST
+from app.core.models.content_v3 import EncryptedContent as ECv3, ContentDerivative as CDv3
 
 import json
 import uuid
@@ -167,7 +168,7 @@ async def s_api_v1_content_view(request, content_address: str):
     if converted_content:
         user_content_option = 'low_preview'
         if have_access:
-            user_content_option = 'low'  # TODO: serve 'high' if the user turns out to be an audiophile
+            user_content_option = 'low'
 
         converted_content = (await request.ctx.db_session.execute(select(StoredContent).where(
             StoredContent.hash == converted_content[user_content_option]
@@ -175,6 +176,24 @@ async def s_api_v1_content_view(request, content_address: str):
         if converted_content:
             display_options['content_url'] = converted_content.web_url
             opts['content_ext'] = converted_content.filename.split('.')[-1]
+    else:
+        # v3 fallback: use derivatives table linked via encrypted_cid from onchain meta
+        enc_cid = content['encrypted_content'].meta.get('content_cid') or content['encrypted_content'].meta.get('encrypted_cid')
+        if enc_cid:
+            ec = (await request.ctx.db_session.execute(select(ECv3).where(ECv3.encrypted_cid == enc_cid))).scalars().first()
+            if ec:
+                # choose preview for non-access; low for access
+                desired = ['decrypted_preview'] if not have_access else ['decrypted_low', 'decrypted_high']
+                rows = (await request.ctx.db_session.execute(select(CDv3).where(CDv3.content_id == ec.id, CDv3.status == 'ready'))).scalars().all()
+                chosen = None
+                for kind in desired:
+                    chosen = next((r for r in rows if r.kind == kind), None)
+                    if chosen:
+                        break
+                if chosen and chosen.local_path:
+                    h = chosen.local_path.split('/')[-1]
+                    display_options['content_url'] = f"{PROJECT_HOST}/api/v1.5/storage/{h}"
+                    opts['content_ext'] = (chosen.content_type or '').split('/')[-1] if chosen.content_type else None
 
     content_meta = content['encrypted_content'].json_format()
     from app.core.content.content_id import ContentId
diff --git a/app/api/routes/content_index.py b/app/api/routes/content_index.py
new file mode 100644
index 0000000..0f73944
--- /dev/null
+++ b/app/api/routes/content_index.py
@@ -0,0 +1,53 @@
+from __future__ import annotations
+
+from sanic import response
+from sqlalchemy import select
+from datetime import datetime, timezone
+
+from app.core.models.content_v3 import ContentIndexItem
+from app.core.logger import make_log
+
+
+async def s_api_v1_content_index(request):
+    rows = (await request.ctx.db_session.execute(select(ContentIndexItem))).scalars().all()
+    items = [{**r.payload, "encrypted_cid": r.encrypted_cid, "sig": r.sig, "_updated_at": (r.updated_at.isoformat() + 'Z') if r.updated_at else None} for r in rows]
+    # ETag by max updated_at + count
+    max_ts = max((it.get("_updated_at") for it in items if it.get("_updated_at")), default="1970-01-01T00:00:00Z")
+    etag = f'W/"{max_ts}.{len(items)}"'
+    inm = request.headers.get('If-None-Match')
+    if inm and inm == etag:
+        resp = response.empty(status=304)
+        resp.headers['ETag'] = etag
+        return resp
+    for it in items:
+        it.pop("_updated_at", None)
+    make_log("content.index", f"items={len(items)} etag={etag}")
+    resp = response.json({"items": items, "schema": "my-network/index@1"})
+    resp.headers['ETag'] = etag
+    return resp
+
+
+async def s_api_v1_content_delta(request):
+    since = request.args.get('since')
+    if not since:
+        # No since provided → act as full index
+        return await s_api_v1_content_index(request)
+    try:
+        # Normalize to the naive-UTC + 'Z' form used for updated_at, so string comparison is sound
+        since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
+        if since_dt.tzinfo is not None:
+            since_dt = since_dt.astimezone(timezone.utc).replace(tzinfo=None)
+        since = since_dt.isoformat() + 'Z'
+    except Exception:
+        return response.json({"error": "BAD_SINCE"}, status=400)
+
+    rows = (await request.ctx.db_session.execute(select(ContentIndexItem))).scalars().all()
+    out = []
+    max_ts = since
+    for r in rows:
+        upd = (r.updated_at.isoformat() + 'Z') if r.updated_at else None
+        if upd and upd > since:
+            out.append({**r.payload, "encrypted_cid": r.encrypted_cid, "sig": r.sig})
+            if upd > max_ts:
+                max_ts = upd
+    resp = response.json({"items": out, "next_since": max_ts, "schema": "my-network/index@1"})
+    # Weak ETag for delta response
+    resp.headers['ETag'] = f'W/"{max_ts}.{len(out)}"'
+    return resp
diff --git a/app/api/routes/derivatives.py b/app/api/routes/derivatives.py
new file mode 100644
index 0000000..0cd71e4
--- /dev/null
+++ b/app/api/routes/derivatives.py
@@ -0,0 +1,33 @@
+from __future__ import annotations
+
+from sanic import response
+from sqlalchemy import select
+
+from app.core.models.content_v3 import EncryptedContent, ContentDerivative
+from app.core._config import PROJECT_HOST
+
+
+async def s_api_v1_content_derivatives(request):
+    cid = request.args.get('cid')
+    if not cid:
+        return response.json({"error": "BAD_REQUEST"}, status=400)
+    session = request.ctx.db_session
+    ec = (await session.execute(select(EncryptedContent).where(EncryptedContent.encrypted_cid == cid))).scalars().first()
+    if not ec:
+        return response.json({"error": "NOT_FOUND"}, status=404)
+    rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.content_id == ec.id))).scalars().all()
+    out = []
+    for r in rows:
+        # Derive /api/v1.5/storage/<hash> from local_path if possible
+        path_hash = (r.local_path or '').split('/')[-1]
+        storage_url = f"{PROJECT_HOST}/api/v1.5/storage/{path_hash}" if path_hash else None
+        out.append({
+            'kind': r.kind,
+            'interval': [r.interval_start_ms, r.interval_end_ms] if r.interval_start_ms is not None else None,
+            'content_type': r.content_type,
+            'size_bytes': r.size_bytes,
+            'status': r.status,
+            'url': storage_url,
+        })
+    return response.json({'cid': cid, 'derivatives': out})
+
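For reference, a minimal sketch of how a peer could drive these two endpoints incrementally (httpx assumed; the since/ETag round-trip follows the contract implemented above — full index first, then deltas keyed by next_since):

    import httpx

    async def poll_index(base_url: str, since: str | None, etag: str | None):
        # First sync: full /content.index with ETag caching; afterwards: /content.delta?since=...
        url = f"{base_url}/api/v1/content.delta" if since else f"{base_url}/api/v1/content.index"
        headers = {"If-None-Match": etag} if etag else {}
        params = {"since": since} if since else {}
        async with httpx.AsyncClient(timeout=20) as client:
            r = await client.get(url, headers=headers, params=params)
        if r.status_code == 304:
            return [], since, etag  # nothing changed since the last poll
        r.raise_for_status()
        j = r.json()
        return j["items"], j.get("next_since", since), r.headers.get("ETag", etag)

The background service index_scout_v3 later in this diff implements the same loop, plus pinning of the advertised CIDs.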
diff --git a/app/api/routes/keys.py b/app/api/routes/keys.py
new file mode 100644
index 0000000..d0b2d8b
--- /dev/null
+++ b/app/api/routes/keys.py
@@ -0,0 +1,111 @@
+from __future__ import annotations
+
+import base64
+import json
+import os
+from datetime import datetime
+from typing import Dict, Any
+
+from base58 import b58encode
+from sanic import response
+from sqlalchemy import select
+
+from app.core._secrets import hot_pubkey
+from app.core.logger import make_log
+from app.core.models.content_v3 import EncryptedContent, ContentKey, KeyGrant
+from app.core.network.nodesig import verify_request
+from app.core.network.guard import check_rate_limit
+from app.core.models.my_network import KnownNode
+
+
+def _b64(b: bytes) -> str:
+    return base64.b64encode(b).decode()
+
+
+async def s_api_v1_keys_request(request):
+    # Rate limit per remote IP (reuse handshake limiter)
+    remote_ip = (request.headers.get('X-Forwarded-For') or request.remote_addr or request.ip or '').split(',')[0].strip()
+    if not check_rate_limit(request.app.ctx.memory, remote_ip):
+        return response.json({"error": "RATE_LIMIT"}, status=429)
+
+    # Verify NodeSig
+    ok, hdr_node, reason = verify_request(request, request.app.ctx.memory)
+    if not ok:
+        return response.json({"error": reason or "UNAUTHORIZED"}, status=401)
+
+    data: Dict[str, Any] = request.json or {}
+    cid = data.get("encrypted_cid")
+    requester_node = data.get("requestor_node_id")
+    recipient_box_pub_b64 = data.get("recipient_box_pub")
+    if not cid or not requester_node or not recipient_box_pub_b64:
+        return response.json({"error": "BAD_REQUEST"}, status=400)
+
+    if requester_node != hdr_node:
+        return response.json({"error": "NODE_ID_MISMATCH"}, status=401)
+
+    session = request.ctx.db_session
+    row = (await session.execute(select(EncryptedContent, ContentKey).join(ContentKey, ContentKey.content_id == EncryptedContent.id).where(EncryptedContent.encrypted_cid == cid))).first()
+    if not row:
+        return response.json({"error": "NOT_FOUND"}, status=404)
+    ec: EncryptedContent = row[0]
+    ck: ContentKey = row[1]
+    # Allow only trusted nodes unless explicitly disabled via env
+    TRUSTED_ONLY = (os.getenv('KEY_AUTO_GRANT_TRUSTED_ONLY', '1') == '1')
+    if TRUSTED_ONLY:
+        kn = (await session.execute(select(KnownNode).where(KnownNode.public_key == requester_node))).scalars().first()
+        role = (kn.meta or {}).get('role') if kn else None
+        if role != 'trusted':
+            return response.json({"error": "DENIED_NOT_TRUSTED"}, status=403)
+    if not ck.allow_auto_grant:
+        return response.json({"error": "DENIED"}, status=403)
+
+    # Seal the DEK for recipient using libsodium sealed box
+    try:
+        import nacl.public
+        pk = nacl.public.PublicKey(base64.b64decode(recipient_box_pub_b64))
+        box = nacl.public.SealedBox(pk)
+        sealed = box.encrypt(base64.b64decode(ck.key_ciphertext_b64))
+        sealed_b64 = _b64(sealed)
+    except Exception as e:
+        make_log("keys", f"seal failed: {e}", level="error")
+        return response.json({"error": "SEAL_FAILED"}, status=500)
+
+    issuer = b58encode(hot_pubkey).decode()
+    purpose = (data.get('purpose') or 'full')
+    ttl_sec = int(os.getenv('KEY_GRANT_PREVIEW_TTL_SEC', '0')) if purpose == 'preview' else 0
+    grant_body = {
+        "encrypted_cid": cid,
+        "to_node_id": requester_node,
+        "sealed_key_b64": sealed_b64,
+        "aead_scheme": ec.aead_scheme,
+        "chunk_bytes": ec.chunk_bytes,
+        "constraints": {"ttl_sec": ttl_sec, "scope": purpose},
+        "issued_at": datetime.utcnow().isoformat(),
+        "issuer_node_id": issuer,
+    }
+    try:
+        from app.core._crypto.signer import Signer
+        from app.core._secrets import hot_seed
+        signer = Signer(hot_seed)
+        blob = json.dumps(grant_body, sort_keys=True, separators=(",", ":")).encode()
+        sig = signer.sign(blob)
+    except Exception:
+        sig = ""
+
+    grant = KeyGrant(
+        encrypted_cid=cid,
+        issuer_node_id=issuer,
+        to_node_id=requester_node,
+        sealed_key_b64=sealed_b64,
+        aead_scheme=ec.aead_scheme,
+        chunk_bytes=ec.chunk_bytes,
+        constraints={"ttl_sec": ttl_sec, "scope": purpose},  # keep the stored grant consistent with the signed body
+        sig=sig,
+    )
+    session.add(grant)
+    await session.commit()
+    grant_row = {
+        **grant_body,
+        "sig": sig,
+        "grant_id": grant.id,
+    }
+    return response.json(grant_row)
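On the requesting side, the grant can be opened with the X25519 secret key whose public half was sent as recipient_box_pub — a minimal PyNaCl sketch (function name hypothetical; note the sealed payload is the raw DEK here, since key_ciphertext_b64 is stored unwrapped in this PoC):

    import base64
    import nacl.public

    def open_grant(sealed_key_b64: str, box_secret_key: nacl.public.PrivateKey) -> bytes:
        # SealedBox over our private key undoes SealedBox(public_key).encrypt(...)
        sealed = base64.b64decode(sealed_key_b64)
        return nacl.public.SealedBox(box_secret_key).decrypt(sealed)  # raw DEK bytes for the ENCF stream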
diff --git a/app/api/routes/network.py b/app/api/routes/network.py
new file mode 100644
index 0000000..196f23d
--- /dev/null
+++ b/app/api/routes/network.py
@@ -0,0 +1,159 @@
+from __future__ import annotations
+
+import json
+from datetime import datetime
+from typing import Dict, Any
+
+from base58 import b58decode
+from sanic import response
+from sqlalchemy import select
+
+from app.core.logger import make_log
+from app.core.models.my_network import KnownNode
+from app.core.network.constants import CURRENT_PROTOCOL_VERSION, NODE_TYPE_PRIVATE
+from app.core.network.config import NODE_PRIVACY
+from app.core.network.handshake import build_handshake_payload, compute_node_info, sign_response
+from app.core.network.nodes import upsert_known_node, list_known_public_nodes
+from app.core.network.semver import compatibility
+from app.core.network.guard import check_rate_limit, check_timestamp_fresh, check_and_remember_nonce
+from app.core.network.config import HANDSHAKE_TS_TOLERANCE_SEC
+
+
+async def s_api_v1_network_info(request):
+    async with request.app.ctx.memory.transaction("network.info"):
+        node = await compute_node_info(request.ctx.db_session)
+        make_log("Network", "info served")
+        return response.json({"node": node})
+
+
+async def s_api_v1_network_nodes(request):
+    rows = await list_known_public_nodes(request.ctx.db_session)
+    make_log("Network", f"nodes list count={len(rows)}")
+    return response.json({
+        "count": len(rows),
+        "nodes": rows,
+    })
+
+
+async def s_api_v1_network_handshake(request):
+    # Handshake accepted regardless of our privacy; private nodes typically have no external endpoint
+
+    # Rate limit per remote IP
+    remote_ip = (request.headers.get('X-Forwarded-For') or request.remote_addr or request.ip or '').split(',')[0].strip()
+    if not check_rate_limit(request.app.ctx.memory, remote_ip):
+        return response.json({"error": "RATE_LIMIT"}, status=429)
+
+    data = request.json or {}
+    required = ["version", "public_key", "node_type", "metrics", "timestamp", "signature"]
+    for f in required:
+        if f not in data:
+            return response.json({"error": f"Missing field {f}"}, status=400)
+    # public_host is required for public nodes only
+    if data.get("node_type") != "private" and not data.get("public_host"):
+        return response.json({"error": "Missing field public_host"}, status=400)
+
+    # Timestamp freshness
+    if not check_timestamp_fresh(data.get("timestamp")):
+        return response.json({"error": "STALE_TIMESTAMP", "tolerance_sec": HANDSHAKE_TS_TOLERANCE_SEC}, status=400)
+
+    # Nonce replay protection (best-effort)
+    if not data.get("nonce") or not check_and_remember_nonce(request.app.ctx.memory, data.get("public_key"), data.get("nonce")):
+        return response.json({"error": "NONCE_REPLAY"}, status=400)
+
+    peer_version = str(data.get("version"))
+    comp = compatibility(peer_version, CURRENT_PROTOCOL_VERSION)
+    if comp == "blocked":
+        # We still store the node but respond with 409
+        try:
+            await upsert_known_node(
request.ctx.db_session, + host=data.get("public_host"), + port=int(str(data.get("public_host") or "").split(":")[-1]) if ":" in str(data.get("public_host") or "") else 80, + public_key=str(data.get("public_key")), + meta={ + "version": peer_version, + "compatibility": comp, + "is_public": data.get("node_type", "public") != "private", + "public_host": data.get("public_host"), + "unsupported_last_checked_at": datetime.utcnow().isoformat(), + } + ) + except Exception: + pass + make_log("Handshake", f"Reject incompatible peer {data.get('public_host')} peer={peer_version} current={CURRENT_PROTOCOL_VERSION}") + return response.json({ + "error": "INCOMPATIBLE_VERSION", + "compatibility": comp, + "current": CURRENT_PROTOCOL_VERSION, + "peer": peer_version, + }, status=409) + + # Verify signature + try: + # Verify signature over the entire payload except the signature itself + signed_fields = {k: v for (k, v) in data.items() if k != "signature"} + blob = json.dumps(signed_fields, sort_keys=True, separators=(",", ":")).encode() + import nacl.signing, nacl.encoding + vk = nacl.signing.VerifyKey(b58decode(data["public_key"])) + sig = b58decode(data["signature"]) + vk.verify(blob, sig) + ok = True + except Exception: + ok = False + if not ok: + make_log("Handshake", f"Signature verification failed from {data.get('public_host')}", level='warning') + return response.json({"error": "BAD_SIGNATURE"}, status=400) + + # Upsert node and respond with our info + known public nodes + # Do not persist private peers (ephemeral) + if data.get("node_type") != "private" and data.get("public_host"): + try: + await upsert_known_node( + request.ctx.db_session, + host=data.get("public_host"), + port=int(str(data.get("public_host") or "").split(":")[-1]) if ":" in str(data.get("public_host") or "") else 80, + public_key=str(data.get("public_key")), + meta={ + "version": peer_version, + "compatibility": comp, + "is_public": True, + "public_host": data.get("public_host"), + "last_metrics": data.get("metrics", {}), + "capabilities": data.get("capabilities", {}), + } + ) + except Exception as e: + make_log("Handshake", f"Upsert peer failed: {e}", level='warning') + + # Merge advertised peers from the caller (optional field) + for n in data.get("known_public_nodes", []) or []: + try: + await upsert_known_node( + request.ctx.db_session, + host=n.get("public_host") or n.get("host"), + port=int(n.get("port") or 80), + public_key=n.get("public_key") or "", + meta={ + "version": n.get("version") or "0.0.0", + "compatibility": compatibility(n.get("version") or "0.0.0", CURRENT_PROTOCOL_VERSION), + "is_public": True, + "public_host": n.get("public_host") or n.get("host"), + "capabilities": n.get("capabilities") or {}, + } + ) + except Exception: + pass + + node = await compute_node_info(request.ctx.db_session) + known = await list_known_public_nodes(request.ctx.db_session) + resp = sign_response({ + "compatibility": comp, + "node": node, + "known_public_nodes": known, + }) + make_log("Handshake", f"OK with {data.get('public_host')} compat={comp}") + status = 200 + if comp == "warning": + status = 200 + resp["warning"] = "MINOR version differs; proceed with caution" + return response.json(resp, status=status) diff --git a/app/api/routes/progressive_storage.py b/app/api/routes/progressive_storage.py index 5ddc0d9..91ca50e 100644 --- a/app/api/routes/progressive_storage.py +++ b/app/api/routes/progressive_storage.py @@ -14,6 +14,7 @@ from app.core.logger import make_log from sqlalchemy import select from app.core.models.node_storage 
import StoredContent from app.core._config import UPLOADS_DIR +from app.core.models.content_v3 import ContentDerivative from app.core._utils.resolve_content import resolve_content @@ -205,6 +206,15 @@ async def s_api_v1_5_storage_get(request, file_hash): file_size = os.path.getsize(final_path) range_header = request.headers.get("Range") + # touch derivative last_access_at if exists + try: + cd = (await request.ctx.db_session.execute(select(ContentDerivative).where(ContentDerivative.local_path.like(f"%/{file_hash}")))).scalars().first() + if cd: + cd.last_access_at = datetime.utcnow() + await request.ctx.db_session.commit() + except Exception: + pass + if range_header: make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Processing Range: {range_header}", level="DEBUG") range_spec = range_header.strip().lower() diff --git a/app/api/routes/sync.py b/app/api/routes/sync.py new file mode 100644 index 0000000..b53740c --- /dev/null +++ b/app/api/routes/sync.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from datetime import datetime +from sanic import response +from sqlalchemy import select + +from app.core.ipfs_client import pin_add, pin_ls +from app.core.logger import make_log +from app.core.models.content_v3 import EncryptedContent, IpfsSync +from app.core.network.nodesig import verify_request +from app.core.network.guard import check_rate_limit + + +async def s_api_v1_sync_pin(request): + # Rate limit per IP and require NodeSig for POST + remote_ip = (request.headers.get('X-Forwarded-For') or request.remote_addr or request.ip or '').split(',')[0].strip() + if not check_rate_limit(request.app.ctx.memory, remote_ip): + return response.json({"error": "RATE_LIMIT"}, status=429) + + ok, node_id, reason = verify_request(request, request.app.ctx.memory) + if not ok: + return response.json({"error": reason or "UNAUTHORIZED"}, status=401) + + data = request.json or {} + cid = data.get("encrypted_cid") + if not cid: + return response.json({"error": "BAD_REQUEST"}, status=400) + + session = request.ctx.db_session + row = (await session.execute(select(EncryptedContent).where(EncryptedContent.encrypted_cid == cid))).scalars().first() + if not row: + # create record with minimal info (unknown meta) + row = EncryptedContent( + encrypted_cid=cid, + title=cid, + description="", + content_type="application/octet-stream", + preview_enabled=False, + ) + session.add(row) + await session.flush() + sync = (await session.execute(select(IpfsSync).where(IpfsSync.content_id == row.id))).scalars().first() + if not sync: + sync = IpfsSync(content_id=row.id, pin_state='queued') + session.add(sync) + await session.flush() + + try: + await pin_add(cid, recursive=True) + sync.pin_state = 'pinned' + sync.pinned_at = datetime.utcnow() + except Exception as e: + make_log("sync", f"pin failed: {e}", level="error") + sync.pin_state = 'failed' + sync.pin_error = str(e) + await session.commit() + return response.json({"ok": True, "state": sync.pin_state}) + + +async def s_api_v1_sync_status(request): + cid = request.args.get("cid") + if not cid: + return response.json({"error": "BAD_REQUEST"}, status=400) + try: + info = await pin_ls(cid) + state = 'pinned' if info else 'not_pinned' + except Exception: + state = 'not_pinned' + info = {} + return response.json({"cid": cid, "state": state, "info": info}) diff --git a/app/api/routes/upload_status.py b/app/api/routes/upload_status.py new file mode 100644 index 0000000..c495004 --- /dev/null +++ b/app/api/routes/upload_status.py @@ -0,0 +1,17 @@ +from 
sanic import response +from app.core.models.content_v3 import UploadSession + + +async def s_api_v1_upload_status(request, upload_id: str): + session = request.ctx.db_session + row = await session.get(UploadSession, upload_id) + if not row: + return response.json({"error": "NOT_FOUND"}, status=404) + return response.json({ + "id": row.id, + "state": row.state, + "encrypted_cid": row.encrypted_cid, + "size_bytes": row.size_bytes, + "error": row.error, + }) + diff --git a/app/api/routes/upload_tus.py b/app/api/routes/upload_tus.py new file mode 100644 index 0000000..43ef520 --- /dev/null +++ b/app/api/routes/upload_tus.py @@ -0,0 +1,191 @@ +from __future__ import annotations + +import base64 +import json +import os +from datetime import datetime +from typing import Dict, Any + +from base58 import b58encode +from sanic import response + +from app.core._secrets import hot_pubkey +from app.core.crypto.aes_gcm_siv_stream import encrypt_file_to_encf +from app.core.crypto.aesgcm_stream import CHUNK_BYTES +from app.core.ipfs_client import add_streamed_file +from app.core.logger import make_log +from app.core.models.content_v3 import EncryptedContent, ContentKey, IpfsSync, ContentIndexItem, UploadSession +from app.core.storage import db_session + + +def _b64(s: bytes) -> str: + return base64.b64encode(s).decode() + + +async def s_api_v1_upload_tus_hook(request): + """ + tusd HTTP hook endpoint. We mainly handle post-finish to: encrypt -> IPFS add+pin -> record DB. + """ + try: + payload: Dict[str, Any] = request.json or {} + except Exception: + payload = {} + event = payload.get("Type") or payload.get("type") or payload.get("Event") or payload.get("event") + upload = payload.get("Upload") or payload.get("upload") or {} + + if not event: + return response.json({"ok": False, "error": "NO_EVENT"}, status=400) + + if event not in ("post-finish", "postfinish"): + # accept but ignore other events + return response.json({"ok": True}) + + # Extract storage path from tusd payload + storage = upload.get("Storage") or {} + file_path = storage.get("Path") or storage.get("path") + if not file_path: + return response.json({"ok": False, "error": "NO_STORAGE_PATH"}, status=400) + + meta = upload.get("MetaData") or {} + # Common metadata keys + title = meta.get("title") or meta.get("Title") or meta.get("name") or "Untitled" + description = meta.get("description") or meta.get("Description") or "" + content_type = meta.get("content_type") or meta.get("Content-Type") or "application/octet-stream" + preview_enabled = content_type.startswith("audio/") or content_type.startswith("video/") + # Optional preview window overrides from tus metadata + try: + start_ms = int(meta.get("preview_start_ms") or 0) + dur_ms = int(meta.get("preview_duration_ms") or 30000) + except Exception: + start_ms, dur_ms = 0, 30000 + + # Record/Update upload session + upload_id = upload.get("ID") or upload.get("Id") or upload.get("id") + try: + size = int(upload.get("Size") or 0) + except Exception: + size = None + + async with db_session() as session: + us = (await session.get(UploadSession, upload_id)) if upload_id else None + if not us and upload_id: + us = UploadSession( + id=upload_id, + filename=os.path.basename(file_path), + size_bytes=size, + state='processing', + encrypted_cid=None, + ) + session.add(us) + await session.commit() + + # Read & encrypt by streaming (ENCF v1 / AES-SIV) + # Generate per-content random DEK and salt + dek = os.urandom(32) + salt = os.urandom(16) + key_fpr = b58encode(hot_pubkey).decode() # fingerprint as our 
node id for now + + # Stream encrypt into IPFS add + try: + with open(file_path, 'rb') as f: + result = await add_streamed_file( + encrypt_file_to_encf(f, dek, CHUNK_BYTES, salt), + filename=os.path.basename(file_path), + params={}, + ) + except Exception as e: + make_log("tus-hook", f"Encrypt+add failed: {e}", level="error") + # mark failed + async with db_session() as session: + if upload_id: + us = await session.get(UploadSession, upload_id) + if us: + us.state = 'failed' + us.error = str(e) + await session.commit() + return response.json({"ok": False, "error": "ENCRYPT_ADD_FAILED"}, status=500) + + encrypted_cid = result.get("Hash") + try: + enc_size = int(result.get("Size") or 0) + except Exception: + enc_size = None + + # Persist records + async with db_session() as session: + ec = EncryptedContent( + encrypted_cid=encrypted_cid, + title=title, + description=description, + content_type=content_type, + enc_size_bytes=enc_size, + plain_size_bytes=os.path.getsize(file_path), + preview_enabled=preview_enabled, + preview_conf=({"duration_ms": dur_ms, "intervals": [[start_ms, start_ms + dur_ms]]} if preview_enabled else {}), + aead_scheme="AES_GCM_SIV", + chunk_bytes=CHUNK_BYTES, + salt_b64=_b64(salt), + ) + session.add(ec) + await session.flush() + + ck = ContentKey( + content_id=ec.id, + key_ciphertext_b64=_b64(dek), # NOTE: should be wrapped by local KEK; simplified for PoC + key_fingerprint=key_fpr, + issuer_node_id=key_fpr, + allow_auto_grant=True, + ) + session.add(ck) + + sync = IpfsSync( + content_id=ec.id, + pin_state='pinned', + bytes_total=enc_size, + bytes_fetched=enc_size, + pinned_at=datetime.utcnow(), + ) + session.add(sync) + + # Publish signed index item + item = { + "encrypted_cid": encrypted_cid, + "title": title, + "description": description, + "content_type": content_type, + "size_bytes": enc_size, + "preview_enabled": preview_enabled, + "preview_conf": ec.preview_conf, + "issuer_node_id": key_fpr, + "salt_b64": _b64(salt), + } + try: + from app.core._crypto.signer import Signer + from app.core._secrets import hot_seed + signer = Signer(hot_seed) + blob = json.dumps(item, sort_keys=True, separators=(",", ":")).encode() + sig = signer.sign(blob) + except Exception: + sig = "" + session.add(ContentIndexItem(encrypted_cid=encrypted_cid, payload=item, sig=sig)) + + await session.commit() + + # Update upload session with result and purge staging to avoid duplicates + async with db_session() as session: + if upload_id: + us = await session.get(UploadSession, upload_id) + if us: + us.state = 'pinned' + us.encrypted_cid = encrypted_cid + # prefer using IPFS for downstream conversion; remove staging + try: + if file_path and os.path.exists(file_path): + os.remove(file_path) + except Exception: + pass + us.storage_path = None + await session.commit() + + make_log("tus-hook", f"Uploaded+encrypted {file_path} -> {encrypted_cid}") + return response.json({"ok": True, "encrypted_cid": encrypted_cid, "upload_id": upload_id}) diff --git a/app/core/background/convert_v3_service.py b/app/core/background/convert_v3_service.py new file mode 100644 index 0000000..ddcd8ad --- /dev/null +++ b/app/core/background/convert_v3_service.py @@ -0,0 +1,307 @@ +import asyncio +import os +import json +import shutil +from datetime import datetime +from typing import List, Tuple + +from sqlalchemy import select + +from app.core.logger import make_log +from app.core.storage import db_session +from app.core._config import UPLOADS_DIR, BACKEND_LOGS_DIR_HOST +from app.core.models.content_v3 import ( + 
EncryptedContent, + ContentKey, + ContentDerivative, + UploadSession, +) +from app.core.models.node_storage import StoredContent +from app.core.ipfs_client import cat_stream +from app.core.crypto.aesgcm_stream import CHUNK_BYTES +from app.core.crypto.encf_stream import decrypt_encf_auto +from app.core.network.key_client import request_key_from_peer +from app.core.models.my_network import KnownNode + + +CONCURRENCY = int(os.getenv("CONVERT_V3_MAX_CONCURRENCY", "3")) + + +def _ensure_dir(path: str): + try: + os.makedirs(path, exist_ok=True) + except Exception: + pass + + +async def _sha256_b58(file_path: str) -> str: + import hashlib + import base58 + h = hashlib.sha256() + with open(file_path, 'rb') as f: + for chunk in iter(lambda: f.read(2 * 1024 * 1024), b''): + h.update(chunk) + return base58.b58encode(h.digest()).decode() + + +async def _save_derivative(file_path: str, filename: str) -> Tuple[str, int]: + """Move file into UPLOADS_DIR under sha256 b58 name; return (hash_b58, size).""" + file_hash = await _sha256_b58(file_path) + dst = os.path.join(UPLOADS_DIR, file_hash) + try: + os.remove(dst) + except FileNotFoundError: + pass + shutil.move(file_path, dst) + size = os.path.getsize(dst) + return file_hash, size + + +async def _run_media_converter(input_host_path: str, input_ext: str, quality: str, trim_value: str | None, is_audio: bool) -> Tuple[str, dict]: + rid = __import__('uuid').uuid4().hex[:8] + output_dir_container = f"/tmp/conv_{rid}" + output_dir_host = f"/tmp/conv_{rid}" + _ensure_dir(output_dir_host) + logs_dir_host = BACKEND_LOGS_DIR_HOST + _ensure_dir(logs_dir_host) + + cmd = [ + "docker", "run", "--rm", + "-v", f"{input_host_path}:/app/input:ro", + "-v", f"{output_dir_host}:/app/output", + "-v", f"{logs_dir_host}:/app/logs", + "media_converter", + "--ext", input_ext, + "--quality", quality, + ] + if trim_value: + cmd.extend(["--trim", trim_value]) + if is_audio: + cmd.append("--audio-only") + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + if proc.returncode != 0: + raise RuntimeError(f"media_converter failed: {stderr.decode()}") + + # Find produced media file and optional output.json + try: + files = os.listdir(output_dir_host) + except Exception as e: + raise RuntimeError(f"Read output dir error: {e}") + media_files = [f for f in files if f != "output.json"] + if len(media_files) != 1: + raise RuntimeError(f"Expected one media file, found {len(media_files)}: {media_files}") + output_media = os.path.join(output_dir_host, media_files[0]) + ffprobe_meta = {} + out_json = os.path.join(output_dir_host, "output.json") + if os.path.exists(out_json): + try: + with open(out_json, 'r') as f: + ffprobe_meta = json.load(f) + except Exception: + ffprobe_meta = {} + return output_media, ffprobe_meta + + +async def _convert_content(ec: EncryptedContent, input_host_path: str): + content_kind = 'audio' if ec.content_type.startswith('audio/') else ('video' if ec.content_type.startswith('video/') else 'other') + if content_kind == 'other': + return + + input_ext = (ec.content_type.split('/')[-1] or 'bin') + is_audio = content_kind == 'audio' + # Required outputs + required = ['high', 'low', 'low_preview'] + + # Preview interval + conf = ec.preview_conf or {} + intervals = conf.get('intervals') or [[0, int(conf.get('duration_ms', 30000))]] + main_interval = intervals[0] + trim_value = None + start_s = max(0, int(main_interval[0]) // 1000) + dur_s = max(1, 
int((main_interval[1] - main_interval[0]) // 1000) or 30) + trim_value = f"{start_s},{dur_s}" + + qualities = { + 'high': 'high', + 'low': 'low', + 'low_preview': 'low', + } + + for opt in required: + try: + # Mark derivative processing + async with db_session() as session: + cd = ContentDerivative( + content_id=ec.id, + kind=f"decrypted_{opt if opt != 'low_preview' else 'preview'}", + interval_start_ms=main_interval[0] if opt == 'low_preview' else None, + interval_end_ms=main_interval[1] if opt == 'low_preview' else None, + local_path="", + status='processing', + ) + session.add(cd) + await session.commit() + + out_path, ffprobe = await _run_media_converter( + input_host_path=input_host_path, + input_ext=input_ext, + quality=qualities[opt], + trim_value=trim_value if opt == 'low_preview' else None, + is_audio=is_audio, + ) + + # Save into store and StoredContent + file_hash, size_bytes = await _save_derivative(out_path, os.path.basename(out_path)) + + async with db_session() as session: + sc = StoredContent( + type="local/content_bin", + hash=file_hash, + user_id=None, + filename=os.path.basename(out_path), + meta={'encrypted_cid': ec.encrypted_cid, 'kind': opt, 'ffprobe_meta': ffprobe}, + created=datetime.utcnow(), + ) + session.add(sc) + await session.flush() + + # Update derivative record + cd = (await session.execute(select(ContentDerivative).where( + ContentDerivative.content_id == ec.id, + ContentDerivative.kind == (f"decrypted_{opt if opt != 'low_preview' else 'preview'}"), + ContentDerivative.status == 'processing' + ))).scalars().first() + if cd: + cd.local_path = os.path.join(UPLOADS_DIR, file_hash) + cd.size_bytes = size_bytes + cd.content_type = ('audio/mpeg' if is_audio else 'video/mp4') if opt != 'high' else ec.content_type + cd.status = 'ready' + await session.commit() + + make_log('convert_v3', f"Converted {ec.encrypted_cid} opt={opt} -> {file_hash}") + except Exception as e: + make_log('convert_v3', f"Convert error {ec.encrypted_cid} opt={opt}: {e}", level='error') + async with db_session() as session: + cd = ContentDerivative( + content_id=ec.id, + kind=f"decrypted_{opt if opt != 'low_preview' else 'preview'}", + status='failed', + error=str(e), + local_path="", + ) + session.add(cd) + await session.commit() + + +async def _pick_pending(limit: int) -> List[Tuple[EncryptedContent, str]]: + async with db_session() as session: + # Find A/V contents with preview_enabled and no ready low/low_preview derivatives yet + ecs = (await session.execute(select(EncryptedContent).where( + EncryptedContent.preview_enabled == True + ).order_by(EncryptedContent.created_at.desc()))).scalars().all() + + picked: List[Tuple[EncryptedContent, str]] = [] + for ec in ecs: + # Check if derivatives already ready + rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.content_id == ec.id))).scalars().all() + kinds_ready = {r.kind for r in rows if r.status == 'ready'} + required = {'decrypted_low', 'decrypted_high'} if ec.content_type.startswith('audio/') else {'decrypted_low', 'decrypted_high', 'decrypted_preview'} + if required.issubset(kinds_ready): + continue + # Always decrypt from IPFS using local or remote key + storage_path: str | None = None + ck = (await session.execute(select(ContentKey).where(ContentKey.content_id == ec.id))).scalars().first() + if ck: + storage_path = await stage_plain_from_ipfs(ec, ck.key_ciphertext_b64) + if not storage_path: + peers = (await session.execute(select(KnownNode))).scalars().all() + for peer in peers: + base_url = 
f"http://{peer.ip}:{peer.port}" + dek = await request_key_from_peer(base_url, ec.encrypted_cid) + if not dek: + continue + import base64 + dek_b64 = base64.b64encode(dek).decode() + session_ck = ContentKey( + content_id=ec.id, + key_ciphertext_b64=dek_b64, + key_fingerprint=peer.public_key, + issuer_node_id=peer.public_key, + allow_auto_grant=True, + ) + session.add(session_ck) + await session.commit() + storage_path = await stage_plain_from_ipfs(ec, dek_b64) + if storage_path: + break + if not storage_path or not os.path.exists(storage_path): + continue + picked.append((ec, storage_path)) + if len(picked) >= limit: + break + return picked + + +async def worker_loop(): + sem = asyncio.Semaphore(CONCURRENCY) + + async def _run_one(ec: EncryptedContent, input_path: str): + async with sem: + try: + await _convert_content(ec, input_path) + # After successful conversion, attempt to remove staging file to avoid duplicates + try: + if input_path and input_path.startswith("/data/") and os.path.exists(input_path): + os.remove(input_path) + except Exception: + pass + except Exception as e: + make_log('convert_v3', f"job error {ec.encrypted_cid}: {e}", level='error') + + while True: + try: + batch = await _pick_pending(limit=CONCURRENCY * 2) + if not batch: + await asyncio.sleep(3) + continue + tasks = [asyncio.create_task(_run_one(ec, path)) for (ec, path) in batch] + await asyncio.gather(*tasks) + except Exception as e: + make_log('convert_v3', f"loop error: {e}", level='error') + await asyncio.sleep(2) + + +async def main_fn(memory): + make_log('convert_v3', f"Service started with concurrency={CONCURRENCY}", level='info') + await worker_loop() + + +async def stage_plain_from_ipfs(ec: EncryptedContent, dek_b64: str) -> str | None: + """Download encrypted ENCF stream from IPFS and decrypt on the fly into a temp file.""" + import base64, tempfile + dek = base64.b64decode(dek_b64) + tmp = tempfile.NamedTemporaryFile(prefix=f"dec_{ec.encrypted_cid[:8]}_", delete=False) + tmp_path = tmp.name + tmp.close() + try: + async def _aiter(): + async for ch in cat_stream(ec.encrypted_cid): + yield ch + await decrypt_encf_auto(_aiter(), dek, tmp_path) + return tmp_path + except Exception as e: + make_log('convert_v3', f"decrypt from ipfs failed: {e}", level='error') + try: + os.remove(tmp_path) + except Exception: + pass + return None + + + diff --git a/app/core/background/derivative_cache_janitor.py b/app/core/background/derivative_cache_janitor.py new file mode 100644 index 0000000..70d0707 --- /dev/null +++ b/app/core/background/derivative_cache_janitor.py @@ -0,0 +1,87 @@ +import asyncio +import os +from datetime import datetime, timedelta + +from sqlalchemy import select + +from app.core.logger import make_log +from app.core.storage import db_session +from app.core.models.content_v3 import ContentDerivative + + +MAX_GB = float(os.getenv('DERIVATIVE_CACHE_MAX_GB', '50')) +TTL_DAYS = int(os.getenv('DERIVATIVE_CACHE_TTL_DAYS', '30')) +INTERVAL_SEC = int(os.getenv('DERIVATIVE_JANITOR_INTERVAL_SEC', '600')) + + +async def _current_total_size() -> int: + async with db_session() as session: + rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all() + return sum(int(r.size_bytes or 0) for r in rows) + + +async def _evict_over_ttl(now: datetime) -> int: + removed = 0 + if TTL_DAYS <= 0: + return 0 + async with db_session() as session: + rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all() + for 
r in rows:
            la = r.last_access_at or r.created_at
            if la and (now - la) > timedelta(days=TTL_DAYS):
                try:
                    if r.local_path and os.path.exists(r.local_path):
                        os.remove(r.local_path)
                except Exception:
                    pass
                r.status = 'pending'
                r.local_path = None
                r.size_bytes = None
                r.last_access_at = None
                removed += 1
        await session.commit()
    return removed


async def _evict_to_fit():
    limit_bytes = int(MAX_GB * (1024 ** 3))
    total = await _current_total_size()
    if total <= limit_bytes:
        return 0
    to_remove = total - limit_bytes
    removed = 0
    async with db_session() as session:
        # Oldest first by last_access_at
        rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all()
        rows.sort(key=lambda r: (r.last_access_at or r.created_at or datetime.utcfromtimestamp(0)))
        for r in rows:
            if to_remove <= 0:
                break
            size = int(r.size_bytes or 0)
            try:
                if r.local_path and os.path.exists(r.local_path):
                    os.remove(r.local_path)
            except Exception:
                pass
            r.status = 'pending'
            r.local_path = None
            r.last_access_at = None
            r.size_bytes = None
            await session.commit()
            to_remove -= size
            removed += 1
    return removed


async def main_fn(memory):
    make_log('derivative_janitor', f"Started (MAX_GB={MAX_GB}, TTL_DAYS={TTL_DAYS})", level='info')
    while True:
        try:
            now = datetime.utcnow()
            r1 = await _evict_over_ttl(now)
            r2 = await _evict_to_fit()
            if r1 or r2:
                make_log('derivative_janitor', f"Evicted: ttl={r1}, fit={r2}")
        except Exception as e:
            make_log('derivative_janitor', f"Error: {e}", level='error')
        await asyncio.sleep(INTERVAL_SEC)
diff --git a/app/core/background/index_scout_v3.py b/app/core/background/index_scout_v3.py
new file mode 100644
index 0000000..fab1bd8
--- /dev/null
+++ b/app/core/background/index_scout_v3.py
@@ -0,0 +1,177 @@
+import asyncio
+import os
+from typing import List
+
+import httpx
+import random
+import shutil
+from sqlalchemy import select
+
+from app.core.logger import make_log
+from app.core.storage import db_session
+from app.core.models.my_network import KnownNode
+from app.core.models.content_v3 import EncryptedContent, ContentDerivative
+from app.core.ipfs_client import pin_add, find_providers, swarm_connect
+
+
+INTERVAL_SEC = 60
+PIN_CONCURRENCY = int(os.getenv('SYNC_MAX_CONCURRENT_PINS', '4'))
+DISK_WATERMARK_PCT = int(os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90'))
+
+
+async def fetch_index(base_url: str, etag: str | None, since: str | None) -> tuple[List[dict], str | None]:
+    try:
+        headers = {}
+        params = {}
+        if since:
+            params['since'] = since
+        url = f"{base_url.rstrip('/')}/api/v1/content.delta" if since else f"{base_url.rstrip('/')}/api/v1/content.index"
+        if etag:
+            headers['If-None-Match'] = etag
+        async with httpx.AsyncClient(timeout=20) as client:
+            r = await client.get(url, headers=headers, params=params)
+        if r.status_code != 200:
+            # 304 Not Modified and errors alike: keep the current marker, no items
+            return [], etag
+        j = r.json()
+        new_etag = r.headers.get('ETag') or etag
+        return j.get('items') or [], (j.get('next_since') or new_etag or etag)
+    except Exception:
+        return [], etag
+
+
+async def upsert_content(item: dict):
+    cid = item.get('encrypted_cid')
+    if not cid:
+        return
+    async with db_session() as session:
+        row = (await session.execute(select(EncryptedContent).where(EncryptedContent.encrypted_cid == cid))).scalars().first()
+        if not row:
+            row = EncryptedContent(
+                encrypted_cid=cid,
+                title=item.get('title') or cid,
+                description=item.get('description') or '',
content_type=item.get('content_type') or 'application/octet-stream', + enc_size_bytes=item.get('size_bytes'), + preview_enabled=bool(item.get('preview_enabled')), + preview_conf=item.get('preview_conf') or {}, + salt_b64=item.get('salt_b64'), + ) + session.add(row) + else: + row.title = item.get('title') or row.title + row.description = item.get('description') or row.description + row.content_type = item.get('content_type') or row.content_type + row.enc_size_bytes = item.get('size_bytes') or row.enc_size_bytes + row.preview_enabled = bool(item.get('preview_enabled')) if item.get('preview_enabled') is not None else row.preview_enabled + if item.get('preview_conf'): + row.preview_conf = item['preview_conf'] + if item.get('salt_b64'): + row.salt_b64 = item['salt_b64'] + await session.commit() + + # Fetch thumbnail via HTTP if provided and not present locally + cover_url = item.get('cover_url') + if cover_url: + try: + async with db_session() as session: + ec = (await session.execute(select(EncryptedContent).where(EncryptedContent.encrypted_cid == cid))).scalars().first() + have_thumb = (await session.execute(select(ContentDerivative).where(ContentDerivative.content_id == ec.id, ContentDerivative.kind == 'decrypted_thumbnail', ContentDerivative.status == 'ready'))).scalars().first() + if not have_thumb: + import httpx, tempfile, os + async with httpx.AsyncClient(timeout=30) as client: + r = await client.get(cover_url) + r.raise_for_status() + tmp = tempfile.NamedTemporaryFile(delete=False) + tmp.write(r.content) + tmp.close() + # Save into store + from app.core.background.convert_v3_service import _save_derivative + h, size = await _save_derivative(tmp.name, os.path.basename(cover_url) or 'thumb.jpg') + cd = ContentDerivative( + content_id=ec.id, + kind='decrypted_thumbnail', + local_path=os.path.join(os.getenv('UPLOADS_DIR', '/app/data'), h), + content_type=r.headers.get('Content-Type') or 'image/jpeg', + size_bytes=size, + status='ready', + ) + session.add(cd) + await session.commit() + except Exception as e: + make_log('index_scout_v3', f"thumbnail fetch failed for {cid}: {e}", level='warning') + + +async def main_fn(memory): + make_log('index_scout_v3', 'Service started', level='info') + sem = asyncio.Semaphore(PIN_CONCURRENCY) + while True: + try: + async with db_session() as session: + nodes = (await session.execute(select(KnownNode))).scalars().all() + for n in nodes: + base = f"http://{n.ip}:{n.port}" + # jitter 0..30s per node to reduce stampede + await asyncio.sleep(random.uniform(0, 30)) + etag = (n.meta or {}).get('index_etag') + since = (n.meta or {}).get('index_since') + items, marker = await fetch_index(base, etag, since) + if not items and marker == etag: + continue + # update node markers + try: + async with db_session() as session: + row = (await session.execute(select(KnownNode).where(KnownNode.id == n.id))).scalars().first() + if row: + meta = row.meta or {} + meta['index_etag'] = marker + meta['index_since'] = marker if (marker and 'T' in str(marker)) else meta.get('index_since') + row.meta = meta + await session.commit() + except Exception: + pass + + if not items: + continue + make_log('index_scout_v3', f"Fetched {len(items)} from {base}") + + # Check disk watermark + try: + from app.core._config import UPLOADS_DIR + du = shutil.disk_usage(UPLOADS_DIR) + used_pct = int(100 * (1 - du.free / du.total)) + if used_pct >= DISK_WATERMARK_PCT: + make_log('index_scout_v3', f"Disk watermark reached ({used_pct}%), skipping pins") + continue + except Exception: + pass + + async def 
_pin_one(cid: str): + async with sem: + try: + # Try to pre-connect to discovered providers + try: + provs = await find_providers(cid, max_results=5) + for p in provs: + for addr in (p.get('addrs') or [])[:2]: + try: + await swarm_connect(addr) + except Exception: + pass + except Exception: + pass + await pin_add(cid, recursive=True) + except Exception as e: + make_log('index_scout_v3', f"pin {cid} failed: {e}", level='warning') + + tasks = [] + for it in items: + await upsert_content(it) + cid = it.get('encrypted_cid') + if cid: + tasks.append(asyncio.create_task(_pin_one(cid))) + if tasks: + await asyncio.gather(*tasks) + except Exception as e: + make_log('index_scout_v3', f"loop error: {e}", level='error') + await asyncio.sleep(INTERVAL_SEC) diff --git a/app/core/crypto/aes_gcm_siv_stream.py b/app/core/crypto/aes_gcm_siv_stream.py new file mode 100644 index 0000000..df2572c --- /dev/null +++ b/app/core/crypto/aes_gcm_siv_stream.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import hmac +import hashlib +import struct +from typing import BinaryIO, Iterator, AsyncIterator + +from gcm_siv import GcmSiv # requires `gcm_siv` package + + +MAGIC = b"ENCF" +VERSION = 1 +SCHEME_AES_GCM_SIV = 0x01 + + +def _derive_nonce(salt: bytes, idx: int) -> bytes: + b = idx.to_bytes(8, 'big') + return hmac.new(salt, b, hashlib.sha256).digest()[:12] + + +def build_header(chunk_bytes: int, salt: bytes) -> bytes: + assert 0 < chunk_bytes <= (1 << 31) + assert 1 <= len(salt) <= 255 + # MAGIC(4) | ver(1) | scheme(1) | chunk_bytes(4,BE) | salt_len(1) | salt | reserved(5) + hdr = bytearray() + hdr += MAGIC + hdr += bytes([VERSION]) + hdr += bytes([SCHEME_AES_GCM_SIV]) + hdr += struct.pack(">I", int(chunk_bytes)) + hdr += bytes([len(salt)]) + hdr += salt + hdr += b"\x00" * 5 + return bytes(hdr) + + +def encrypt_file_to_encf(src: BinaryIO, key: bytes, chunk_bytes: int, salt: bytes) -> Iterator[bytes]: + """ + Yield ENCF v1 stream using AES-GCM-SIV per chunk with deterministic nonces. + Frame: [p_len:4][cipher][tag(16)]. 
+    """
+    yield build_header(chunk_bytes, salt)
+    idx = 0
+    while True:
+        block = src.read(chunk_bytes)
+        if not block:
+            break
+        nonce = _derive_nonce(salt, idx)
+        ct_and_tag = GcmSiv(key).encrypt(nonce, block, associated_data=None)
+        # Split tag
+        tag = ct_and_tag[-16:]
+        ct = ct_and_tag[:-16]
+        yield struct.pack(">I", len(block))
+        yield ct
+        yield tag
+        idx += 1
+
+
+async def decrypt_encf_to_file(byte_iter: AsyncIterator[bytes], key: bytes, out_path: str) -> None:
+    """Parse ENCF v1 (AES-GCM-SIV) and write plaintext to out_path."""
+    import aiofiles
+    buf = bytearray()
+
+    async def _fill(n: int):
+        nonlocal buf
+        while len(buf) < n:
+            try:
+                chunk = await byte_iter.__anext__()
+            except StopAsyncIteration:
+                break
+            if chunk:
+                buf.extend(chunk)
+
+    # header minimal
+    await _fill(11)
+    if buf[:4] != MAGIC:
+        raise ValueError("bad magic")
+    version = buf[4]
+    scheme = buf[5]
+    if version != 1 or scheme != SCHEME_AES_GCM_SIV:
+        raise ValueError("unsupported encf header")
+    chunk_bytes = struct.unpack(">I", bytes(buf[6:10]))[0]
+    salt_len = buf[10]
+    hdr_len = 4 + 1 + 1 + 4 + 1 + salt_len + 5
+    await _fill(hdr_len)
+    salt = bytes(buf[11:11 + salt_len])
+    del buf[:hdr_len]
+
+    async with aiofiles.open(out_path, 'wb') as out:
+        idx = 0
+        TAG_LEN = 16
+        while True:
+            await _fill(4)
+            if len(buf) == 0:
+                break
+            if len(buf) < 4:
+                raise ValueError("truncated frame length")
+            p_len = struct.unpack(">I", bytes(buf[:4]))[0]
+            del buf[:4]
+            await _fill(p_len + TAG_LEN)
+            if len(buf) < p_len + TAG_LEN:
+                raise ValueError("truncated cipher/tag")
+            ct = bytes(buf[:p_len])
+            tag = bytes(buf[p_len:p_len+TAG_LEN])
+            del buf[:p_len+TAG_LEN]
+            nonce = _derive_nonce(salt, idx)
+            pt = GcmSiv(key).decrypt(nonce, ct + tag, associated_data=None)
+            await out.write(pt)
+            idx += 1
+
diff --git a/app/core/crypto/aes_siv_stream.py b/app/core/crypto/aes_siv_stream.py
new file mode 100644
index 0000000..1e08d9e
--- /dev/null
+++ b/app/core/crypto/aes_siv_stream.py
@@ -0,0 +1,137 @@
+from __future__ import annotations
+
+import os
+import struct
+from typing import BinaryIO, Iterator, AsyncIterator
+
+# PyCryptodome exposes AES-SIV via AES.new(key, AES.MODE_SIV); there is no Crypto.Cipher.SIV module
+from Crypto.Cipher import AES
+
+
+MAGIC = b"ENCF"
+VERSION = 1
+
+# Scheme codes
+SCHEME_AES_SIV = 0x02  # RFC5297 AES-SIV (CMAC-based)
+
+
+def build_header(chunk_bytes: int, salt: bytes, scheme: int = SCHEME_AES_SIV) -> bytes:
+    assert 0 < chunk_bytes <= (1 << 31)
+    assert 1 <= len(salt) <= 255
+    # Layout: MAGIC(4) | version(1) | scheme(1) | chunk_bytes(4,BE) | salt_len(1) | salt(N) | reserved(5 zeros)
+    hdr = bytearray()
+    hdr += MAGIC
+    hdr += bytes([VERSION])
+    hdr += bytes([scheme])
+    hdr += struct.pack(">I", int(chunk_bytes))
+    hdr += bytes([len(salt)])
+    hdr += salt
+    hdr += b"\x00" * 5
+    return bytes(hdr)
+
+
+def parse_header(buf: bytes) -> tuple[int, int, int, bytes, int]:
+    if len(buf) < 4 + 1 + 1 + 4 + 1:
+        raise ValueError("header too short")
+    if buf[:4] != MAGIC:
+        raise ValueError("bad magic")
+    version = buf[4]
+    scheme = buf[5]
+    chunk_bytes = struct.unpack(">I", buf[6:10])[0]
+    salt_len = buf[10]
+    needed = 4 + 1 + 1 + 4 + 1 + salt_len + 5
+    if len(buf) < needed:
+        raise ValueError("incomplete header")
+    salt = buf[11:11 + salt_len]
+    # reserved 5 bytes at the end ignored
+    return version, scheme, chunk_bytes, salt, needed
+
+
+def _ad(salt: bytes, idx: int) -> bytes:
+    return salt + struct.pack(">Q", idx)
+
+
+def encrypt_file_to_encf(src: BinaryIO, key: bytes, chunk_bytes: int, salt: bytes) -> Iterator[bytes]:
+    """
+    Yield ENCF v1 stream bytes: [header] then for each chunk: [p_len:4][cipher][tag(16)].
+    Uses AES-SIV (RFC5297) with per-chunk associated data salt||index.
+    """
+    yield build_header(chunk_bytes, salt, SCHEME_AES_SIV)
+    idx = 0
+    while True:
+        block = src.read(chunk_bytes)
+        if not block:
+            break
+        siv = AES.new(key, AES.MODE_SIV)  # new cipher per message; AES-SIV keys are double-length (32/48/64 bytes)
+        siv.update(_ad(salt, idx))
+        ciph, tag = siv.encrypt_and_digest(block)
+        yield struct.pack(">I", len(block))
+        yield ciph
+        yield tag
+        idx += 1
+
+
+async def decrypt_encf_to_file(byte_iter: AsyncIterator[bytes], key: bytes, out_path: str) -> None:
+    """
+    Parse ENCF v1 stream from async byte iterator and write plaintext to out_path.
+    """
+    import aiofiles
+    # AES-SIV lives in PyCryptodome's AES module (AES.MODE_SIV); there is no Crypto.Cipher.SIV
+    from Crypto.Cipher import AES as _AES
+
+    buf = bytearray()
+
+    async def _fill(n: int):
+        """Ensure at least n bytes in buffer (or EOF)."""
+        nonlocal buf
+        while len(buf) < n:
+            try:
+                chunk = await byte_iter.__anext__()
+            except StopAsyncIteration:
+                break
+            if chunk:
+                buf.extend(chunk)
+
+    # Read and parse header
+    await _fill(4 + 1 + 1 + 4 + 1)  # minimal header
+    # Might still be incomplete if salt_len > 0; keep filling progressively
+    # First, get preliminary to know salt_len
+    if len(buf) < 11:
+        await _fill(11)
+    if buf[:4] != MAGIC:
+        raise ValueError("bad magic")
+    salt_len = buf[10]
+    hdr_len = 4 + 1 + 1 + 4 + 1 + salt_len + 5
+    await _fill(hdr_len)
+    version, scheme, chunk_bytes, salt, consumed = parse_header(bytes(buf))
+    del buf[:consumed]
+    if version != 1:
+        raise ValueError("unsupported ENCF version")
+    if scheme != SCHEME_AES_SIV:
+        raise ValueError("unsupported scheme")
+
+    async with aiofiles.open(out_path, 'wb') as out:
+        idx = 0
+        TAG_LEN = 16
+        while True:
+            # Need at least 4 bytes for p_len
+            await _fill(4)
+            if len(buf) == 0:
+                break  # EOF exactly on boundary
+            if len(buf) < 4:
+                raise ValueError("truncated frame length")
+            p_len = struct.unpack(">I", bytes(buf[:4]))[0]
+            del buf[:4]
+            # Now need p_len + 16 bytes
+            await _fill(p_len + TAG_LEN)
+            if len(buf) < p_len + TAG_LEN:
+                raise ValueError("truncated cipher/tag")
+            c = bytes(buf[:p_len])
+            t = bytes(buf[p_len:p_len+TAG_LEN])
+            del buf[:p_len+TAG_LEN]
+            siv = _AES.new(key, _AES.MODE_SIV)
+            siv.update(_ad(salt, idx))
+            p = siv.decrypt_and_verify(c, t)
+            await out.write(p)
+            idx += 1
+
diff --git a/app/core/crypto/aesgcm_stream.py b/app/core/crypto/aesgcm_stream.py
new file mode 100644
index 0000000..15d3562
--- /dev/null
+++ b/app/core/crypto/aesgcm_stream.py
@@ -0,0 +1,77 @@
+from __future__ import annotations
+
+import os
+import hmac
+import hashlib
+from typing import BinaryIO, Iterator
+
+from Crypto.Cipher import AES
+
+
+CHUNK_BYTES = int(os.getenv("CRYPTO_CHUNK_BYTES", "1048576"))  # 1 MiB
+
+
+def _derive_nonce(salt: bytes, chunk_index: int) -> bytes:
+    """Derive a 12-byte GCM nonce deterministically from per-file salt and chunk index."""
+    idx = chunk_index.to_bytes(8, 'big')
+    digest = hmac.new(salt, idx, hashlib.sha256).digest()
+    return digest[:12]
+
+
+def encrypt_stream_aesgcm(src: BinaryIO, key: bytes, salt: bytes) -> Iterator[bytes]:
+    """
+    Read plaintext from src by CHUNK_BYTES, encrypt each chunk with AES-GCM using a
+    deterministic nonce derived from (salt, index). Yields bytes in framing: [C_i][TAG_i]...
+    Ciphertext length equals plaintext chunk length. Tag is 16 bytes.
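+
+    Example (a sketch; `key` is a 16/24/32-byte AES key, `salt` at least 12 bytes):
+
+        with open("plain.bin", "rb") as src, open("enc.bin", "wb") as dst:
+            for part in encrypt_stream_aesgcm(src, key, salt):
+                dst.write(part)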
+    """
+    assert len(key) in (16, 24, 32)
+    assert len(salt) >= 12
+    idx = 0
+    while True:
+        block = src.read(CHUNK_BYTES)
+        if not block:
+            break
+        nonce = _derive_nonce(salt, idx)
+        cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
+        ciphertext, tag = cipher.encrypt_and_digest(block)
+        yield ciphertext
+        yield tag
+        idx += 1
+
+
+def decrypt_stream_aesgcm_iter(byte_iter: Iterator[bytes], key: bytes, salt: bytes) -> Iterator[bytes]:
+    """
+    Decrypt a stream that was produced by encrypt_stream_aesgcm.
+    Frame format: concatenation of [C_i][TAG_i] for each i, where |C_i| = CHUNK_BYTES
+    (the final frame may be shorter) and |TAG_i| = 16.
+    We accept arbitrary chunking from the underlying iterator and reframe accordingly.
+    """
+    assert len(key) in (16, 24, 32)
+    buf = bytearray()
+    idx = 0
+    TAG_LEN = 16
+    def _try_yield():
+        nonlocal idx
+        out = []
+        while len(buf) >= CHUNK_BYTES + TAG_LEN:
+            c = bytes(buf[:CHUNK_BYTES])
+            t = bytes(buf[CHUNK_BYTES:CHUNK_BYTES+TAG_LEN])
+            del buf[:CHUNK_BYTES+TAG_LEN]
+            nonce = _derive_nonce(salt, idx)
+            cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
+            try:
+                p = cipher.decrypt_and_verify(c, t)
+            except Exception as e:
+                raise ValueError(f"Decrypt failed at chunk {idx}: {e}")
+            out.append(p)
+            idx += 1
+        return out
+    for chunk in byte_iter:
+        if not chunk:
+            continue
+        buf.extend(chunk)
+        for p in _try_yield():
+            yield p
+    # Flush the final short frame: the last plaintext block may be under CHUNK_BYTES,
+    # so its [C][TAG] never reaches CHUNK_BYTES + TAG_LEN and stays in the buffer.
+    if buf:
+        if len(buf) < TAG_LEN:
+            raise ValueError("Trailing bytes in encrypted stream (incomplete frame)")
+        c = bytes(buf[:-TAG_LEN])
+        t = bytes(buf[-TAG_LEN:])
+        nonce = _derive_nonce(salt, idx)
+        cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
+        try:
+            yield cipher.decrypt_and_verify(c, t)
+        except Exception as e:
+            raise ValueError(f"Decrypt failed at final chunk {idx}: {e}")
+
diff --git a/app/core/crypto/encf_stream.py b/app/core/crypto/encf_stream.py
new file mode 100644
index 0000000..b53ce36
--- /dev/null
+++ b/app/core/crypto/encf_stream.py
@@ -0,0 +1,43 @@
+from __future__ import annotations
+
+from typing import AsyncIterator
+
+from .aes_gcm_siv_stream import MAGIC as _MAGIC, SCHEME_AES_GCM_SIV
+from .aes_gcm_siv_stream import decrypt_encf_to_file as _dec_gcmsiv
+from .aes_siv_stream import decrypt_encf_to_file as _dec_siv
+
+
+async def decrypt_encf_auto(byte_iter: AsyncIterator[bytes], key: bytes, out_path: str) -> None:
+    """
+    Detect scheme by peeking header, then delegate to proper decrypter.
+    Re-feeds the peeked bytes back to the chosen decoder.
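+
+    Example (a sketch; `cat_stream` is the helper from app.core.ipfs_client):
+
+        await decrypt_encf_auto(cat_stream(encrypted_cid), dek, "/tmp/plain.bin")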
+    """
+    buf = bytearray()
+
+    async def _fill(n: int):
+        nonlocal buf
+        while len(buf) < n:
+            try:
+                ch = await byte_iter.__anext__()
+            except StopAsyncIteration:
+                break
+            if ch:
+                buf.extend(ch)
+
+    await _fill(11)
+    if buf[:4] != _MAGIC:
+        raise ValueError("bad magic")
+    scheme = buf[5]
+
+    async def _prepend_iter():
+        nonlocal buf
+        if buf:
+            yield bytes(buf)
+        async for ch in byte_iter:
+            yield ch
+
+    if scheme == SCHEME_AES_GCM_SIV:
+        await _dec_gcmsiv(_prepend_iter(), key, out_path)
+    else:
+        await _dec_siv(_prepend_iter(), key, out_path)
+
diff --git a/app/core/crypto/x25519.py b/app/core/crypto/x25519.py
new file mode 100644
index 0000000..1164865
--- /dev/null
+++ b/app/core/crypto/x25519.py
@@ -0,0 +1,25 @@
+from __future__ import annotations
+
+import base64
+from typing import Tuple
+
+from nacl import public, signing, bindings
+
+
+def ed25519_to_x25519(ed_seed: bytes) -> Tuple[public.PrivateKey, public.PublicKey]:
+    """Convert Ed25519 seed (32 bytes) to X25519 key pair using libsodium conversion."""
+    if len(ed_seed) != 32:
+        raise ValueError("ed25519 seed must be 32 bytes")
+    sk_ed = signing.SigningKey(ed_seed)
+    sk_ed_bytes = bytes(sk_ed) + bytes(sk_ed.verify_key)  # 64-byte expanded sk (seed||pub)
+    sk_x_bytes = bindings.crypto_sign_ed25519_sk_to_curve25519(sk_ed_bytes)
+    pk_x_bytes = bindings.crypto_sign_ed25519_pk_to_curve25519(bytes(sk_ed.verify_key))
+    sk_x = public.PrivateKey(sk_x_bytes)
+    pk_x = public.PublicKey(pk_x_bytes)
+    return sk_x, pk_x
+
+
+def x25519_pub_b64_from_ed_seed(ed_seed: bytes) -> str:
+    _, pk = ed25519_to_x25519(ed_seed)
+    return base64.b64encode(bytes(pk)).decode()
+
diff --git a/app/core/ipfs_client.py b/app/core/ipfs_client.py
new file mode 100644
index 0000000..d1b5183
--- /dev/null
+++ b/app/core/ipfs_client.py
@@ -0,0 +1,105 @@
+from __future__ import annotations
+
+import os
+from typing import AsyncIterator, Dict, Any, Iterable, Optional
+
+import httpx
+
+IPFS_API_URL = os.getenv("IPFS_API_URL", "http://ipfs:5001")
+IPFS_GATEWAY_URL = os.getenv("IPFS_GATEWAY_URL", "http://ipfs:8080")
+
+
+async def add_streamed_file(stream_iter: Iterable[bytes], filename: str = "file.bin", params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
+    """
+    Stream-encrypt pipeline can pass a generator of bytes here. We stream to /api/v0/add as multipart.
+    Returns dict with fields from IPFS: { Name, Hash, Size }.
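+
+    Example (a sketch; `encf_iter` stands for any iterator of encrypted ENCF bytes):
+
+        res = await add_streamed_file(encf_iter, filename="content.encf")
+        encrypted_cid = res["Hash"]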
+    """
+    params = params or {}
+    # Ensure deterministic chunking and CIDv1
+    default_params = {
+        "cid-version": 1,
+        "raw-leaves": "true",
+        "chunker": f"size-{int(os.getenv('CRYPTO_CHUNK_BYTES', '1048576'))}",
+        "pin": "true",
+        "wrap-with-directory": "false",
+        "progress": "true",
+    }
+    q = {**default_params, **params}
+
+    async with httpx.AsyncClient(timeout=None) as client:
+        files = {"file": (filename, stream_iter, "application/octet-stream")}
+        r = await client.post(f"{IPFS_API_URL}/api/v0/add", params=q, files=files)
+        r.raise_for_status()
+        # /add may emit NDJSON lines; most often single JSON
+        try:
+            data = r.json()
+        except Exception:
+            # Fallback: last non-empty line
+            last = [ln for ln in r.text.splitlines() if ln.strip()][-1]
+            import json as _json
+            data = _json.loads(last)
+        return data
+
+
+async def pin_add(cid: str, recursive: bool = True) -> Dict[str, Any]:
+    async with httpx.AsyncClient(timeout=None) as client:
+        r = await client.post(f"{IPFS_API_URL}/api/v0/pin/add", params={"arg": cid, "recursive": str(recursive).lower(), "progress": "true"})
+        r.raise_for_status()
+        # With progress enabled the daemon may stream NDJSON; fall back to the last object
+        try:
+            return r.json()
+        except Exception:
+            import json as _json
+            last = [ln for ln in r.text.splitlines() if ln.strip()][-1]
+            return _json.loads(last)
+
+
+async def pin_ls(cid: str) -> Dict[str, Any]:
+    async with httpx.AsyncClient(timeout=30) as client:
+        r = await client.post(f"{IPFS_API_URL}/api/v0/pin/ls", params={"arg": cid})
+        r.raise_for_status()
+        return r.json()
+
+
+async def swarm_connect(multiaddr: str) -> Dict[str, Any]:
+    async with httpx.AsyncClient(timeout=10) as client:
+        r = await client.post(f"{IPFS_API_URL}/api/v0/swarm/connect", params={"arg": multiaddr})
+        r.raise_for_status()
+        return r.json()
+
+
+async def cat_stream(cid: str):
+    client = httpx.AsyncClient(timeout=None)
+    try:
+        async with client.stream("POST", f"{IPFS_API_URL}/api/v0/cat", params={"arg": cid}) as r:
+            r.raise_for_status()
+            async for chunk in r.aiter_bytes():
+                if chunk:
+                    yield chunk
+    finally:
+        await client.aclose()
+
+
+async def find_providers(cid: str, max_results: int = 8):
+    """Query DHT for providers of a CID and return a list of {peer, addrs[]}.
+    Uses /api/v0/dht/findprovs and parses NDJSON stream.
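+
+    Example result shape (illustrative values):
+
+        [{"peer": "12D3KooW...", "addrs": ["/ip4/203.0.113.7/tcp/4001"]}]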
+    """
+    out = []
+    async with httpx.AsyncClient(timeout=30) as client:
+        async with client.stream("POST", f"{IPFS_API_URL}/api/v0/dht/findprovs", params={"arg": cid}) as r:
+            r.raise_for_status()
+            async for line in r.aiter_lines():
+                if not line:
+                    continue
+                # The endpoint streams NDJSON: each line is one JSON object.
+                import json as _json
+                try:
+                    j = _json.loads(line)
+                except Exception:
+                    # Tolerate partial or garbled lines and keep
+                    # consuming the stream.
+                    continue
+                # Entries can include 'Extra' or 'Responses'
+                resps = j.get('Responses') or []
+                for resp in resps:
+                    peer = resp.get('ID') or ''
+                    addrs = resp.get('Addrs') or []
+                    if peer:
+                        out.append({"peer": peer, "addrs": addrs})
+                    if len(out) >= max_results:
+                        return out
+    return out
diff --git a/app/core/models/__init__.py b/app/core/models/__init__.py
index 9137a2d..28d1840 100644
--- a/app/core/models/__init__.py
+++ b/app/core/models/__init__.py
@@ -13,3 +13,11 @@ from app.core.models.asset import Asset
 from app.core.models.my_network import KnownNode, KnownNodeIncident, RemoteContentIndex
 from app.core.models.promo import PromoAction
 from app.core.models.tasks import BlockchainTask
+from app.core.models.content_v3 import (
+    EncryptedContent,
+    ContentKey,
+    IpfsSync,
+    ContentDerivative,
+    ContentIndexItem,
+    KeyGrant,
+)
diff --git a/app/core/models/content_v3.py b/app/core/models/content_v3.py
new file mode 100644
index 0000000..aceb09d
--- /dev/null
+++ b/app/core/models/content_v3.py
@@ -0,0 +1,123 @@
+from __future__ import annotations
+
+from datetime import datetime
+from sqlalchemy import Column, BigInteger, Integer, String, DateTime, JSON, Boolean, ForeignKey
+from sqlalchemy.orm import relationship
+
+from .base import AlchemyBase
+
+
+class EncryptedContent(AlchemyBase):
+    __tablename__ = 'encrypted_contents'
+
+    id = Column(Integer, autoincrement=True, primary_key=True)
+    # CID of encrypted source stored in IPFS (CIDv1 base32)
+    encrypted_cid = Column(String(128), nullable=False, unique=True)
+
+    # Public metadata
+    title = Column(String(512), nullable=False)
+    description = Column(String(4096), nullable=True)
+    content_type = Column(String(64), nullable=False)  # e.g.
audio/flac, video/mp4, application/octet-stream + + # Sizes + enc_size_bytes = Column(BigInteger, nullable=True) + plain_size_bytes = Column(BigInteger, nullable=True) + + # Preview flags and config (all preview params live here, not in derivatives) + preview_enabled = Column(Boolean, nullable=False, default=False) + preview_conf = Column(JSON, nullable=False, default=dict) + + # Crypto parameters (fixed per network) + aead_scheme = Column(String(32), nullable=False, default='AES_GCM_SIV') + chunk_bytes = Column(Integer, nullable=False, default=1048576) + salt_b64 = Column(String(64), nullable=True) # per-content salt used for nonce derivation + + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow) + + +class ContentKey(AlchemyBase): + __tablename__ = 'content_keys' + + content_id = Column(Integer, ForeignKey('encrypted_contents.id'), primary_key=True) + key_ciphertext_b64 = Column(String(512), nullable=False) + key_fingerprint = Column(String(128), nullable=False) + issuer_node_id = Column(String(128), nullable=False) + allow_auto_grant = Column(Boolean, nullable=False, default=True) + lease_expires_at = Column(DateTime, nullable=True) + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + + content = relationship('EncryptedContent', uselist=False, foreign_keys=[content_id]) + + +class IpfsSync(AlchemyBase): + __tablename__ = 'ipfs_sync' + + content_id = Column(Integer, ForeignKey('encrypted_contents.id'), primary_key=True) + pin_state = Column(String(32), nullable=False, default='pinned') # not_pinned|queued|pinning|pinned|failed + pin_error = Column(String(1024), nullable=True) + bytes_total = Column(BigInteger, nullable=True) + bytes_fetched = Column(BigInteger, nullable=True) + providers_cache = Column(JSON, nullable=False, default=list) + first_seen_at = Column(DateTime, nullable=True) + pinned_at = Column(DateTime, nullable=True) + updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow) + + content = relationship('EncryptedContent', uselist=False, foreign_keys=[content_id]) + + +class ContentDerivative(AlchemyBase): + __tablename__ = 'content_derivatives' + + id = Column(Integer, autoincrement=True, primary_key=True) + content_id = Column(Integer, ForeignKey('encrypted_contents.id'), nullable=False) + kind = Column(String(64), nullable=False) # decrypted_high|decrypted_low|decrypted_thumbnail|decrypted_preview + interval_start_ms = Column(Integer, nullable=True) + interval_end_ms = Column(Integer, nullable=True) + local_path = Column(String(1024), nullable=False) + content_type = Column(String(64), nullable=True) + size_bytes = Column(BigInteger, nullable=True) + status = Column(String(32), nullable=False, default='pending') # pending|processing|ready|failed + error = Column(String(1024), nullable=True) + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + last_access_at = Column(DateTime, nullable=True) + + content = relationship('EncryptedContent', uselist=False, foreign_keys=[content_id]) + + +class ContentIndexItem(AlchemyBase): + __tablename__ = 'content_index_items' + + encrypted_cid = Column(String(128), primary_key=True) + payload = Column(JSON, nullable=False, default=dict) + sig = Column(String(512), nullable=False) + updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow) + + +class KeyGrant(AlchemyBase): + __tablename__ = 'key_grants' 
+ + id = Column(Integer, autoincrement=True, primary_key=True) + encrypted_cid = Column(String(128), nullable=False) + issuer_node_id = Column(String(128), nullable=False) + to_node_id = Column(String(128), nullable=False) + sealed_key_b64 = Column(String(1024), nullable=False) + aead_scheme = Column(String(32), nullable=False) + chunk_bytes = Column(Integer, nullable=False) + constraints = Column(JSON, nullable=False, default=dict) + issued_at = Column(DateTime, nullable=False, default=datetime.utcnow) + sig = Column(String(512), nullable=False) + + +class UploadSession(AlchemyBase): + __tablename__ = 'upload_sessions' + + id = Column(String(128), primary_key=True) # tus Upload.ID + filename = Column(String(512), nullable=True) + size_bytes = Column(BigInteger, nullable=True) + state = Column(String(32), nullable=False, default='uploading') # uploading|processing|pinned|failed + encrypted_cid = Column(String(128), nullable=True) + storage_path = Column(String(1024), nullable=True) + error = Column(String(1024), nullable=True) + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow) diff --git a/app/core/models/memory.py b/app/core/models/memory.py index 61a8ed0..949748e 100644 --- a/app/core/models/memory.py +++ b/app/core/models/memory.py @@ -42,6 +42,10 @@ class Memory: self._telegram_bot = Bot(TELEGRAM_API_KEY) self._client_telegram_bot = Bot(CLIENT_TELEGRAM_API_KEY) + # Network handshake guards + self._handshake_rl = {"minute": 0, "counts": {}} + self._handshake_nonces = {} + @asynccontextmanager async def transaction(self, desc=""): make_log("Memory.transaction", f"Starting transaction; {desc}", level='debug') @@ -77,4 +81,3 @@ class Memory: self._execute_queue.append([_fn, args, kwargs]) - diff --git a/app/core/network/__init__.py b/app/core/network/__init__.py new file mode 100644 index 0000000..ddc34ec --- /dev/null +++ b/app/core/network/__init__.py @@ -0,0 +1,2 @@ +# Network package for MY nodes + diff --git a/app/core/network/config.py b/app/core/network/config.py new file mode 100644 index 0000000..90c7258 --- /dev/null +++ b/app/core/network/config.py @@ -0,0 +1,38 @@ +import os +from typing import List + +from app.core._config import PROJECT_HOST +from .constants import NODE_TYPE_PUBLIC, NODE_TYPE_PRIVATE + + +def _csv_list(val: str) -> List[str]: + return [x.strip() for x in (val or "").split(",") if x.strip()] + + +# Handshake / network config driven by env +NODE_PRIVACY = os.getenv("NODE_PRIVACY", NODE_TYPE_PUBLIC).strip().lower() +if NODE_PRIVACY not in (NODE_TYPE_PUBLIC, NODE_TYPE_PRIVATE): + NODE_PRIVACY = NODE_TYPE_PUBLIC + +# Public endpoint for network (can be empty for private nodes) +_env_public_host = os.getenv("PUBLIC_HOST") +PUBLIC_HOST = _env_public_host if (_env_public_host is not None and _env_public_host.strip() != "") else None + +HANDSHAKE_INTERVAL_SEC = int(os.getenv("HANDSHAKE_INTERVAL_SEC", "5")) +UNSUPPORTED_RECHECK_INTERVAL_SEC = int(os.getenv("UNSUPPORTED_RECHECK_INTERVAL_SEC", str(24 * 3600))) + +BOOTSTRAP_SEEDS = _csv_list(os.getenv("BOOTSTRAP_SEEDS", "")) +BOOTSTRAP_REQUIRED = int(os.getenv("BOOTSTRAP_REQUIRED", "1")) == 1 +BOOTSTRAP_TIMEOUT_SEC = int(os.getenv("BOOTSTRAP_TIMEOUT_SEC", "20")) + +# Security knobs +NETWORK_TLS_VERIFY = int(os.getenv("NETWORK_TLS_VERIFY", "1")) == 1 +HANDSHAKE_TS_TOLERANCE_SEC = int(os.getenv("HANDSHAKE_TS_TOLERANCE_SEC", "300")) +HANDSHAKE_RATE_LIMIT_PER_MIN = 
int(os.getenv("HANDSHAKE_RATE_LIMIT_PER_MIN", "60")) + +# Capabilities +NODE_IS_BOOTSTRAP = int(os.getenv("NODE_IS_BOOTSTRAP", "0")) == 1 +MAX_CONTENT_SIZE_MB = int(os.getenv("MAX_CONTENT_SIZE_MB", "512")) + +# Privacy allowlist (for NODE_PRIVACY=private) +PRIVATE_ALLOWLIST = _csv_list(os.getenv("PRIVATE_ALLOWLIST", "/api/system.version")) diff --git a/app/core/network/constants.py b/app/core/network/constants.py new file mode 100644 index 0000000..1d4aafd --- /dev/null +++ b/app/core/network/constants.py @@ -0,0 +1,6 @@ +CURRENT_PROTOCOL_VERSION = "3.0.0" + +# Node roles/types +NODE_TYPE_PUBLIC = "public" +NODE_TYPE_PRIVATE = "private" + diff --git a/app/core/network/guard.py b/app/core/network/guard.py new file mode 100644 index 0000000..219cffb --- /dev/null +++ b/app/core/network/guard.py @@ -0,0 +1,63 @@ +from __future__ import annotations + +import time +from typing import Dict, Set + +from app.core.network.config import HANDSHAKE_RATE_LIMIT_PER_MIN, HANDSHAKE_TS_TOLERANCE_SEC + + +def check_rate_limit(memory, remote_ip: str) -> bool: + """Simple per-IP rate limit within current minute window. + Returns True if allowed, False if limited. + """ + now = int(time.time()) + minute = now // 60 + rl = getattr(memory, "_handshake_rl", None) + if rl is None or rl.get("minute") != minute: + rl = {"minute": minute, "counts": {}} + memory._handshake_rl = rl + counts = rl["counts"] + cnt = counts.get(remote_ip, 0) + if cnt >= HANDSHAKE_RATE_LIMIT_PER_MIN: + return False + counts[remote_ip] = cnt + 1 + return True + + +def check_timestamp_fresh(ts: int) -> bool: + now = int(time.time()) + return abs(now - int(ts)) <= HANDSHAKE_TS_TOLERANCE_SEC + + +def check_and_remember_nonce(memory, pubkey_b58: str, nonce: str) -> bool: + """Return True if nonce is new; remember nonce with TTL ~ tolerance window. + We keep a compact in-memory set per pubkey. 
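+
+    Example (a sketch of the handler-side check; the 401 mapping is the caller's choice):
+
+        if not check_and_remember_nonce(memory, node_id, nonce):
+            return sanic.response.json({"error": "NONCE_REPLAY"}, status=401)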
+    """
+    now = int(time.time())
+    store = getattr(memory, "_handshake_nonces", None)
+    if store is None:
+        store = {}
+        memory._handshake_nonces = store
+
+    entry = store.get(pubkey_b58)
+    if entry is None:
+        entry = {"nonces": {}, "updated": now}
+        store[pubkey_b58] = entry
+
+    nonces: Dict[str, int] = entry["nonces"]
+    # prune old nonces
+    to_delete = [k for k, t in nonces.items() if now - int(t) > HANDSHAKE_TS_TOLERANCE_SEC]
+    for k in to_delete:
+        nonces.pop(k, None)
+
+    if nonce in nonces:
+        return False
+    # prevent unbounded growth
+    if len(nonces) > 2048:
+        # drop half oldest
+        for k, _ in sorted(nonces.items(), key=lambda kv: kv[1])[:1024]:
+            nonces.pop(k, None)
+    nonces[nonce] = now
+    entry["updated"] = now
+    return True
+
diff --git a/app/core/network/handshake.py b/app/core/network/handshake.py
new file mode 100644
index 0000000..c447598
--- /dev/null
+++ b/app/core/network/handshake.py
@@ -0,0 +1,113 @@
+from __future__ import annotations
+
+import json
+from datetime import datetime
+import os
+import time
+import shutil
+import secrets
+from typing import Dict, Any
+
+from base58 import b58encode
+from sqlalchemy import func, select
+
+from app.core._secrets import hot_pubkey, hot_seed
+from app.core._crypto.signer import Signer
+from app.core.logger import make_log
+from app.core.models.my_network import KnownNode
+from app.core.models.node_storage import StoredContent
+from app.core.storage import db_session
+from .constants import CURRENT_PROTOCOL_VERSION
+from .nodes import list_known_public_nodes
+from .config import PUBLIC_HOST, NODE_PRIVACY, NODE_IS_BOOTSTRAP, MAX_CONTENT_SIZE_MB
+from app.core._config import ALLOWED_CONTENT_TYPES
+from .constants import NODE_TYPE_PUBLIC
+
+
+START_TS = time.time()
+
+
+async def _metrics(session) -> Dict[str, Any]:
+    # Lightweight metrics for handshake
+    # Count total content (any type) without loading every row
+    result = await session.execute(select(func.count()).select_from(StoredContent))
+    content_count = int(result.scalar() or 0)
+    # Basic system metrics
+    try:
+        load1, load5, load15 = os.getloadavg()
+    except Exception:
+        load1 = load5 = load15 = 0.0
+    try:
+        from app.core._config import UPLOADS_DIR
+        du = shutil.disk_usage(UPLOADS_DIR)
+        disk_total_gb = round(du.total / (1024 ** 3), 2)
+        disk_free_gb = round(du.free / (1024 ** 3), 2)
+    except Exception:
+        disk_total_gb = disk_free_gb = -1
+    uptime_sec = int(time.time() - START_TS)
+    return {
+        "content_count": content_count,
+        "uptime_sec": uptime_sec,
+        "loadavg": [load1, load5, load15],
+        "disk_total_gb": disk_total_gb,
+        "disk_free_gb": disk_free_gb,
+    }
+
+
+def _sign(obj: Dict[str, Any]) -> str:
+    signer = Signer(hot_seed)
+    blob = json.dumps(obj, sort_keys=True, separators=(",", ":")).encode()
+    return signer.sign(blob)
+
+
+async def build_handshake_payload(session) -> Dict[str, Any]:
+    payload = {
+        "version": CURRENT_PROTOCOL_VERSION,
+        "public_key": b58encode(hot_pubkey).decode(),
+        # public_host is optional for private nodes
+        **({"public_host": PUBLIC_HOST} if PUBLIC_HOST else {}),
+        "node_type": NODE_PRIVACY,
+        "metrics": await _metrics(session),
+        "capabilities": {
+            "accepts_inbound": NODE_PRIVACY == NODE_TYPE_PUBLIC,
+            "is_bootstrap": NODE_IS_BOOTSTRAP,
+            "supported_types": ALLOWED_CONTENT_TYPES,
+            "max_content_size_mb": MAX_CONTENT_SIZE_MB,
+        },
+        "timestamp": int(datetime.utcnow().timestamp()),
+        "nonce": secrets.token_hex(16),
+    }
+    try:
+        payload["known_public_nodes"] = await list_known_public_nodes(session)
+    except Exception:
payload["known_public_nodes"] = [] + payload["signature"] = _sign(payload) + return payload + + +async def compute_node_info(session) -> Dict[str, Any]: + node_info = { + "id": b58encode(hot_pubkey).decode(), + "public_key": b58encode(hot_pubkey).decode(), + **({"public_host": PUBLIC_HOST} if PUBLIC_HOST else {}), + "version": CURRENT_PROTOCOL_VERSION, + "node_type": NODE_PRIVACY, + "metrics": await _metrics(session), + "capabilities": { + "accepts_inbound": NODE_PRIVACY == NODE_TYPE_PUBLIC, + "is_bootstrap": NODE_IS_BOOTSTRAP, + "supported_types": ALLOWED_CONTENT_TYPES, + "max_content_size_mb": MAX_CONTENT_SIZE_MB, + }, + } + return node_info + +def sign_response(data: Dict[str, Any]) -> Dict[str, Any]: + body = { + **data, + "timestamp": int(datetime.utcnow().timestamp()), + } + sig = _sign(body) + body["server_public_key"] = b58encode(hot_pubkey).decode() + body["server_signature"] = sig + return body diff --git a/app/core/network/key_client.py b/app/core/network/key_client.py new file mode 100644 index 0000000..04fc781 --- /dev/null +++ b/app/core/network/key_client.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +import base64 +from typing import Optional + +import httpx +from base58 import b58encode + +from app.core._secrets import hot_seed, hot_pubkey +from app.core.crypto.x25519 import ed25519_to_x25519 +from app.core.logger import make_log +from app.core.network.nodesig import sign_headers + + +async def request_key_from_peer(base_url: str, encrypted_cid: str) -> Optional[bytes]: + """ + Request a sealed key from peer and decrypt it using our X25519 private key. + Returns plaintext DEK bytes or None on failure. + """ + try: + sk_x, pk_x = ed25519_to_x25519(hot_seed) + node_id = b58encode(hot_pubkey).decode() + body = { + "encrypted_cid": encrypted_cid, + "requestor_node_id": node_id, + "recipient_box_pub": base64.b64encode(bytes(pk_x)).decode(), + } + path = "/api/v1/keys.request" + headers = sign_headers("POST", path, json.dumps(body).encode(), hot_seed, b58encode(hot_pubkey).decode()) + async with httpx.AsyncClient(timeout=15) as client: + r = await client.post(f"{base_url.rstrip('/')}{path}", json=body, headers=headers) + if r.status_code != 200: + make_log('key_client', f"{base_url} returned {r.status_code}: {r.text}", level='warning') + return None + j = r.json() + sealed_b64 = j.get('sealed_key_b64') + if not sealed_b64: + return None + sealed = base64.b64decode(sealed_b64) + from nacl.public import SealedBox + sb = SealedBox(sk_x) + dek = sb.decrypt(sealed) + return dek + except Exception as e: + make_log('key_client', f"request/decrypt failed: {e}", level='error') + return None diff --git a/app/core/network/nodes.py b/app/core/network/nodes.py new file mode 100644 index 0000000..e1f2c58 --- /dev/null +++ b/app/core/network/nodes.py @@ -0,0 +1,261 @@ +from __future__ import annotations + +import asyncio +from datetime import datetime, timedelta +import json +from typing import Dict, Any, Optional, List + +import httpx +from base58 import b58encode +from sqlalchemy import select, update + +from app.core.logger import make_log +from app.core.models.my_network import KnownNode +from app.core.storage import db_session +from app.core._secrets import hot_pubkey +from .config import ( + HANDSHAKE_INTERVAL_SEC, + UNSUPPORTED_RECHECK_INTERVAL_SEC, + BOOTSTRAP_SEEDS, + BOOTSTRAP_REQUIRED, + BOOTSTRAP_TIMEOUT_SEC, + NODE_PRIVACY, + NETWORK_TLS_VERIFY, +) +from .constants import NODE_TYPE_PRIVATE +from .semver import compatibility +from .constants import 
+
+
+def _now() -> datetime:
+    return datetime.utcnow()
+
+
+async def upsert_known_node(session, host: str, port: int, public_key: str, meta: Dict[str, Any]) -> KnownNode:
+    # Host can be full URL; normalize host/ip and port if available
+    host = (host or "").replace("http://", "").replace("https://", "").strip("/")
+    h_only = host
+    if ":" in host:
+        h_only, port_str = host.rsplit(":", 1)
+        try:
+            port = int(port_str)
+        except Exception:
+            pass
+    # Prefer match by public_key (stable identity)
+    if public_key:
+        result = await session.execute(select(KnownNode).where(KnownNode.public_key == public_key))
+        row = result.scalars().first()
+        if row:
+            row.ip = h_only or row.ip
+            row.port = port or row.port
+            row.public_key = public_key or row.public_key
+            row.meta = {**(row.meta or {}), **(meta or {})}
+            row.last_sync = _now()
+            await session.commit()
+            return row
+    # Fallback by IP/host
+    result = await session.execute(select(KnownNode).where(KnownNode.ip == h_only))
+    row = result.scalars().first()
+    if row:
+        row.port = port or row.port
+        row.public_key = public_key or row.public_key
+        row.meta = {**(row.meta or {}), **(meta or {})}
+        row.last_sync = _now()
+        await session.commit()
+        return row
+    node = KnownNode(
+        ip=h_only,
+        port=port or 80,
+        public_key=public_key,
+        reputation=0,
+        last_sync=_now(),
+        meta=meta or {},
+        located_at=_now(),
+    )
+    session.add(node)
+    await session.commit()
+    return node
+
+
+def _compatibility_for_meta(remote_version: str) -> str:
+    if not remote_version or remote_version == "0.0.0":
+        return "warning"
+    return compatibility(remote_version, CURRENT_PROTOCOL_VERSION)
+
+
+async def list_known_public_nodes(session) -> List[Dict[str, Any]]:
+    rows = (await session.execute(select(KnownNode))).scalars().all()
+    result = []
+    for r in rows:
+        meta = r.meta or {}
+        if not meta.get("is_public", True):
+            continue
+        result.append({
+            "host": r.ip,
+            "port": r.port,
+            "public_key": r.public_key,
+            "version": meta.get("version"),
+            "compatibility": _compatibility_for_meta(meta.get("version", "0.0.0")),
+            "last_seen": (r.last_sync.isoformat() + "Z") if r.last_sync else None,
+            "public_host": meta.get("public_host"),
+            "capabilities": meta.get("capabilities") or {},
+        })
+    return result
+
+
+async def _handshake_with(session, base_url: str) -> Optional[Dict[str, Any]]:
+    url = base_url.rstrip("/") + "/api/v1/network.handshake"
+    from .handshake import build_handshake_payload
+    payload = await build_handshake_payload(session)
+    timeout = httpx.Timeout(5.0, read=10.0)
+    async with httpx.AsyncClient(timeout=timeout, verify=NETWORK_TLS_VERIFY) as client:
+        r = await client.post(url, json=payload)
+        if r.status_code == 403 and NODE_PRIVACY == NODE_TYPE_PRIVATE:
+            # We are private; a peer refusing to register us is expected, not an incident
+            make_log("Handshake", f"{base_url} declined handshake (403); continuing", level='debug')
+            return None
+        r.raise_for_status()
+        data = r.json()
+        # Verify server signature if present
+        try:
+            import nacl.signing
+            from base58 import b58decode
+            required = ["server_signature", "server_public_key", "timestamp"]
+            if all(k in data for k in required):
+                signed_fields = {k: data[k] for k in data if k not in ("server_signature", "server_public_key")}
+                blob = json.dumps(signed_fields, sort_keys=True, separators=(",", ":")).encode()
+                vk = nacl.signing.VerifyKey(b58decode(data["server_public_key"]))
+                vk.verify(blob, b58decode(data["server_signature"]))
+        except Exception as e:
+            make_log("Handshake", f"Server signature verification failed for {base_url}: {e}", level='warning')
+        return data
+
+
+async def pick_next_node(session) -> Optional[KnownNode]:
+    rows = (await session.execute(select(KnownNode))).scalars().all()
+    if not rows:
+        return None
+    # Prefer nodes with oldest last_sync
+    rows.sort(key=lambda r: (r.last_sync or datetime.fromtimestamp(0)))
+    now = _now()
+    for r in rows:
+        meta = r.meta or {}
+        compat = _compatibility_for_meta(meta.get("version", "0.0.0"))
+        if compat == "blocked":
+            last = datetime.fromisoformat(meta.get("unsupported_last_checked_at")) if meta.get("unsupported_last_checked_at") else None
+            if last and (now - last) < timedelta(seconds=UNSUPPORTED_RECHECK_INTERVAL_SEC):
+                continue
+        # Backoff after failures
+        if meta.get("last_failure_at"):
+            try:
+                last_fail = datetime.fromisoformat(meta.get("last_failure_at"))
+                fail_count = int(meta.get("fail_count", 1))
+                # Exponential backoff: 30s * 2^fail_count, capped 2h
+                wait = min(7200, 30 * (2 ** max(0, fail_count)))
+                if (now - last_fail) < timedelta(seconds=wait):
+                    continue
+            except Exception:
+                pass
+        return r
+    # If we only have unsupported nodes and all are within cooldown, skip this round
+    return None
+
+
+async def perform_handshake_round():
+    async with db_session(auto_commit=True) as session:
+        # Private nodes still do outbound handshakes; inbound typically unreachable without public endpoint
+        node = await pick_next_node(session)
+        if not node:
+            return
+        base_url = (node.meta or {}).get("public_host") or f"http://{node.ip}:{node.port}"
+        try:
+            resp = await _handshake_with(session, base_url)
+            # Merge known nodes received
+            for peer in (resp or {}).get("known_public_nodes", []):
+                try:
+                    await upsert_known_node(
+                        session,
+                        host=peer.get("host") or peer.get("public_host") or "",
+                        port=int(peer.get("port") or 80),
+                        public_key=peer.get("public_key") or "",
+                        meta={
+                            "is_public": True,
+                            "version": peer.get("version") or "0.0.0",
+                            "public_host": peer.get("public_host") or (f"http://{peer.get('host')}:{peer.get('port')}" if peer.get('host') else None),
+                        }
+                    )
+                except Exception as e:
+                    make_log("Handshake", f"Ignore bad peer from {base_url}: {e}", level='warning')
+            # Update last_sync and meta for node
+            node.last_sync = _now()
+            node.meta = {**(node.meta or {}), "last_response": resp, "fail_count": 0}
+            await session.commit()
+            make_log("Handshake", f"Handshake OK with {base_url}")
+        except Exception as e:
+            make_log("Handshake", f"Handshake failed with {base_url}: {e}", level='warning')
+            # Record incident-lite in meta
+            meta = node.meta or {}
+            meta["last_error"] = str(e)
+            meta["last_failure_at"] = _now().isoformat()
+            meta["fail_count"] = int(meta.get("fail_count", 0)) + 1
+            node.meta = meta
+            await session.commit()
+
+
+async def network_handshake_daemon(app):
+    # Stagger start a bit to allow HTTP server to come up
+    await asyncio.sleep(3)
+    make_log("Handshake", f"Daemon started; interval={HANDSHAKE_INTERVAL_SEC}s")
+    while True:
+        try:
+            await perform_handshake_round()
+        except Exception as e:
+            make_log("Handshake", f"Round error: {e}", level='error')
+        await asyncio.sleep(HANDSHAKE_INTERVAL_SEC)
+
+
+async def bootstrap_once_and_exit_if_failed():
+    # Private nodes bootstrap too: inbound may be blocked, but outbound requests to seeds still work
+    seeds = BOOTSTRAP_SEEDS or []
+    if not seeds:
+        return  # Nothing to do
+    async with db_session(auto_commit=True) as session:
+        # If we already know nodes, skip bootstrap
+        have_any = (await session.execute(select(KnownNode))).scalars().first()
+        if have_any:
+            return
+    make_log("Bootstrap", f"Starting bootstrap with seeds={seeds};
required={BOOTSTRAP_REQUIRED}") + + deadline = _now() + timedelta(seconds=BOOTSTRAP_TIMEOUT_SEC) + ok = False + for seed in seeds: + try: + async with db_session(auto_commit=True) as session: + resp = await _handshake_with(session, seed) + if resp: + ok = True + # Seed itself gets inserted by handshake handling route; also insert it explicitly + try: + await upsert_known_node( + session, + host=seed, + port=80, + public_key=resp.get("node", {}).get("public_key", ""), + meta={ + "is_public": True, + "version": resp.get("node", {}).get("version", "0.0.0"), + "public_host": resp.get("node", {}).get("public_host") or seed, + } + ) + except Exception: + pass + break + except Exception as e: + make_log("Bootstrap", f"Seed failed {seed}: {e}", level='warning') + if _now() > deadline: + break + + if BOOTSTRAP_REQUIRED and not ok: + make_log("Bootstrap", "Failed to reach any bootstrap seeds; exiting", level='error') + # Hard exit; Sanic won't stop otherwise + import os + os._exit(2) diff --git a/app/core/network/nodesig.py b/app/core/network/nodesig.py new file mode 100644 index 0000000..e428eb1 --- /dev/null +++ b/app/core/network/nodesig.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import base64 +import hashlib +import json +import secrets +import time +from typing import Dict, Tuple + +from base58 import b58decode, b58encode + +from app.core.network.guard import check_timestamp_fresh, check_and_remember_nonce + + +def _body_sha256(body: bytes) -> str: + h = hashlib.sha256() + h.update(body or b"") + return h.hexdigest() + + +def canonical_string(method: str, path: str, body: bytes, ts: int, nonce: str, node_id: str) -> bytes: + parts = [ + method.upper(), + path, + _body_sha256(body), + str(int(ts)), + str(nonce), + node_id, + ] + return ("\n".join(parts)).encode() + + +def sign_headers(method: str, path: str, body: bytes, sk_bytes: bytes, pk_b58: str) -> Dict[str, str]: + import nacl.signing + ts = int(time.time()) + nonce = secrets.token_hex(16) + msg = canonical_string(method, path, body, ts, nonce, pk_b58) + sig = nacl.signing.SigningKey(sk_bytes).sign(msg).signature + return { + "X-Node-Id": pk_b58, + "X-Node-Ts": str(ts), + "X-Node-Nonce": nonce, + "X-Node-Sig": b58encode(sig).decode(), + } + + +def verify_request(request, memory) -> Tuple[bool, str, str]: + """Verify NodeSig headers of an incoming Sanic request. + Returns (ok, node_id, error). ok==True if signature valid, timestamp fresh, nonce unused. 
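+
+    Example (a sketch of use inside a Sanic handler; memory lives on app.ctx):
+
+        ok, node_id, err = verify_request(request, request.app.ctx.memory)
+        if not ok:
+            return sanic.response.json({"error": err}, status=401)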
+    """
+    try:
+        node_id = request.headers.get("X-Node-Id", "").strip()
+        ts = int(request.headers.get("X-Node-Ts", "0").strip() or 0)
+        nonce = request.headers.get("X-Node-Nonce", "").strip()
+        sig_b58 = request.headers.get("X-Node-Sig", "").strip()
+        if not node_id or not ts or not nonce or not sig_b58:
+            return False, "", "MISSING_HEADERS"
+        if not check_timestamp_fresh(ts):
+            return False, node_id, "STALE_TS"
+        if not check_and_remember_nonce(memory, node_id, nonce):
+            return False, node_id, "NONCE_REPLAY"
+        import nacl.signing
+        vk = nacl.signing.VerifyKey(b58decode(node_id))
+        sig = b58decode(sig_b58)
+        msg = canonical_string(request.method, request.path, request.body or b"", ts, nonce, node_id)
+        vk.verify(msg, sig)
+        return True, node_id, ""
+    except Exception as e:
+        return False, "", f"BAD_SIGNATURE: {e}"
+
diff --git a/app/core/network/semver.py b/app/core/network/semver.py
new file mode 100644
index 0000000..927b597
--- /dev/null
+++ b/app/core/network/semver.py
@@ -0,0 +1,26 @@
+from typing import Tuple
+
+
+def parse_semver(v: str) -> Tuple[int, int, int]:
+    try:
+        parts = v.split(".")
+        major = int(parts[0])
+        minor = int(parts[1]) if len(parts) > 1 else 0
+        patch = int(parts[2]) if len(parts) > 2 else 0
+        return major, minor, patch
+    except Exception:
+        return 0, 0, 0
+
+
+def compatibility(peer: str, current: str) -> str:
+    """Return one of: compatible, warning, blocked"""
+    pM, pm, pp = parse_semver(peer)
+    cM, cm, cp = parse_semver(current)
+    if pM != cM:
+        return "blocked"
+    # Same major
+    if pm == cm:
+        return "compatible"
+    # Different minor within same major => warning
+    return "warning"
+
diff --git a/docs/indexation.md b/docs/indexation.md
index 9a97bb4..ec986a7 100644
--- a/docs/indexation.md
+++ b/docs/indexation.md
@@ -38,4 +38,60 @@ values:
 2. User uploads content cover to server (/api/v1/storage)
 3. User send /api/v1/blockchain.sendNewContentMessage to server and accept the transaction in wallet
 4. Indexer receives the transaction and indexes the content. And send telegram notification to user.
+# Network Index & Sync (v3)
+
+This document describes the simplified, production‑ready stack for content discovery and sync:
+
+- Upload via tus → stream encrypt (ENCF v1, AES‑GCM‑SIV, 1 MiB chunks) → `ipfs add --cid-version=1 --raw-leaves --chunker=size-1048576 --pin`.
+- Public index exposes only encrypted sources (CID) and safe metadata; no plaintext ids.
+- Nodes full‑sync by pinning encrypted CIDs; keys are auto‑granted to trusted peers for preview/full access.
+
+## ENCF v1 (Encrypted Content Format)
+
+Unencrypted header and framed body; same bytes on all nodes ⇒ stable CID.
+
+Header (all big endian):
+
+```
+MAGIC(4): 'ENCF'
+VER(1): 0x01
+SCHEME(1): 0x01 = AES_GCM_SIV (0x02 AES_SIV legacy)
+CHUNK(4): plaintext chunk bytes (1048576)
+SALT_LEN(1)
+SALT(N)
+RESERVED(5): zeros
+```
+
+Body: repeated frames `[p_len:4][cipher][tag(16)]` where `p_len <= CHUNK`; only the final frame may be short.
+
+AES‑GCM‑SIV per frame, deterministic `nonce = HMAC_SHA256(salt, u64(frame_idx))[:12]`, AAD unused.
+
+## API
+
+- `GET /api/v1/content.index` → `{ items:[...], schema, ETag }` with signed items.
+- `GET /api/v1/content.delta?since=ISO8601` → `{ items:[...], next_since, schema }` with ETag (see the sketch below).
+- `POST /api/v1/sync.pin` (NodeSig required) → queue/pin CID.
+- `POST /api/v1/keys.request` (NodeSig required) → sealed DEK for trusted peers.
+- `GET /api/v1/content.derivatives?cid=` → local ready derivatives (low/high/preview).
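+
+A minimal consumer sketch for the delta feed (assumes `httpx`; endpoint and
+field names as above, error handling omitted):
+
+```
+import httpx
+
+async def fetch_delta(base_url: str, since: str | None = None):
+    params = {"since": since} if since else {}
+    async with httpx.AsyncClient(timeout=30) as client:
+        r = await client.get(f"{base_url}/api/v1/content.delta", params=params)
+        r.raise_for_status()
+        j = r.json()
+        return j.get("items", []), j.get("next_since")
+```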
+
+## NodeSig
+
+Canonical string:
+
+```
+METHOD\nPATH\nSHA256(body)\nTS\nNONCE\nNODE_ID
+```
+
+Headers: `X-Node-Id`, `X-Node-Ts`, `X-Node-Nonce`, `X-Node-Sig`.
+Window ±`HANDSHAKE_TS_TOLERANCE_SEC` (300 s by default); nonces are cached for the same window; replay → 401.
+
+## Sync daemon
+
+- Jitter 0–30s per peer; uses ETag/`since`.
+- Disk watermark (`SYNC_DISK_LOW_WATERMARK_PCT`) stops pin burst.
+- Pins run concurrently (`SYNC_MAX_CONCURRENT_PINS`), with `findprovs` discovery and `swarm/connect` pre-connects.
+
+## Keys policy
+
+`KEY_AUTO_GRANT_TRUSTED_ONLY=1` — only KnownNode.meta.role=='trusted' gets DEK automatically. Preview lease TTL via `KEY_GRANT_PREVIEW_TTL_SEC`.
diff --git a/requirements.txt b/requirements.txt
index 402a037..229ebf9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -17,3 +17,4 @@ pydub==0.25.1
 pillow==10.2.0
 ffmpeg-python==0.2.0
 python-magic==0.4.27
+gcm_siv==1.0.0