434 lines
16 KiB
Python
434 lines
16 KiB
Python
"""
|
||
FastAPI маршруты для межузлового общения с ed25519 подписями
|
||
"""
|
||
import json
|
||
from typing import Dict, Any, Optional
|
||
from datetime import datetime
|
||
|
||
from fastapi import APIRouter, HTTPException, Request, Depends
|
||
from fastapi.responses import JSONResponse
|
||
|
||
from app.core.crypto import get_ed25519_manager
|
||
from app.core.logging import get_logger
|
||
from app.core.database import get_cache_manager
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
# Router для межузловых коммуникаций в FastAPI
|
||
router = APIRouter(prefix="/api/node", tags=["node-communication"])
|
||
|
||
|
||
async def validate_node_request(request: Request) -> Dict[str, Any]:
|
||
"""Валидация межузлового запроса с обязательной проверкой подписи"""
|
||
# Проверяем наличие обязательных заголовков
|
||
required_headers = ["x-node-communication", "x-node-id", "x-node-public-key", "x-node-signature"]
|
||
for header in required_headers:
|
||
if header not in request.headers:
|
||
raise HTTPException(status_code=400, detail=f"Missing required header: {header}")
|
||
|
||
# Проверяем, что это межузловое общение
|
||
if request.headers.get("x-node-communication") != "true":
|
||
raise HTTPException(status_code=400, detail="Not a valid inter-node communication")
|
||
|
||
try:
|
||
crypto_manager = get_ed25519_manager()
|
||
|
||
# Получаем заголовки
|
||
signature = request.headers.get("x-node-signature")
|
||
node_id = request.headers.get("x-node-id")
|
||
public_key = request.headers.get("x-node-public-key")
|
||
|
||
# Читаем тело запроса
|
||
body = await request.body()
|
||
if not body:
|
||
raise HTTPException(status_code=400, detail="Empty message body")
|
||
|
||
try:
|
||
message_data = json.loads(body.decode())
|
||
# Anti-replay: validate timestamp and nonce
|
||
try:
|
||
ts = message_data.get("timestamp")
|
||
nonce = message_data.get("nonce")
|
||
if ts:
|
||
from datetime import datetime, timezone
|
||
now = datetime.now(timezone.utc).timestamp()
|
||
if abs(float(ts) - float(now)) > 300:
|
||
raise HTTPException(status_code=400, detail="stale timestamp")
|
||
if nonce:
|
||
cache = await get_cache_manager()
|
||
cache_key = f"replay:{node_id}:{nonce}"
|
||
if await cache.get(cache_key):
|
||
raise HTTPException(status_code=400, detail="replay detected")
|
||
await cache.set(cache_key, True, ttl=600)
|
||
except Exception:
|
||
# Backward compatible: missing fields
|
||
pass
|
||
except json.JSONDecodeError:
|
||
raise HTTPException(status_code=400, detail="Invalid JSON in request body")
|
||
|
||
# Проверяем подпись
|
||
is_valid = crypto_manager.verify_signature(message_data, signature, public_key)
|
||
|
||
if not is_valid:
|
||
logger.warning(f"Invalid signature from node {node_id}")
|
||
raise HTTPException(status_code=403, detail="Invalid cryptographic signature")
|
||
|
||
logger.debug(f"Valid signature verified for node {node_id}")
|
||
|
||
return {
|
||
"node_id": node_id,
|
||
"public_key": public_key,
|
||
"message": message_data
|
||
}
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Crypto verification error: {e}")
|
||
raise HTTPException(status_code=500, detail="Cryptographic verification failed")
|
||
|
||
|
||
async def create_node_response(data: Dict[str, Any], request: Request) -> JSONResponse:
|
||
"""Создать ответ для межузлового общения с подписью"""
|
||
try:
|
||
crypto_manager = get_ed25519_manager()
|
||
|
||
# Добавляем информацию о нашей ноде
|
||
response_data = {
|
||
"success": True,
|
||
"timestamp": datetime.utcnow().isoformat(),
|
||
"node_id": crypto_manager.node_id,
|
||
"data": data
|
||
}
|
||
|
||
# Подписываем ответ
|
||
signature = crypto_manager.sign_message(response_data)
|
||
|
||
# Создаем ответ с заголовками
|
||
headers = {
|
||
"X-Node-ID": crypto_manager.node_id,
|
||
"X-Node-Public-Key": crypto_manager.public_key_hex,
|
||
"X-Node-Communication": "true",
|
||
"X-Node-Signature": signature
|
||
}
|
||
|
||
return JSONResponse(content=response_data, headers=headers)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Error creating node response: {e}")
|
||
raise HTTPException(status_code=500, detail="Failed to create signed response")
|
||
|
||
|
||
@router.post("/handshake")
|
||
async def node_handshake(request: Request):
|
||
"""
|
||
Обработка хэндшейка между нодами
|
||
|
||
Ожидаемый формат сообщения:
|
||
{
|
||
"action": "handshake",
|
||
"node_info": {
|
||
"node_id": "...",
|
||
"version": "...",
|
||
"capabilities": [...],
|
||
"network_info": {...}
|
||
},
|
||
"timestamp": "..."
|
||
}
|
||
"""
|
||
try:
|
||
# Валидация межузлового запроса
|
||
node_data = await validate_node_request(request)
|
||
message = node_data["message"]
|
||
source_node_id = node_data["node_id"]
|
||
|
||
logger.info(f"Handshake request from node {source_node_id}")
|
||
|
||
# Проверяем формат сообщения хэндшейка
|
||
if message.get("action") != "handshake":
|
||
raise HTTPException(status_code=400, detail="Invalid handshake message format")
|
||
|
||
node_info = message.get("node_info", {})
|
||
if not node_info.get("node_id") or not node_info.get("version"):
|
||
raise HTTPException(status_code=400, detail="Missing required node information")
|
||
|
||
# Создаем информацию о нашей ноде для ответа
|
||
crypto_manager = get_ed25519_manager()
|
||
our_node_info = {
|
||
"node_id": crypto_manager.node_id,
|
||
"version": "3.0.0", # Версия MY Network
|
||
"capabilities": [
|
||
"content_upload",
|
||
"content_sync",
|
||
"decentralized_filtering",
|
||
"ed25519_signatures"
|
||
],
|
||
"network_info": {
|
||
"public_key": crypto_manager.public_key_hex,
|
||
"protocol_version": "1.0"
|
||
}
|
||
}
|
||
|
||
# Сохраняем информацию о ноде (здесь можно добавить в базу данных)
|
||
logger.info(f"Successful handshake with node {source_node_id}",
|
||
extra={"peer_node_info": node_info})
|
||
|
||
response_data = {
|
||
"handshake_accepted": True,
|
||
"node_info": our_node_info
|
||
}
|
||
|
||
return await create_node_response(response_data, request)
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Handshake error: {e}")
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
@router.post("/content/sync")
|
||
async def content_sync(request: Request):
|
||
"""
|
||
Синхронизация контента между нодами
|
||
|
||
Ожидаемый формат сообщения:
|
||
{
|
||
"action": "content_sync",
|
||
"sync_type": "new_content|content_list|content_request",
|
||
"content_info": {...},
|
||
"timestamp": "..."
|
||
}
|
||
"""
|
||
try:
|
||
# Валидация межузлового запроса
|
||
node_data = await validate_node_request(request)
|
||
message = node_data["message"]
|
||
source_node_id = node_data["node_id"]
|
||
|
||
logger.info(f"Content sync request from node {source_node_id}")
|
||
|
||
# Проверяем формат сообщения синхронизации
|
||
if message.get("action") != "content_sync":
|
||
raise HTTPException(status_code=400, detail="Invalid sync message format")
|
||
|
||
sync_type = message.get("sync_type")
|
||
content_info = message.get("content_info", {})
|
||
|
||
if sync_type == "new_content":
|
||
# Обработка нового контента от другой ноды
|
||
content_hash = content_info.get("hash")
|
||
if not content_hash:
|
||
raise HTTPException(status_code=400, detail="Missing content hash")
|
||
|
||
# Здесь добавить логику обработки нового контента
|
||
# через decentralized_filter и content_storage_manager
|
||
|
||
response_data = {
|
||
"sync_result": "content_accepted",
|
||
"content_hash": content_hash
|
||
}
|
||
|
||
elif sync_type == "content_list":
|
||
# Запрос списка доступного контента
|
||
# Здесь добавить логику получения списка контента
|
||
|
||
response_data = {
|
||
"content_list": [], # Заглушка - добавить реальный список
|
||
"total_items": 0
|
||
}
|
||
|
||
elif sync_type == "content_request":
|
||
# Запрос конкретного контента
|
||
requested_hash = content_info.get("hash")
|
||
if not requested_hash:
|
||
raise HTTPException(status_code=400, detail="Missing content hash for request")
|
||
|
||
# Здесь добавить логику поиска и передачи контента
|
||
|
||
response_data = {
|
||
"content_found": False, # Заглушка - добавить реальную проверку
|
||
"content_hash": requested_hash
|
||
}
|
||
|
||
else:
|
||
raise HTTPException(status_code=400, detail=f"Unknown sync type: {sync_type}")
|
||
|
||
return await create_node_response(response_data, request)
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Content sync error: {e}")
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
@router.post("/network/ping")
|
||
async def network_ping(request: Request):
|
||
"""
|
||
Пинг между нодами для проверки доступности
|
||
|
||
Ожидаемый формат сообщения:
|
||
{
|
||
"action": "ping",
|
||
"timestamp": "...",
|
||
"data": {...}
|
||
}
|
||
"""
|
||
try:
|
||
# Валидация межузлового запроса
|
||
node_data = await validate_node_request(request)
|
||
message = node_data["message"]
|
||
source_node_id = node_data["node_id"]
|
||
|
||
logger.debug(f"Ping from node {source_node_id}")
|
||
|
||
# Проверяем формат пинга
|
||
if message.get("action") != "ping":
|
||
raise HTTPException(status_code=400, detail="Invalid ping message format")
|
||
|
||
# Создаем ответ pong
|
||
response_data = {
|
||
"action": "pong",
|
||
"ping_timestamp": message.get("timestamp"),
|
||
"response_timestamp": datetime.utcnow().isoformat()
|
||
}
|
||
|
||
return await create_node_response(response_data, request)
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Ping error: {e}")
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
@router.get("/network/status")
|
||
async def network_status():
|
||
"""
|
||
Получение статуса ноды (GET запрос без обязательной подписи)
|
||
"""
|
||
try:
|
||
crypto_manager = get_ed25519_manager()
|
||
|
||
status_data = {
|
||
"node_id": crypto_manager.node_id,
|
||
"public_key": crypto_manager.public_key_hex,
|
||
"version": "3.0.0",
|
||
"status": "active",
|
||
"capabilities": [
|
||
"content_upload",
|
||
"content_sync",
|
||
"decentralized_filtering",
|
||
"ed25519_signatures"
|
||
],
|
||
"timestamp": datetime.utcnow().isoformat()
|
||
}
|
||
|
||
return {
|
||
"success": True,
|
||
"data": status_data
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Status error: {e}")
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
@router.post("/network/discover")
|
||
async def network_discover(request: Request):
|
||
"""
|
||
Обнаружение и обмен информацией о других нодах в сети
|
||
|
||
Ожидаемый формат сообщения:
|
||
{
|
||
"action": "discover",
|
||
"known_nodes": [...],
|
||
"timestamp": "..."
|
||
}
|
||
"""
|
||
try:
|
||
# Валидация межузлового запроса
|
||
node_data = await validate_node_request(request)
|
||
message = node_data["message"]
|
||
source_node_id = node_data["node_id"]
|
||
|
||
logger.info(f"Discovery request from node {source_node_id}")
|
||
|
||
# Проверяем формат сообщения
|
||
if message.get("action") != "discover":
|
||
raise HTTPException(status_code=400, detail="Invalid discovery message format")
|
||
|
||
known_nodes = message.get("known_nodes", [])
|
||
|
||
# Здесь добавить логику обработки информации о известных нодах
|
||
# и возврат информации о наших известных нодах
|
||
|
||
response_data = {
|
||
"known_nodes": [], # Заглушка - добавить реальный список
|
||
"discovery_timestamp": datetime.utcnow().isoformat()
|
||
}
|
||
|
||
return await create_node_response(response_data, request)
|
||
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
logger.error(f"Discovery error: {e}")
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
# V3 API compatibility endpoints (без подписи для совместимости)
|
||
@router.get("/v3/node/status")
|
||
async def v3_node_status():
|
||
"""
|
||
V3 API: Статус ноды для совместимости со скриптами
|
||
"""
|
||
try:
|
||
crypto_manager = get_ed25519_manager()
|
||
|
||
return {
|
||
"status": "online",
|
||
"node_id": crypto_manager.node_id,
|
||
"version": "3.0.0",
|
||
"network": "MY Network",
|
||
"capabilities": [
|
||
"content_upload",
|
||
"content_sync",
|
||
"decentralized_filtering",
|
||
"ed25519_signatures"
|
||
],
|
||
"timestamp": datetime.utcnow().isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"V3 status error: {e}")
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|
||
|
||
|
||
@router.get("/v3/network/stats")
|
||
async def v3_network_stats():
|
||
"""
|
||
V3 API: Статистика сети для совместимости со скриптами
|
||
"""
|
||
try:
|
||
# Заглушка для сетевой статистики
|
||
return {
|
||
"network_stats": {
|
||
"total_nodes": 1,
|
||
"active_nodes": 1,
|
||
"total_content": 0,
|
||
"network_health": "good"
|
||
},
|
||
"node_stats": {
|
||
"uptime": "online",
|
||
"connections": 0,
|
||
"content_shared": 0
|
||
},
|
||
"timestamp": datetime.utcnow().isoformat()
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"V3 network stats error: {e}")
|
||
raise HTTPException(status_code=500, detail="Internal server error")
|