241 lines
9.4 KiB
Python
241 lines
9.4 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import time
|
||
from datetime import datetime
|
||
from typing import Dict, Any, List, Optional
|
||
|
||
from fastapi import APIRouter, HTTPException, Request, status
|
||
from fastapi.responses import JSONResponse
|
||
|
||
from app.core.crypto import get_ed25519_manager
|
||
from app.core.content.chunk_manager import ChunkManager
|
||
from app.core.models.api.stats_models import (
|
||
NodeHealthResponse,
|
||
NodeContentStatsResponse,
|
||
ContentStatsItem,
|
||
NodeStatsReport,
|
||
)
|
||
# NEW imports for detailed stats and network overview
|
||
from app.core.stats.metrics_collector import MetricsCollector
|
||
from app.core.stats.stats_aggregator import StatsAggregator
|
||
from app.core.stats.gossip_manager import GossipManager
|
||
from app.core.models.stats.metrics_models import NodeStats
|
||
|
||
# Module-level logger for this router.
logger: logging.Logger = logging.getLogger(__name__)

# Every endpoint below is mounted under /api/node/stats.
router: APIRouter = APIRouter(prefix="/api/node/stats", tags=["node-stats"])

# "Singleton-ish" instances scoped to this router: created once at import time
# and shared by all endpoints in this module, so whatever state they keep
# (e.g. snapshots recorded via add_local_snapshot / add_peer_snapshot) persists
# for as long as the module stays loaded.
# NOTE(review): other modules that construct their own instances will NOT see
# this state unless these classes share it internally — confirm if that matters.
_metrics_collector: MetricsCollector = MetricsCollector()
_stats_aggregator: StatsAggregator = StatsAggregator()
_gossip_manager: GossipManager = GossipManager()
|
||
|
||
|
||
async def _verify_inter_node_request_optional(request: Request) -> Optional[Dict[str, Any]]:
    """
    Optionally verify inter-node headers and the ed25519 signature of the body.

    Used by endpoints that may legitimately be called by other nodes (e.g. the
    stats report). Verification happens only when the request carries the
    header ``x-node-communication: true``; any other request is treated as an
    ordinary client call and ``None`` is returned.

    Returns:
        ``None`` when the request is not flagged as inter-node; otherwise a dict
        with ``node_id``, ``public_key`` and the decoded JSON ``message``. On
        success the caller identity is also stored on ``request.state``.

    Raises:
        HTTPException(400): a required header is missing, or the body is empty,
            not UTF-8, or not valid JSON.
        HTTPException(403): the signature does not verify, including when the
            signature/public-key header values are malformed.
    """
    if request.headers.get("x-node-communication") != "true":
        return None

    # Required headers (Starlette header lookups are case-insensitive).
    required_headers = ["x-node-id", "x-node-public-key", "x-node-signature"]
    for header in required_headers:
        if header not in request.headers:
            logger.warning("Missing header on inter-node request: %s", header)
            raise HTTPException(status_code=400, detail=f"Missing required header: {header}")

    body = await request.body()
    if not body:
        raise HTTPException(status_code=400, detail="Empty message body")

    try:
        message_data = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        # A non-UTF-8 body used to escape as an unhandled error (HTTP 500);
        # it is a client error just like malformed JSON.
        raise HTTPException(status_code=400, detail="Invalid JSON in request body") from exc

    signature = request.headers.get("x-node-signature")
    node_id = request.headers.get("x-node-id")
    public_key = request.headers.get("x-node-public-key")

    crypto_manager = get_ed25519_manager()
    try:
        is_valid = crypto_manager.verify_signature(message_data, signature, public_key)
    except Exception:
        # Header values are caller-controlled; a malformed key or signature must
        # be rejected as a failed verification (403), not surface as a 500.
        logger.warning("Signature verification error from node %s (stats)", node_id, exc_info=True)
        is_valid = False

    if not is_valid:
        logger.warning("Invalid signature from node %s (stats)", node_id)
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid cryptographic signature")

    # Expose the verified caller identity to downstream handlers.
    request.state.inter_node_communication = True
    request.state.source_node_id = node_id
    request.state.source_public_key = public_key

    return {"node_id": node_id, "public_key": public_key, "message": message_data}
