uploader-bot/app/api/fastapi_system_routes.py

556 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FastAPI системные эндпоинты для мониторинга, health checks и администрирования
TIER 3 - системные функции для операционного управления
"""
import asyncio
import platform
import psutil
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from uuid import UUID
from fastapi import APIRouter, HTTPException, Request, Depends, Query
from fastapi.responses import JSONResponse
from sqlalchemy import select, text
from app.core.config import get_settings
from app.core.database import db_manager, get_cache_manager
from app.core.logging import get_logger
from app.core.crypto import get_ed25519_manager
from app.core.models.content_models import StoredContent as Content
from app.core.models.user import User
from app.api.fastapi_middleware import require_auth, require_admin
# Module-wide singletons shared by every endpoint below.
logger = get_logger(__name__)
settings = get_settings()
router = APIRouter(tags=["system"], prefix="/api/system")

# Process-level monitoring state. ``_start_time`` is captured once at import
# and is the reference point for every ``uptime_seconds`` value in this module;
# the two counters are bumped by ``increment_request_counter`` and
# ``increment_error_counter``.
_start_time = time.time()
_request_counter = _error_counter = 0
@router.get("/health")
async def health_check():
    """
    Basic service health check.

    Public (no authentication) so load balancers can poll it. It runs three
    probes and reports each as ``"healthy"`` or
    ``"unhealthy: <error text, max 100 chars>"``:

    - a ``SELECT 1`` on the database;
    - a cache ``set`` with a 10-second TTL;
    - an Ed25519 sign/verify round trip.

    Returns:
        JSONResponse with one of three outcomes:

        - HTTP 200 and ``status: "healthy"`` when every probe passes.
        - HTTP 503 and ``status: "degraded"`` when any probe fails.
        - HTTP 503 and ``status: "unhealthy"`` if building the report itself raises.
    """
    def unhealthy(exc: Exception) -> str:
        # Only the first 100 characters of the error text are exposed publicly.
        return f"unhealthy: {str(exc)[:100]}"

    try:
        try:
            async with db_manager.get_session() as session:
                await session.execute(text("SELECT 1"))
        except Exception as exc:
            database_state = unhealthy(exc)
        else:
            database_state = "healthy"

        try:
            cache = await get_cache_manager()
            await cache.set("health_check", "ok", ttl=10)
        except Exception as exc:
            cache_state = unhealthy(exc)
        else:
            cache_state = "healthy"

        try:
            signer = get_ed25519_manager()
            probe = {"test": "health_check"}
            probe_signature = signer.sign_message(probe)
            verified = signer.verify_signature(
                probe, probe_signature, signer.public_key_hex
            )
        except Exception as exc:
            crypto_state = unhealthy(exc)
        else:
            crypto_state = (
                "healthy" if verified
                else "unhealthy: signature verification failed"
            )

        services = {
            "database": database_state,
            "cache": cache_state,
            "cryptography": crypto_state,
        }
        all_healthy = not any("unhealthy" in state for state in services.values())

        return JSONResponse(
            content={
                "status": "healthy" if all_healthy else "degraded",
                "timestamp": datetime.utcnow().isoformat(),
                "services": services,
                "uptime_seconds": int(time.time() - _start_time),
            },
            status_code=200 if all_healthy else 503,
        )
    except Exception as e:
        await logger.aerror(
            "Health check failed",
            error=str(e)
        )
        return JSONResponse(
            content={
                "status": "unhealthy",
                "error": "Health check system failure",
                "timestamp": datetime.utcnow().isoformat(),
            },
            status_code=503,
        )
_DATABASE_SIZE_SQL = """
SELECT pg_size_pretty(pg_database_size(current_database()))
"""


def _collect_host_metrics() -> Dict[str, Any]:
    """
    Take a blocking snapshot of host CPU, memory, disk and load metrics.

    ``psutil.cpu_percent(interval=1)`` sleeps for one second while sampling,
    so call this from a worker thread rather than on the event loop. Memory
    and disk are each queried once so their fields describe the same moment.
    """
    cpu_percent = psutil.cpu_percent(interval=1)
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage('/')
    return {
        "cpu_percent": cpu_percent,
        "memory": {
            "total": memory.total,
            "available": memory.available,
            "percent": memory.percent
        },
        "disk": {
            "total": disk.total,
            "used": disk.used,
            "free": disk.free,
            "percent": disk.percent
        },
        # Guarded because getloadavg() may be missing from the installed psutil.
        "load_average": psutil.getloadavg() if hasattr(psutil, 'getloadavg') else None
    }


@router.get("/health/detailed")
async def detailed_health_check(
    request: Request,
    current_user: User = Depends(require_admin)
):
    """
    Detailed system health report with host, database, cache and app metrics.

    Admin only (``require_admin``). Host metrics are gathered in the default
    thread-pool executor so the one-second CPU sample does not stall other
    requests on the event loop. Database and cache failures are reported
    inline under an ``"error"`` key and set the top-level ``status`` to
    ``"degraded"``; otherwise it is ``"healthy"``.

    Raises:
        HTTPException: 500 if the report cannot be assembled at all.
    """
    try:
        loop = asyncio.get_running_loop()
        system_info = await loop.run_in_executor(None, _collect_host_metrics)

        # Database metrics; partial results are kept if a later query fails.
        db_metrics: Dict[str, Any] = {}
        try:
            async with db_manager.get_session() as session:
                user_count = await session.execute(text("SELECT COUNT(*) FROM users"))
                db_metrics["users_count"] = user_count.scalar()
                content_count = await session.execute(text("SELECT COUNT(*) FROM stored_content"))
                db_metrics["content_count"] = content_count.scalar()
                # PostgreSQL functions: human-readable size of the current database.
                db_size = await session.execute(text(_DATABASE_SIZE_SQL))
                db_metrics["database_size"] = db_size.scalar()
        except Exception as e:
            db_metrics["error"] = str(e)

        # Cache metrics. This only checks that the cache manager can be
        # obtained; no round trip to the cache is made here.
        cache_metrics: Dict[str, Any] = {}
        try:
            await get_cache_manager()
            # TODO: add Redis-level metrics when the cache manager exposes them.
            cache_metrics["status"] = "connected"
        except Exception as e:
            cache_metrics["error"] = str(e)

        app_metrics = {
            "uptime_seconds": int(time.time() - _start_time),
            "requests_total": _request_counter,
            "errors_total": _error_counter,
            # max(..., 1) avoids division by zero before any request is counted.
            "error_rate": _error_counter / max(_request_counter, 1),
            "python_version": platform.python_version(),
            "platform": platform.platform()
        }

        config_info = {
            "debug_mode": getattr(settings, 'DEBUG', False),
            "environment": getattr(settings, 'ENVIRONMENT', 'unknown'),
            "version": getattr(settings, 'VERSION', 'unknown'),
            # Only a prefix of the node ID is exposed.
            "node_id": get_ed25519_manager().node_id[:8] + "..."
        }

        degraded = "error" in db_metrics or "error" in cache_metrics
        return {
            "status": "degraded" if degraded else "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "system": system_info,
            "database": db_metrics,
            "cache": cache_metrics,
            "application": app_metrics,
            "configuration": config_info
        }
    except Exception as e:
        await logger.aerror(
            "Detailed health check failed",
            user_id=str(current_user.id),
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to get detailed health status")
@router.get("/metrics")
async def prometheus_metrics():
    """
    Expose process and host metrics in the Prometheus text exposition format.

    Each metric is emitted as a ``# HELP`` line, a ``# TYPE`` line and one
    sample. The CPU sample uses a 0.1 s psutil measurement window, which runs
    in the default thread-pool executor so the event loop is not blocked.

    Returns:
        PlainTextResponse with content type ``text/plain; version=0.0.4``.

    Raises:
        HTTPException: 500 if metrics cannot be collected.
    """
    try:
        from fastapi.responses import PlainTextResponse

        loop = asyncio.get_running_loop()
        cpu_usage = await loop.run_in_executor(None, psutil.cpu_percent, 0.1)
        memory = psutil.virtual_memory()
        disk = psutil.disk_usage('/')
        uptime = int(time.time() - _start_time)

        # (metric name, metric type, help text, value)
        samples = [
            ("uploader_bot_uptime_seconds", "counter", "Total uptime in seconds", uptime),
            ("uploader_bot_requests_total", "counter", "Total number of HTTP requests", _request_counter),
            ("uploader_bot_errors_total", "counter", "Total number of errors", _error_counter),
            ("system_cpu_usage_percent", "gauge", "CPU usage percentage", cpu_usage),
            ("system_memory_usage_percent", "gauge", "Memory usage percentage", memory.percent),
            ("system_disk_usage_percent", "gauge", "Disk usage percentage", disk.percent),
            ("system_memory_total_bytes", "gauge", "Total memory in bytes", memory.total),
            ("system_memory_available_bytes", "gauge", "Available memory in bytes", memory.available),
        ]

        # Build lines explicitly so the output never depends on source-code
        # indentation inside a triple-quoted string.
        lines = []
        for name, metric_type, help_text, value in samples:
            lines.append(f"# HELP {name} {help_text}")
            lines.append(f"# TYPE {name} {metric_type}")
            lines.append(f"{name} {value}")

        return PlainTextResponse(
            content="\n".join(lines) + "\n",
            media_type="text/plain; version=0.0.4; charset=utf-8",
        )
    except Exception as e:
        await logger.aerror(
            "Metrics collection failed",
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to collect metrics")
@router.get("/info")
async def system_info():
    """
    Public description of this node.

    Covers the node's identity (node ID and Ed25519 public key), its
    advertised capabilities, the accepted content types, and the maximum file
    size. ``version`` and ``max_file_size`` come from settings, falling back
    to ``'unknown'`` and 100 MiB respectively.

    Raises:
        HTTPException: 500 if the information cannot be gathered.
    """
    try:
        manager = get_ed25519_manager()

        capabilities = [
            "content_upload",
            "content_sync",
            "decentralized_filtering",
            "ed25519_signatures",
            "web2_client_api",
        ]
        accepted_formats = [
            "image/*",
            "video/*",
            "audio/*",
            "text/*",
            "application/pdf",
        ]
        default_max_size = 100 * 1024 * 1024

        return {
            "service": "uploader-bot",
            "version": getattr(settings, 'VERSION', 'unknown'),
            "api_version": "v1",
            "network": "MY Network v3.0",
            "node_id": manager.node_id,
            "public_key": manager.public_key_hex,
            "capabilities": capabilities,
            "supported_formats": accepted_formats,
            "max_file_size": getattr(settings, 'MAX_FILE_SIZE', default_max_size),
            "timestamp": datetime.utcnow().isoformat(),
        }
    except Exception as e:
        await logger.aerror(
            "System info failed",
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to get system information")
_CONTENT_STATS_SQL = """
SELECT
COUNT(*) as total_content,
SUM(CASE WHEN created_at >= :since_date THEN 1 ELSE 0 END) as new_content,
SUM(file_size) as total_size,
AVG(file_size) as avg_size
FROM stored_content
"""

_USER_STATS_SQL = """
SELECT
COUNT(*) as total_users,
SUM(CASE WHEN created_at >= :since_date THEN 1 ELSE 0 END) as new_users
FROM users
"""


@router.get("/stats")
async def system_statistics(
    request: Request,
    current_user: User = Depends(require_auth),
    days: int = Query(7, ge=1, le=30, description="Number of days for statistics")
):
    """
    Aggregate content, user and process statistics over the last ``days`` days.

    Requires an authenticated user (``require_auth``). "New" items and users
    are rows whose ``created_at`` is at or after ``utcnow() - days``; totals
    cover all rows. NULL aggregates (e.g. from an empty table) are reported
    as 0.

    Raises:
        HTTPException: 500 if either query fails.
    """
    try:
        window_start = datetime.utcnow() - timedelta(days=days)

        async with db_manager.get_session() as session:
            content_result = await session.execute(
                text(_CONTENT_STATS_SQL), {"since_date": window_start}
            )
            content_row = content_result.fetchone()
            users_result = await session.execute(
                text(_USER_STATS_SQL), {"since_date": window_start}
            )
            user_row = users_result.fetchone()

        return {
            "content": {
                "total_items": content_row.total_content or 0,
                "new_items": content_row.new_content or 0,
                "total_size_bytes": content_row.total_size or 0,
                "average_size_bytes": float(content_row.avg_size or 0),
            },
            "users": {
                "total_users": user_row.total_users or 0,
                "new_users": user_row.new_users or 0,
            },
            "system": {
                "uptime_seconds": int(time.time() - _start_time),
                "requests_handled": _request_counter,
                "errors_occurred": _error_counter,
                "period_days": days,
                "generated_at": datetime.utcnow().isoformat(),
            },
        }
    except Exception as e:
        await logger.aerror(
            "Statistics generation failed",
            user_id=str(current_user.id),
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to generate statistics")
@router.post("/maintenance")
async def toggle_maintenance_mode(
    request: Request,
    enabled: bool = Query(description="Enable or disable maintenance mode"),
    current_user: User = Depends(require_admin)
):
    """
    Turn maintenance mode on or off (admin only).

    Enabling stores a ``maintenance_mode`` record in the cache with a TTL of
    86400 seconds (24 hours; presumably it then expires on its own). The
    record holds who enabled it, when, and a user-facing message. Disabling
    deletes the record.

    Raises:
        HTTPException: 500 if the cache operation or logging fails.
    """
    try:
        cache_manager = await get_cache_manager()
        actor_id = str(current_user.id)

        if not enabled:
            await cache_manager.delete("maintenance_mode")
            await logger.ainfo("Maintenance mode disabled", admin_id=actor_id)
            return {"message": "Maintenance mode disabled"}

        maintenance_info = {
            "enabled": True,
            "enabled_at": datetime.utcnow().isoformat(),
            "enabled_by": actor_id,
            "message": "System is under maintenance. Please try again later.",
        }
        one_day_seconds = 24 * 60 * 60
        await cache_manager.set("maintenance_mode", maintenance_info, ttl=one_day_seconds)
        await logger.awarning("Maintenance mode enabled", admin_id=actor_id)
        return {
            "message": "Maintenance mode enabled",
            "maintenance_info": maintenance_info,
        }
    except Exception as e:
        await logger.aerror(
            "Maintenance mode toggle failed",
            admin_id=str(current_user.id),
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to toggle maintenance mode")
@router.get("/logs")
async def get_system_logs(
    request: Request,
    current_user: User = Depends(require_admin),
    level: str = Query("INFO", description="Log level filter"),
    lines: int = Query(100, ge=1, le=1000, description="Number of lines to return"),
    component: Optional[str] = Query(None, description="Filter by component")
):
    """
    Return system log entries (admin only).

    NOTE: placeholder implementation. It does not read any log source.
    Instead it returns a single synthetic entry recording this access. The
    ``level``, ``lines`` and ``component`` filters are echoed back in the
    response but not applied.

    Raises:
        HTTPException: 500 if building the response fails.
    """
    try:
        access_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": "INFO",
            "component": "system",
            "message": "System logs endpoint accessed",
            "user_id": str(current_user.id),
        }
        entries = [access_record]
        applied_filters = {"level": level, "lines": lines, "component": component}

        return {
            "logs": entries,
            "total_lines": len(entries),
            "filters": applied_filters,
            "generated_at": datetime.utcnow().isoformat(),
        }
    except Exception as e:
        await logger.aerror(
            "Log retrieval failed",
            admin_id=str(current_user.id),
            error=str(e)
        )
        raise HTTPException(status_code=500, detail="Failed to retrieve logs")
# Counter hooks for request accounting, intended (per the original note) to be
# called from the main application, e.g. by a request-counting middleware.
async def increment_request_counter():
    """Add one to the module-wide ``_request_counter``."""
    global _request_counter
    _request_counter = _request_counter + 1
async def increment_error_counter():
    """Add one to the module-wide ``_error_counter``."""
    global _error_counter
    _error_counter = _error_counter + 1
# Readiness probe for Kubernetes.
@router.get("/ready")
async def readiness_check():
    """
    Report whether the service can accept traffic (Kubernetes readiness probe).

    Checks the database (``SELECT 1``) and the cache (a ``set`` with a
    5-second TTL). Each check contributes exactly one entry of the form
    ``{"service", "status"[, "error"]}``. Because this endpoint is
    unauthenticated, ``error`` holds at most the first 100 characters of the
    exception text, matching ``/health``.

    Returns:
        JSONResponse: HTTP 200 with ``status: "ready"`` when every check
        passes, otherwise HTTP 503 with ``status: "not_ready"``.
    """
    try:
        checks = []

        try:
            async with db_manager.get_session() as session:
                await session.execute(text("SELECT 1"))
        except Exception as e:
            checks.append({"service": "database", "status": "not_ready", "error": str(e)[:100]})
        else:
            # Appended only once the session has closed without error, so the
            # database never gets both a "ready" and a "not_ready" entry.
            checks.append({"service": "database", "status": "ready"})

        try:
            cache_manager = await get_cache_manager()
            await cache_manager.set("readiness_check", "ok", ttl=5)
        except Exception as e:
            checks.append({"service": "cache", "status": "not_ready", "error": str(e)[:100]})
        else:
            checks.append({"service": "cache", "status": "ready"})

        all_ready = all(check["status"] == "ready" for check in checks)
        return JSONResponse(
            content={
                "status": "ready" if all_ready else "not_ready",
                "checks": checks,
                "timestamp": datetime.utcnow().isoformat()
            },
            status_code=200 if all_ready else 503
        )
    except Exception as e:
        # Record the failure (as /health does) instead of discarding it silently.
        await logger.aerror(
            "Readiness check failed",
            error=str(e)
        )
        return JSONResponse(
            content={
                "status": "not_ready",
                "error": "Readiness check failed",
                "timestamp": datetime.utcnow().isoformat()
            },
            status_code=503
        )
# Liveness probe for Kubernetes.
@router.get("/live")
async def liveness_check():
    """
    Report that the process is running (Kubernetes liveness probe).

    Performs no dependency checks; it always returns ``status: "alive"``
    together with the current UTC timestamp and the process uptime in whole
    seconds.
    """
    now_iso = datetime.utcnow().isoformat()
    uptime = int(time.time() - _start_time)
    return {"status": "alive", "timestamp": now_iso, "uptime_seconds": uptime}