"""Health check and system status endpoints.""" import logging import asyncio from datetime import datetime from typing import Dict, Any from sanic import Blueprint, Request, response from sanic.response import JSONResponse from app.core.config import get_settings from app.core.database import get_async_session from app.core.metrics import get_metrics, get_metrics_content_type, metrics_collector from app.core.background.indexer_service import indexer_service from app.core.background.convert_service import convert_service from app.core.background.ton_service import ton_service logger = logging.getLogger(__name__) health_bp = Blueprint("health", version=1) @health_bp.route("/health", methods=["GET"]) async def health_check(request: Request) -> JSONResponse: """Basic health check endpoint.""" return response.json({ "status": "healthy", "timestamp": datetime.utcnow().isoformat(), "service": "my-uploader-bot", "version": "2.0.0" }) @health_bp.route("/health/detailed", methods=["GET"]) async def detailed_health_check(request: Request) -> JSONResponse: """Detailed health check with component status.""" health_status = { "status": "healthy", "timestamp": datetime.utcnow().isoformat(), "service": "my-uploader-bot", "version": "2.0.0", "components": {} } overall_healthy = True # Database health try: async with get_async_session() as session: await session.execute("SELECT 1") health_status["components"]["database"] = { "status": "healthy", "message": "Database connection successful" } except Exception as e: health_status["components"]["database"] = { "status": "unhealthy", "message": f"Database error: {str(e)}" } overall_healthy = False # Redis health try: import redis.asyncio as redis settings = get_settings() redis_client = redis.from_url(settings.redis_url) await redis_client.ping() await redis_client.close() health_status["components"]["cache"] = { "status": "healthy", "message": "Redis connection successful" } except Exception as e: health_status["components"]["cache"] = { "status": "unhealthy", "message": f"Redis error: {str(e)}" } overall_healthy = False # TON service health try: # Check if TON service is responsive test_result = await ton_service.ping() health_status["components"]["blockchain"] = { "status": "healthy" if test_result else "degraded", "message": "TON service available" if test_result else "TON service degraded" } if not test_result: overall_healthy = False except Exception as e: health_status["components"]["blockchain"] = { "status": "unhealthy", "message": f"TON service error: {str(e)}" } overall_healthy = False # Background services health health_status["components"]["background_services"] = { "indexer": { "status": "healthy" if indexer_service.is_running else "stopped", "active_tasks": len([t for t in indexer_service.tasks if not t.done()]) }, "converter": { "status": "healthy" if convert_service.is_running else "stopped", "active_tasks": len([t for t in convert_service.tasks if not t.done()]) } } # Update overall status if not overall_healthy: health_status["status"] = "unhealthy" status_code = 200 if overall_healthy else 503 return response.json(health_status, status=status_code) @health_bp.route("/health/ready", methods=["GET"]) async def readiness_check(request: Request) -> JSONResponse: """Kubernetes readiness probe endpoint.""" try: # Quick database check async with get_async_session() as session: await session.execute("SELECT 1") return response.json({ "status": "ready", "timestamp": datetime.utcnow().isoformat() }) except Exception as e: return response.json({ "status": "not_ready", "error": str(e), "timestamp": datetime.utcnow().isoformat() }, status=503) @health_bp.route("/health/live", methods=["GET"]) async def liveness_check(request: Request) -> JSONResponse: """Kubernetes liveness probe endpoint.""" return response.json({ "status": "alive", "timestamp": datetime.utcnow().isoformat() }) @health_bp.route("/metrics", methods=["GET"]) async def prometheus_metrics(request: Request): """Prometheus metrics endpoint.""" try: metrics_data = await get_metrics() return response.raw( metrics_data, content_type=get_metrics_content_type() ) except Exception as e: logger.error(f"Error generating metrics: {e}") return response.json({ "error": "Failed to generate metrics" }, status=500) @health_bp.route("/stats", methods=["GET"]) async def system_stats(request: Request) -> JSONResponse: """System statistics endpoint.""" try: stats = { "timestamp": datetime.utcnow().isoformat(), "uptime": metrics_collector.start_time, "services": {} } # Get indexer stats try: indexer_stats = await indexer_service.get_indexing_stats() stats["services"]["indexer"] = indexer_stats except Exception as e: stats["services"]["indexer"] = {"error": str(e)} # Get converter stats try: converter_stats = await convert_service.get_processing_stats() stats["services"]["converter"] = converter_stats except Exception as e: stats["services"]["converter"] = {"error": str(e)} return response.json(stats) except Exception as e: logger.error(f"Error getting system stats: {e}") return response.json({ "error": "Failed to get system stats" }, status=500) @health_bp.route("/debug/info", methods=["GET"]) async def debug_info(request: Request) -> JSONResponse: """Debug information endpoint (development only).""" settings = get_settings() if settings.environment != "development": return response.json({ "error": "Debug endpoint only available in development" }, status=403) debug_data = { "timestamp": datetime.utcnow().isoformat(), "environment": settings.environment, "debug_mode": settings.debug, "database_url": settings.database_url.replace( settings.database_url.split('@')[0].split('//')[1], "***:***" ) if '@' in settings.database_url else "***", "redis_url": settings.redis_url.replace( settings.redis_url.split('@')[0].split('//')[1], "***:***" ) if '@' in settings.redis_url else "***", "storage_backend": settings.storage_backend, "ton_network": settings.ton_network, "active_tasks": { "indexer": len([t for t in indexer_service.tasks if not t.done()]), "converter": len([t for t in convert_service.tasks if not t.done()]) } } return response.json(debug_data)