|
||
|
||
|
||
def _create_signed_response(data: Dict[str, Any]) -> JSONResponse:
    """
    Wrap ``data`` in this node's signed response envelope.

    The envelope has the keys ``success``, ``timestamp``, ``node_id`` and
    ``data``, in that order. It is signed with the node's ed25519 key. The
    signature is sent in ``X-Node-Signature``, together with the node ID,
    the public key and the ``X-Node-Communication`` marker header.
    """
    manager = get_ed25519_manager()

    envelope = dict(
        success=True,
        timestamp=datetime.utcnow().isoformat(),
        node_id=manager.node_id,
        data=data,
    )
    envelope_signature = manager.sign_message(envelope)

    return JSONResponse(
        content=envelope,
        headers={
            "X-Node-ID": manager.node_id,
            "X-Node-Public-Key": manager.public_key_hex,
            "X-Node-Communication": "true",
            "X-Node-Signature": envelope_signature,
        },
    )
|
||
|
||
|
||
# Fallback start reference for uptime when NODE_START_TS is not provided: the
# moment this router module was imported (close to process start).
_ROUTER_LOADED_TS = time.time()


def _uptime_seconds() -> int:
    """Return whole seconds since node start, clamped to be non-negative.

    The start moment is read from ``NODE_START_TS`` (Unix epoch seconds, int or
    float). If the variable is unset, empty, non-numeric or non-finite, the
    import time of this module is used instead. This keeps the health endpoint
    from failing on a bad value, and from reporting a constant 0 when the
    variable is simply missing.
    """
    import math  # local import: only needed for the finiteness check

    start_ts = _ROUTER_LOADED_TS
    raw = os.getenv("NODE_START_TS")
    if raw:
        try:
            parsed = float(raw)
        except ValueError:
            parsed = math.nan
        if math.isfinite(parsed):
            start_ts = parsed
        else:
            logger.debug("Ignoring invalid NODE_START_TS=%r", raw)
    # A start timestamp in the future (clock skew) must not produce negative uptime.
    return max(0, int(time.time() - start_ts))


@router.get("/health")
async def node_health():
    """
    GET /api/node/stats/health

    Return the node status and basic metrics.

    CPU, memory and disk figures are reported as ``None`` on purpose: they
    would require psutil, and no extra dependency is added for them.
    """
    try:
        crypto_manager = get_ed25519_manager()

        resp = NodeHealthResponse(
            status="ok",
            node_id=crypto_manager.node_id,
            public_key=crypto_manager.public_key_hex,
            uptime_seconds=_uptime_seconds(),
            cpu_usage=None,
            memory_usage_mb=None,
            disk_free_mb=None,
            last_sync_ts=None,
            details={
                "version": "3.0.0",
                "protocols": ["ed25519", "content_sync"],
            },
        )
        # The public health endpoint is returned unsigned so that monitoring keeps working.
        return resp.dict()
    except Exception as e:
        logger.exception("node_health error")
        raise HTTPException(status_code=500, detail=str(e)) from e
|
||
|
||
|
||
@router.get("/content")
async def node_content_stats():
    """
    GET /api/node/stats/content

    Aggregated statistics for the content stored on this node.

    The content list is currently an empty placeholder, so every total is 0
    until storage/DB integration supplies real items.
    """
    try:
        # Placeholder: real values should come from the node's storage / DB.
        items: List[ContentStatsItem] = []

        chunks_total = chunks_stored = chunks_missing = 0
        for item in items:
            chunks_total += item.total_chunks
            chunks_stored += item.stored_chunks
            chunks_missing += item.missing_chunks

        return NodeContentStatsResponse(
            total_contents=len(items),
            total_chunks=chunks_total,
            stored_chunks=chunks_stored,
            missing_chunks=chunks_missing,
            contents=items,
        ).dict()
    except Exception as e:
        logger.exception("node_content_stats error")
        raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
