from __future__ import annotations import json import logging from datetime import datetime from typing import Dict, Any, List, Optional from fastapi import APIRouter, HTTPException, Request, Depends, status from fastapi.responses import JSONResponse from app.core.crypto import get_ed25519_manager from app.core.content.chunk_manager import ChunkManager from app.core.content.sync_manager import ContentSyncManager from app.core.models.content.chunk import ContentChunk from app.core.models.api.sync_models import ( ContentRequest, ContentProvideResponse, ContentStatusResponse, ContentVerifyRequest, ) from app.core.validation.content_validator import ContentValidator from app.core.validation.integrity_checker import IntegrityChecker from app.core.validation.trust_manager import TrustManager from app.core.models.validation.validation_models import ContentSignature, ValidationResult logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/node/content", tags=["node-content-sync"]) # Глобальные вспомогательные объекты (можно заменить DI при необходимости) _trust_manager = TrustManager() _content_validator = ContentValidator() _integrity_checker = IntegrityChecker() async def _verify_inter_node_request(request: Request) -> Dict[str, Any]: """ Проверка заголовков и Ed25519 подписи межузлового запроса. Используем ту же схему заголовков, что и в fastapi_node_routes. Дополнительно — первичная фильтрация по доверию ноды (blacklist/override/score). """ required_headers = ["x-node-communication", "x-node-id", "x-node-public-key", "x-node-signature"] for header in required_headers: if header not in request.headers: logger.warning("Missing header on inter-node request: %s", header) raise HTTPException(status_code=400, detail=f"Missing required header: {header}") if request.headers.get("x-node-communication") != "true": raise HTTPException(status_code=400, detail="Not a valid inter-node communication") body = await request.body() if not body: raise HTTPException(status_code=400, detail="Empty message body") try: message_data = json.loads(body.decode("utf-8")) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON in request body") signature = request.headers.get("x-node-signature") node_id = request.headers.get("x-node-id") public_key = request.headers.get("x-node-public-key") # Проверка подписи межузлового сообщения crypto_manager = get_ed25519_manager() is_valid = crypto_manager.verify_signature(message_data, signature, public_key) if not is_valid: logger.warning("Invalid signature from node %s", node_id) # При невалидной подписи сразу штрафуем доверие и отклоняем _trust_manager.update_trust_score(node_id, delta=-0.2, reason="invalid_inter_node_signature") raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid cryptographic signature") # Обновим доверие за валидную подпись _trust_manager.update_trust_score(node_id, delta=0.02, reason="valid_inter_node_signature") # Проверка blacklist/override/минимального порога if not _trust_manager.is_node_trusted(node_id): logger.warning("Request rejected by trust policy: node_id=%s", node_id) raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Untrusted node") request.state.inter_node_communication = True request.state.source_node_id = node_id request.state.source_public_key = public_key return {"node_id": node_id, "public_key": public_key, "message": message_data} def _create_signed_response(data: Dict[str, Any]) -> JSONResponse: """Формирование подписанного ответа и стандартных заголовков межузлового взаимодействия.""" crypto_manager = get_ed25519_manager() payload = { "success": True, "timestamp": datetime.utcnow().isoformat(), "node_id": crypto_manager.node_id, "data": data, } signature = crypto_manager.sign_message(payload) headers = { "X-Node-ID": crypto_manager.node_id, "X-Node-Public-Key": crypto_manager.public_key_hex, "X-Node-Communication": "true", "X-Node-Signature": signature, } return JSONResponse(content=payload, headers=headers) @router.post("/sync") async def node_content_sync(request: Request, body: ContentRequest): """ POST /api/node/content/sync Универсальный endpoint для межузловой синхронизации чанков. Поддерживаемые сценарии: - sync_type == "content_request": получить набор чанков по content_id и списку индексов ожидается content_info: { content_id: str, indexes: List[int] } Ответ: ContentProvideResponse со списком чанков (валидированные и подписанные при создании). - sync_type == "new_content": уведомление о новом контенте (пока лишь логируем, ок подтверждаем) - sync_type == "content_list": запрос списка контента (пока возвращаем пусто) """ # Проверка подписи и доверия запроса ctx = await _verify_inter_node_request(request) source_node_id = ctx["node_id"] sync_mgr = ContentSyncManager() chunk_mgr = sync_mgr.chunk_manager try: if body.sync_type == "content_request": content_info = body.content_info content_id = content_info["content_id"] indexes: List[int] = list(map(int, content_info["indexes"])) # Локальный storage_reader. В реальном проекте заменить на обращение к хранилищу чанков. def storage_reader(cid: str, idx: int) -> Optional[ContentChunk]: # Здесь можно реализовать доступ к БД/файловой системе. Пока возвращаем None. return None provided = await sync_mgr.provide_chunks( content_id=content_id, indexes=indexes, storage_reader=storage_reader, ) # Доп. защита: прогоняем полученные чанки через IntegrityChecker (если есть) chunks_models: List[ContentChunk] = [] for c in provided.get("chunks", []): try: chunks_models.append(ContentChunk.from_dict(c)) except Exception as e: logger.error("content_request: invalid provided chunk from storage: %s", e) if chunks_models: chain_result = _integrity_checker.verify_content_chain(chunks_models, verify_signatures=True) if not chain_result.ok: logger.warning("integrity check failed for provided chunks: %s", chain_result.reason) # Понижаем доверие источнику запроса (как попытка манипуляции/атаки) _trust_manager.update_trust_score(source_node_id, delta=-0.05, reason="invalid_chain_on_provide") # Pydantic-ответ resp = ContentProvideResponse( success=True, chunks=[c.to_dict() for c in chunks_models], errors=provided.get("errors", []), ) return _create_signed_response(resp.dict()) elif body.sync_type == "new_content": # Нода сообщает о новом контенте — можно валидировать метаданные/подписи при наличии logger.info("new_content received: %s", body.content_info) _trust_manager.update_trust_score(source_node_id, delta=0.01, reason="announce_new_content") return _create_signed_response({"sync_result": "ack", "info": body.content_info}) elif body.sync_type == "content_list": return _create_signed_response({"content_list": [], "total_items": 0}) else: raise HTTPException(status_code=400, detail=f"Unknown sync_type: {body.sync_type}") except HTTPException: raise except Exception as e: logger.exception("node_content_sync error") _trust_manager.update_trust_score(source_node_id, delta=-0.02, reason="sync_handler_exception") raise HTTPException(status_code=500, detail=str(e)) @router.get("/status/{content_id}") async def node_content_status(content_id: str): """ GET /api/node/content/status/{content_id} Вернуть статус хранения контента на ноде: - какие индексы имеются - какие отсутствуют - общий ожидаемый total_chunks (если известен; иначе 0) """ try: have_indexes: List[int] = [] total_chunks = 0 missing = sorted(set(range(total_chunks)) - set(have_indexes)) if total_chunks else [] resp = ContentStatusResponse( content_id=content_id, total_chunks=total_chunks, have_indexes=have_indexes, missing_indexes=missing, verified=None, message="ok", ) return resp.dict() except Exception as e: logger.exception("node_content_status error") raise HTTPException(status_code=500, detail=str(e)) @router.post("/verify") async def node_content_verify(request: Request, body: ContentVerifyRequest): """ POST /api/node/content/verify Проверка валидности набора чанков (хеш и Ed25519 подпись каждой записи), а также расширенная проверка целостности цепочки чанков и оценка доверия источнику. """ ctx = await _verify_inter_node_request(request) source_node_id = ctx["node_id"] source_pubkey = ctx["public_key"] try: chunk_mgr = ChunkManager() errors: List[Dict[str, Any]] = [] ok_count = 0 chunks_models: List[ContentChunk] = [] for ch in body.chunks: try: model = ContentChunk.from_dict(ch.dict()) chunks_models.append(model) ok, err = chunk_mgr.verify_chunk_integrity(model, verify_signature=body.verify_signatures) if not ok: errors.append({"chunk_id": model.chunk_id, "error": err}) else: ok_count += 1 except Exception as ce: logger.error("verify: failed to parse/validate chunk", extra={"error": str(ce)}) errors.append({"error": str(ce), "chunk_ref": ch.dict()}) # Дополнительно проверим целостность всей цепочки if chunks_models: chain_res = _integrity_checker.verify_content_chain(chunks_models, verify_signatures=body.verify_signatures) if not chain_res.ok: errors.append({"chain_error": chain_res.reason, "details": chain_res.details}) # Итоговая оценка доверия по исходу операции if errors: _trust_manager.update_trust_score(source_node_id, delta=-0.05, reason="verify_errors_detected") else: _trust_manager.update_trust_score(source_node_id, delta=0.02, reason="verify_ok") result = { "verified_ok": ok_count, "errors": errors, "trust": _trust_manager.assess_node_trust(source_node_id).to_dict(), } return _create_signed_response(result) except HTTPException: raise except Exception as e: logger.exception("node_content_verify error") _trust_manager.update_trust_score(source_node_id, delta=-0.02, reason="verify_exception") raise HTTPException(status_code=500, detail=str(e))