uploader-bot/app/api/routes/node_stats_routes.py

241 lines
9.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import json
import logging
import os
import time
from datetime import datetime
from typing import Dict, Any, List, Optional
from fastapi import APIRouter, HTTPException, Request, status
from fastapi.responses import JSONResponse
from app.core.crypto import get_ed25519_manager
from app.core.content.chunk_manager import ChunkManager
from app.core.models.api.stats_models import (
NodeHealthResponse,
NodeContentStatsResponse,
ContentStatsItem,
NodeStatsReport,
)
# NEW imports for detailed stats and network overview
from app.core.stats.metrics_collector import MetricsCollector
from app.core.stats.stats_aggregator import StatsAggregator
from app.core.stats.gossip_manager import GossipManager
from app.core.models.stats.metrics_models import NodeStats
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Router for every endpoint in this file; all routes are served under
# the /api/node/stats prefix.
router = APIRouter(prefix="/api/node/stats", tags=["node-stats"])
# Singleton-ish local instances for this router scope: they are created once,
# when the module is imported, and reused by the handlers below.
_metrics_collector = MetricsCollector()
_stats_aggregator = StatsAggregator()
_gossip_manager = GossipManager()
async def _verify_inter_node_request_optional(request: Request) -> Optional[Dict[str, Any]]:
    """
    Optionally verify inter-node headers and the request signature.

    Used by endpoints that may be called by another node (e.g. ``report``).
    The check is only performed when the ``x-node-communication`` header is
    exactly ``"true"``; any other request is passed through untouched.

    Args:
        request: Incoming request whose headers and raw body are inspected.

    Returns:
        ``None`` when the request is not marked as inter-node communication.
        Otherwise a dict with ``node_id``, ``public_key`` and the decoded
        JSON ``message``.

    Raises:
        HTTPException: 400 when a required header is missing, the body is
            empty, or the body is not valid UTF-8 encoded JSON; 403 when the
            signature does not verify.

    Side effects:
        On success, sets ``inter_node_communication``, ``source_node_id`` and
        ``source_public_key`` on ``request.state``.
    """
    if request.headers.get("x-node-communication") != "true":
        return None

    # All three headers are mandatory once the request claims to be inter-node.
    required_headers = ["x-node-id", "x-node-public-key", "x-node-signature"]
    for header in required_headers:
        if header not in request.headers:
            logger.warning("Missing header on inter-node request: %s", header)
            raise HTTPException(status_code=400, detail=f"Missing required header: {header}")

    # Read the raw body; the signature is checked against its decoded content.
    body = await request.body()
    if not body:
        raise HTTPException(status_code=400, detail="Empty message body")

    try:
        message_data = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # A body that is not valid UTF-8 is a client error as well, not a
        # server fault: report it as 400 instead of letting it surface as 500.
        raise HTTPException(status_code=400, detail="Invalid JSON in request body") from None

    signature = request.headers.get("x-node-signature")
    node_id = request.headers.get("x-node-id")
    public_key = request.headers.get("x-node-public-key")

    crypto_manager = get_ed25519_manager()
    is_valid = crypto_manager.verify_signature(message_data, signature, public_key)
    if not is_valid:
        logger.warning("Invalid signature from node %s (stats)", node_id)
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Invalid cryptographic signature")

    # Expose the verified sender to downstream code via request.state.
    request.state.inter_node_communication = True
    request.state.source_node_id = node_id
    request.state.source_public_key = public_key
    return {"node_id": node_id, "public_key": public_key, "message": message_data}
def _create_signed_response(data: Dict[str, Any]) -> JSONResponse:
    """
    Wrap ``data`` in a signed envelope and attach the standard inter-node headers.

    The envelope carries ``success``, a UTC ``timestamp``, this node's
    ``node_id`` and the given ``data``; its signature is sent in the
    ``X-Node-Signature`` response header.
    """
    manager = get_ed25519_manager()

    # NOTE(review): datetime.utcnow() is deprecated since Python 3.12; it is
    # kept here because switching to an aware datetime would change the
    # timestamp text inside the signed envelope.
    envelope = dict(
        success=True,
        timestamp=datetime.utcnow().isoformat(),
        node_id=manager.node_id,
        data=data,
    )

    # Sign the envelope first, then describe the sender in the headers.
    envelope_signature = manager.sign_message(envelope)
    response_headers = {
        "X-Node-ID": manager.node_id,
        "X-Node-Public-Key": manager.public_key_hex,
        "X-Node-Communication": "true",
        "X-Node-Signature": envelope_signature,
    }
    return JSONResponse(content=envelope, headers=response_headers)
@router.get("/health")
async def node_health():
    """
    GET /api/node/stats/health

    Return the node's status together with basic metrics.

    Uptime is derived from the ``NODE_START_TS`` environment variable (a Unix
    timestamp). When the variable is unset, empty, malformed, or lies in the
    future, uptime is reported as 0 rather than failing the health probe.

    Returns:
        The ``NodeHealthResponse`` as a plain dict. The response is left
        unsigned on purpose so that external monitoring keeps working.

    Raises:
        HTTPException: 500 when building the response fails.
    """
    try:
        crypto_manager = get_ed25519_manager()

        # Basic metrics only (simple placeholders without psutil, to avoid
        # adding a dependency).
        raw_start_ts = os.getenv("NODE_START_TS")
        try:
            # float() accepts both "1700000000" and "1700000000.5"; int() of
            # NaN/inf raises ValueError/OverflowError and is handled below.
            uptime = max(0, int(time.time() - float(raw_start_ts))) if raw_start_ts else 0
        except (ValueError, OverflowError):
            logger.warning("Ignoring malformed NODE_START_TS value: %r", raw_start_ts)
            uptime = 0

        resp = NodeHealthResponse(
            status="ok",
            node_id=crypto_manager.node_id,
            public_key=crypto_manager.public_key_hex,
            uptime_seconds=uptime,
            # CPU, memory and disk figures are not collected yet.
            cpu_usage=None,
            memory_usage_mb=None,
            disk_free_mb=None,
            last_sync_ts=None,
            details={
                "version": "3.0.0",
                "protocols": ["ed25519", "content_sync"],
            },
        )
        # The public health endpoint is returned unsigned so monitoring
        # systems are not broken.
        return resp.dict()
    except Exception as e:
        logger.exception("node_health error")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/content")