@router.post("/report")
async def node_stats_report(request: Request, body: NodeStatsReport):
    """
    POST /api/node/stats/report

    Accept a stats report from another node (ed25519-signed).

    Transport-level signature verification runs only when the request carries
    ``x-node-communication: true``; see ``_verify_inter_node_request_optional``.
    If the report embeds a ``metrics`` dict that has both ``node_id`` and
    ``signature``, it is passed to the gossip manager for validation. When
    accepted, it is stored as a peer snapshot in the aggregator.

    Returns:
        A signed response whose ``data`` is ``{"accepted": True,
        "peer_stats_accepted": ...}``. ``peer_stats_accepted`` has one of
        three values:
        - ``None`` when no embedded metrics were present.
        - ``True`` when they were ingested.
        - ``False`` when they were rejected.
    """
    await _verify_inter_node_request_optional(request)
    try:
        # Business logic for the report: log it (persisting to DB may come later).
        logger.info("Received stats report", extra={"report": body.dict()})

        # Previously the response claimed acceptance even when the embedded peer
        # metrics were rejected; now the sender is told the outcome explicitly.
        peer_stats_accepted: Optional[bool] = None
        metrics = body.metrics
        if isinstance(metrics, dict) and metrics.get("node_id") and metrics.get("signature"):
            try:
                node_stats = await _gossip_manager.receive_stats(metrics)
                await _stats_aggregator.add_peer_snapshot(node_stats)
                peer_stats_accepted = True
            except Exception as ge:
                # Best-effort: a bad embedded snapshot must not fail the whole report.
                logger.warning("Peer stats rejected: %s", ge)
                peer_stats_accepted = False

        return _create_signed_response(
            {"accepted": True, "peer_stats_accepted": peer_stats_accepted}
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("node_stats_report error")
        raise HTTPException(status_code=500, detail=str(e)) from e
|
||
|
||
|
||
# Detailed statistics for this node.
@router.get("/detailed")
async def node_detailed_stats():
    """
    GET /api/node/stats/detailed

    Detailed system and application statistics for the current node, with
    historical aggregates (last 20 samples) and trends (window=60).

    Side effect: each call collects a fresh metrics sample and records it in
    the aggregator as a local snapshot before the aggregates are computed.
    """
    try:
        manager = get_ed25519_manager()

        # Take a fresh sample and record it so the aggregates include "now".
        system_metrics, app_metrics = await _metrics_collector.get_current_stats()
        await _stats_aggregator.add_local_snapshot(
            NodeStats(
                node_id=manager.node_id,
                public_key=manager.public_key_hex,
                system=system_metrics,
                app=app_metrics,
            )
        )

        agg = _stats_aggregator
        aggregates = await agg.aggregate_node_stats(node_id=None, last_n=20)
        trends = await agg.calculate_trends(node_id=None, window=60)
        newest = await agg.get_latest_local()

        return _create_signed_response(
            {
                "node_id": manager.node_id,
                "latest": None if not newest else newest.to_dict(),
                "aggregates": aggregates,
                "trends": trends,
                "timestamp": datetime.utcnow().isoformat(),
            }
        )
    except Exception as e:
        logger.exception("node_detailed_stats error")
        raise HTTPException(status_code=500, detail=str(e))
|
||
|
||
|
||
# Network statistics, aggregated over all known nodes.
@router.get("/network")
async def node_network_stats():
    """
    GET /api/node/stats/network

    Network summary from the stats aggregator, for example:
    - node counts and active nodes;
    - average CPU/memory;
    - total available content.

    The summary is returned inside this node's signed response envelope.
    """
    try:
        snapshot = await _stats_aggregator.get_network_overview()
        payload = {"overview": snapshot.to_dict()}
        payload["timestamp"] = datetime.utcnow().isoformat()
        return _create_signed_response(payload)
    except Exception as e:
        logger.exception("node_network_stats error")
        raise HTTPException(status_code=500, detail=str(e))