266 lines
12 KiB
Python
266 lines
12 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
from datetime import datetime
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
from fastapi import APIRouter, HTTPException, Request, Depends, status
|
||
from fastapi.responses import JSONResponse
|
||
|
||
from app.core.crypto import get_ed25519_manager
|
||
from app.core.content.chunk_manager import ChunkManager
|
||
from app.core.content.sync_manager import ContentSyncManager
|
||
from app.core.models.content.chunk import ContentChunk
|
||
from app.core.models.api.sync_models import (
|
||
ContentRequest,
|
||
ContentProvideResponse,
|
||
ContentStatusResponse,
|
||
ContentVerifyRequest,
|
||
)
|
||
from app.core.validation.content_validator import ContentValidator
|
||
from app.core.validation.integrity_checker import IntegrityChecker
|
||
from app.core.validation.trust_manager import TrustManager
|
||
from app.core.models.validation.validation_models import ContentSignature, ValidationResult
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
router = APIRouter(prefix="/api/node/content", tags=["node-content-sync"])
|
||
|
||
# Глобальные вспомогательные объекты (можно заменить DI при необходимости)
|
||
_trust_manager = TrustManager()
|
||
_content_validator = ContentValidator()
|
||
_integrity_checker = IntegrityChecker()
|
||
|
||
|
||
async def _verify_inter_node_request(request: Request) -> Dict[str, Any]:
|
||
"""
|
||
Проверка заголовков и Ed25519 подписи межузлового запроса.
|
||
Используем ту же схему заголовков, что и в fastapi_node_routes.
|
||
Дополнительно — первичная фильтрация по доверию ноды (blacklist/override/score).
|
||
"""
|
||
required_headers = ["x-node-communication", "x-node-id", "x-node-public-key", "x-node-signature"]
|
||
for header in required_headers:
|
||
if header not in request.headers:
|
||
logger.warning("Missing header on inter-node request: %s", header)
|
||
raise HTTPException(status_code=400, detail=f"Missing required header: {header}")
|
||
|
||
if request.headers.get("x-node-communication") != "true":
|
||
raise HTTPException(status_code=400, detail="Not a valid inter-node communication")
|
||
|
||
body = await request.body()
|
||
if not body:
|
||
raise HTTPException(status_code=400, detail="Empty message body")
|
||
|
||
try:
|
||
message_data = json.loads(body.decode("utf-8"))
|
||
except json.JSONDecodeError:
|
||
raise HTTPException(status_code=400, detail="Invalid JSON in request body")
|
||
|
||
signature = request.headers.get("x-node-signature")
|
||
node_id = request.headers.get("x-node-id")
|
||
public_key = request.headers.get("x-node-public-key")
|
||
|
||
# Проверка подписи межузлового сообщения
|
||
crypto_manager = get_ed25519_manager()
|
||
is_valid = crypto_manager.verify_signature(message_data, signature, public_key)
|
||
if not is_valid:
|
||
logger.warning("Invalid signature from node %s", node_id)
|
||
# При невалидной подписи сразу штрафуем доверие и отклоняем
|
||
_trust_manager.update_trust_score(node_id, delta=-0.2, reason="invalid_inter_node_signature")
|
||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid cryptographic signature")
|
||
|
||
# Обновим доверие за валидную подпись
|
||
_trust_manager.update_trust_score(node_id, delta=0.02, reason="valid_inter_node_signature")
|
||
|
||
# Проверка blacklist/override/минимального порога
|
||
if not _trust_manager.is_node_trusted(node_id):
|
||
logger.warning("Request rejected by trust policy: node_id=%s", node_id)
|
||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Untrusted node")
|
||
|
||
request.state.inter_node_communication = True
|
||
request.state.source_node_id = node_id
|
||
request.state.source_public_key = public_key
|
||
return {"node_id": node_id, "public_key": public_key, "message": message_data}
|
||
|
||
|
||
def _create_signed_response(data: Dict[str, Any]) -> JSONResponse:
|
||
"""Формирование подписанного ответа и стандартных заголовков межузлового взаимодействия."""
|
||
crypto_manager = get_ed25519_manager()
|
||
payload = {
|
||
"success": True,
|
||
"timestamp": datetime.utcnow().isoformat(),
|
||
"node_id": crypto_manager.node_id,
|
||
"data": data,
|
||
}
|
||
signature = crypto_manager.sign_message(payload)
|
||
headers = {
|
||
"X-Node-ID": crypto_manager.node_id,
|
||
"X-Node-Public-Key": crypto_manager.public_key_hex,
|
||
"X-Node-Communication": "true",
|
||
"X-Node-Signature": signature,
|
||
}
|
||
return JSONResponse(content=payload, headers=headers)
|
||
|
||
|
||
@router.post("/sync")
|
||
async def node_content_sync(request: Request, body: ContentRequest):
|
||
"""
|
||
POST /api/node/content/sync
|
||
Универсальный endpoint для межузловой синхронизации чанков.
|
||
|
||
Поддерживаемые сценарии:
|
||
- sync_type == "content_request": получить набор чанков по content_id и списку индексов
|
||
ожидается content_info: { content_id: str, indexes: List[int] }
|
||
Ответ: ContentProvideResponse со списком чанков (валидированные и подписанные при создании).
|
||
- sync_type == "new_content": уведомление о новом контенте (пока лишь логируем, ок подтверждаем)
|
||
- sync_type == "content_list": запрос списка контента (пока возвращаем пусто)
|
||
"""
|
||
# Проверка подписи и доверия запроса
|
||
ctx = await _verify_inter_node_request(request)
|
||
source_node_id = ctx["node_id"]
|
||
|
||
sync_mgr = ContentSyncManager()
|
||
chunk_mgr = sync_mgr.chunk_manager
|
||
|
||
try:
|
||
if body.sync_type == "content_request":
|
||
content_info = body.content_info
|
||
content_id = content_info["content_id"]
|
||
indexes: List[int] = list(map(int, content_info["indexes"]))
|
||
|
||
# Локальный storage_reader. В реальном проекте заменить на обращение к хранилищу чанков.
|
||
def storage_reader(cid: str, idx: int) -> Optional[ContentChunk]:
|
||
# Здесь можно реализовать доступ к БД/файловой системе. Пока возвращаем None.
|
||
return None
|
||
|
||
provided = await sync_mgr.provide_chunks(
|
||
content_id=content_id,
|
||
indexes=indexes,
|
||
storage_reader=storage_reader,
|
||
)
|
||
# Доп. защита: прогоняем полученные чанки через IntegrityChecker (если есть)
|
||
chunks_models: List[ContentChunk] = []
|
||
for c in provided.get("chunks", []):
|
||
try:
|
||
chunks_models.append(ContentChunk.from_dict(c))
|
||
except Exception as e:
|
||
logger.error("content_request: invalid provided chunk from storage: %s", e)
|
||
|
||
if chunks_models:
|
||
chain_result = _integrity_checker.verify_content_chain(chunks_models, verify_signatures=True)
|
||
if not chain_result.ok:
|
||
logger.warning("integrity check failed for provided chunks: %s", chain_result.reason)
|
||
# Понижаем доверие источнику запроса (как попытка манипуляции/атаки)
|
||
_trust_manager.update_trust_score(source_node_id, delta=-0.05, reason="invalid_chain_on_provide")
|
||
|
||
# Pydantic-ответ
|
||
resp = ContentProvideResponse(
|
||
success=True,
|
||
chunks=[c.to_dict() for c in chunks_models],
|
||
errors=provided.get("errors", []),
|
||
)
|
||
return _create_signed_response(resp.dict())
|
||
|
||
elif body.sync_type == "new_content":
|
||
# Нода сообщает о новом контенте — можно валидировать метаданные/подписи при наличии
|
||
logger.info("new_content received: %s", body.content_info)
|
||
_trust_manager.update_trust_score(source_node_id, delta=0.01, reason="announce_new_content")
|
||
return _create_signed_response({"sync_result": "ack", "info": body.content_info})
|
||
|
||
elif body.sync_type == "content_list":
|
||
return _create_signed_response({"content_list": [], "total_items": 0})
|
||
|
||
else:
|
||
raise HTTPException(status_code=400, detail=f"Unknown sync_type: {body.sync_type}")
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.exception("node_content_sync error")
|
||
_trust_manager.update_trust_score(source_node_id, delta=-0.02, reason="sync_handler_exception")
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@router.get("/status/{content_id}")
|
||
async def node_content_status(content_id: str):
|
||
"""
|
||
GET /api/node/content/status/{content_id}
|
||
Вернуть статус хранения контента на ноде:
|
||
- какие индексы имеются
|
||
- какие отсутствуют
|
||
- общий ожидаемый total_chunks (если известен; иначе 0)
|
||
"""
|
||
try:
|
||
have_indexes: List[int] = []
|
||
total_chunks = 0
|
||
missing = sorted(set(range(total_chunks)) - set(have_indexes)) if total_chunks else []
|
||
|
||
resp = ContentStatusResponse(
|
||
content_id=content_id,
|
||
total_chunks=total_chunks,
|
||
have_indexes=have_indexes,
|
||
missing_indexes=missing,
|
||
verified=None,
|
||
message="ok",
|
||
)
|
||
return resp.dict()
|
||
except Exception as e:
|
||
logger.exception("node_content_status error")
|
||
raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@router.post("/verify")
|
||
async def node_content_verify(request: Request, body: ContentVerifyRequest):
|
||
"""
|
||
POST /api/node/content/verify
|
||
Проверка валидности набора чанков (хеш и Ed25519 подпись каждой записи),
|
||
а также расширенная проверка целостности цепочки чанков и оценка доверия источнику.
|
||
"""
|
||
ctx = await _verify_inter_node_request(request)
|
||
source_node_id = ctx["node_id"]
|
||
source_pubkey = ctx["public_key"]
|
||
|
||
try:
|
||
chunk_mgr = ChunkManager()
|
||
errors: List[Dict[str, Any]] = []
|
||
ok_count = 0
|
||
chunks_models: List[ContentChunk] = []
|
||
|
||
for ch in body.chunks:
|
||
try:
|
||
model = ContentChunk.from_dict(ch.dict())
|
||
chunks_models.append(model)
|
||
ok, err = chunk_mgr.verify_chunk_integrity(model, verify_signature=body.verify_signatures)
|
||
if not ok:
|
||
errors.append({"chunk_id": model.chunk_id, "error": err})
|
||
else:
|
||
ok_count += 1
|
||
except Exception as ce:
|
||
logger.error("verify: failed to parse/validate chunk", extra={"error": str(ce)})
|
||
errors.append({"error": str(ce), "chunk_ref": ch.dict()})
|
||
|
||
# Дополнительно проверим целостность всей цепочки
|
||
if chunks_models:
|
||
chain_res = _integrity_checker.verify_content_chain(chunks_models, verify_signatures=body.verify_signatures)
|
||
if not chain_res.ok:
|
||
errors.append({"chain_error": chain_res.reason, "details": chain_res.details})
|
||
|
||
# Итоговая оценка доверия по исходу операции
|
||
if errors:
|
||
_trust_manager.update_trust_score(source_node_id, delta=-0.05, reason="verify_errors_detected")
|
||
else:
|
||
_trust_manager.update_trust_score(source_node_id, delta=0.02, reason="verify_ok")
|
||
|
||
result = {
|
||
"verified_ok": ok_count,
|
||
"errors": errors,
|
||
"trust": _trust_manager.assess_node_trust(source_node_id).to_dict(),
|
||
}
|
||
return _create_signed_response(result)
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.exception("node_content_verify error")
|
||
_trust_manager.update_trust_score(source_node_id, delta=-0.02, reason="verify_exception")
|
||
raise HTTPException(status_code=500, detail=str(e)) |