async def node_content_stats():
    """
    GET /api/node/stats/content

    Return aggregated content statistics for this node.

    The content list is currently an empty placeholder, so all totals are 0.

    Raises:
        HTTPException: 500 when building the response fails.
    """
    try:
        # Placeholder: real values require integration with the node's
        # storage / database.
        items: List[ContentStatsItem] = []

        total = stored = missing = 0
        for item in items:
            total += item.total_chunks
            stored += item.stored_chunks
            missing += item.missing_chunks

        summary = NodeContentStatsResponse(
            total_contents=len(items),
            total_chunks=total,
            stored_chunks=stored,
            missing_chunks=missing,
            contents=items,
        )
        return summary.dict()
    except Exception as exc:
        logger.exception("node_content_stats error")
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/report")
async def node_stats_report(request: Request, body: NodeStatsReport):
    """
    POST /api/node/stats/report

    Accept a stats report sent by another node (signed with ed25519).

    The inter-node headers and signature are checked first via
    ``_verify_inter_node_request_optional``. If the report embeds a metrics
    dict carrying both ``node_id`` and ``signature``, it is handed to the
    gossip manager and the resulting snapshot is added to the aggregator; a
    failure there is only logged and does not fail the request.

    Returns:
        A signed response with ``{"accepted": True}``.

    Raises:
        HTTPException: propagated from verification, or 500 on unexpected errors.
    """
    await _verify_inter_node_request_optional(request)

    try:
        # Business logic: log the report (persisting it may come later).
        logger.info("Received stats report", extra={"report": body.dict()})

        # Pull out the embedded metrics, if any, and validate them through
        # the gossip manager.
        peer_metrics = body.metrics
        carries_signed_stats = (
            isinstance(peer_metrics, dict)
            and peer_metrics.get("node_id")
            and peer_metrics.get("signature")
        )
        if carries_signed_stats:
            try:
                peer_snapshot = await _gossip_manager.receive_stats(peer_metrics)
                await _stats_aggregator.add_peer_snapshot(peer_snapshot)
            except Exception as rejection:
                logger.warning("Peer stats rejected: %s", rejection)

        return _create_signed_response({"accepted": True})
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("node_stats_report error")
        raise HTTPException(status_code=500, detail=str(exc))
# NEW: detailed node statistics
@router.get("/detailed")
async def node_detailed_stats():
    """
    GET /api/node/stats/detailed

    Return detailed system and application statistics for the current node,
    together with historical aggregates.

    Each call collects a fresh snapshot, records it in the aggregator, and
    then reads back aggregates (last 20), trends (window 60) and the latest
    local snapshot.

    Returns:
        A signed response with ``node_id``, ``latest``, ``aggregates``,
        ``trends`` and ``timestamp``.

    Raises:
        HTTPException: 500 when collecting or aggregating fails.
    """
    try:
        signer = get_ed25519_manager()

        # Collect fresh metrics and feed them into the aggregator.
        system_metrics, app_metrics = await _metrics_collector.get_current_stats()
        await _stats_aggregator.add_local_snapshot(
            NodeStats(
                node_id=signer.node_id,
                public_key=signer.public_key_hex,
                system=system_metrics,
                app=app_metrics,
            )
        )

        aggregates = await _stats_aggregator.aggregate_node_stats(node_id=None, last_n=20)
        trends = await _stats_aggregator.calculate_trends(node_id=None, window=60)
        latest = await _stats_aggregator.get_latest_local()

        # The aggregator may have no local snapshot to return.
        latest_payload = None
        if latest:
            latest_payload = latest.to_dict()

        return _create_signed_response(
            {
                "node_id": signer.node_id,
                "latest": latest_payload,
                "aggregates": aggregates,
                "trends": trends,
                "timestamp": datetime.utcnow().isoformat(),
            }
        )
    except Exception as exc:
        logger.exception("node_detailed_stats error")
        raise HTTPException(status_code=500, detail=str(exc))
# NEW: network statistics (aggregated over the known nodes)
@router.get("/network")
async def node_network_stats():
    """
    GET /api/node/stats/network

    Return the network overview produced by the stats aggregator. Per the
    original description this covers node counts, active nodes, average
    CPU/MEM and total available content.

    Returns:
        A signed response with ``overview`` and ``timestamp``.

    Raises:
        HTTPException: 500 when the overview cannot be produced.
    """
    try:
        network_overview = await _stats_aggregator.get_network_overview()
        return _create_signed_response(
            {
                "overview": network_overview.to_dict(),
                "timestamp": datetime.utcnow().isoformat(),
            }
        )
    except Exception as exc:
        logger.exception("node_network_stats error")
        raise HTTPException(status_code=500, detail=str(exc))