452 lines
14 KiB
Python
452 lines
14 KiB
Python
"""
Enhanced Sanic API application with async support and monitoring
"""
|
|
import asyncio
|
|
from contextlib import asynccontextmanager
|
|
from typing import Dict, Any, Optional
|
|
|
|
from sanic import Sanic, Request, HTTPResponse
|
|
from sanic.response import json as json_response
|
|
from sanic.exceptions import SanicException
|
|
import structlog
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import init_database, close_database
|
|
from app.core.logging import get_logger, AsyncContextLogger
|
|
from app.api.middleware import (
|
|
request_middleware,
|
|
response_middleware,
|
|
exception_middleware,
|
|
maintenance_middleware
|
|
)
|
|
|
|
# Module-level logger obtained from the project's logging helper; used by
# every handler and service function in this module.
logger = get_logger(__name__)
|
|
|
|
|
|
class EnhancedSanic(Sanic):
    """Sanic application with registrable lifecycle hooks and tracked background tasks.

    State is kept on ``self.ctx``:

    * ``startup_tasks`` / ``shutdown_tasks`` -- lists of zero-argument async
      callables awaited in registration order by :meth:`startup` and
      :meth:`shutdown`.
    * ``background_tasks`` -- set of ``asyncio.Task`` objects created through
      :meth:`add_background_task`; each task removes itself when it finishes.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``Sanic`` and create the empty task registries."""
        super().__init__(*args, **kwargs)
        self.ctx.startup_tasks = []
        self.ctx.shutdown_tasks = []
        self.ctx.background_tasks = set()

    @staticmethod
    def _task_label(task) -> str:
        """Return a printable name for *task* that never raises.

        Callables such as ``functools.partial`` objects have no ``__name__``;
        reading it inside an error handler would replace the logged failure
        with an ``AttributeError``.
        """
        return getattr(task, "__name__", None) or repr(task)

    async def _run_lifecycle_tasks(self, tasks, failure_message: str) -> None:
        """Await each callable in *tasks*; log (and swallow) individual failures.

        Failures are deliberately best-effort: one broken hook must not stop
        the remaining hooks from running.
        """
        for task in tasks:
            try:
                await task()
            except Exception as e:
                logger.error(failure_message, task=self._task_label(task), error=str(e))

    async def startup(self):
        """Initialise the database and cache, then run the registered startup tasks.

        Errors from ``init_database()``, ``get_cache()`` or the Redis ping
        propagate to the caller; errors from registered startup tasks are
        logged and ignored.
        """
        async with AsyncContextLogger("app_startup"):
            # Initialize database
            await init_database()
            logger.info("Database initialized")

            # Initialize cache connections (ping verifies Redis is reachable)
            from app.core.database import get_cache
            cache = await get_cache()
            await cache.redis.ping()
            logger.info("Redis cache initialized")

            # Run custom startup tasks
            await self._run_lifecycle_tasks(self.ctx.startup_tasks, "Startup task failed")

            logger.info("Application startup completed")

    async def shutdown(self):
        """Cancel background tasks, run the shutdown tasks, then close the database.

        Errors from registered shutdown tasks are logged and ignored so that
        ``close_database()`` is still reached.
        """
        async with AsyncContextLogger("app_shutdown"):
            # Snapshot the set: finished tasks discard themselves from it via
            # their done-callback, so never iterate the live set across awaits.
            pending = tuple(self.ctx.background_tasks)

            # Cancel background tasks
            for task in pending:
                if not task.done():
                    task.cancel()

            # Wait for tasks to complete; return_exceptions=True keeps
            # CancelledError and task failures from propagating out of gather.
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)

            # Run custom shutdown tasks
            await self._run_lifecycle_tasks(self.ctx.shutdown_tasks, "Shutdown task failed")

            # Close database connections
            await close_database()
            logger.info("Database connections closed")

            logger.info("Application shutdown completed")

    def add_startup_task(self, task):
        """Register *task* (a zero-argument async callable) to be awaited by :meth:`startup`."""
        self.ctx.startup_tasks.append(task)

    def add_shutdown_task(self, task):
        """Register *task* (a zero-argument async callable) to be awaited by :meth:`shutdown`."""
        self.ctx.shutdown_tasks.append(task)

    def add_background_task(self, coro):
        """Schedule *coro* as an ``asyncio.Task``, track it, and return the task.

        Must be called while an event loop is running, because it uses
        ``asyncio.create_task``.
        """
        task = asyncio.create_task(coro)
        self.ctx.background_tasks.add(task)
        # Drop the reference once the task finishes so the set does not grow.
        task.add_done_callback(self.ctx.background_tasks.discard)
        return task
|
|
|
|
|
|
# Create the module-level Sanic application instance shared by every
# decorator and helper below.
app = EnhancedSanic(
    name="my_uploader_bot",
    configure_logging=False  # We handle logging ourselves
)

# Configure app settings. Upload size limit and debug/auto-reload flags come
# from the project settings; the timeouts are fixed here.
app.config.update({
    "REQUEST_MAX_SIZE": settings.MAX_FILE_SIZE,
    "REQUEST_TIMEOUT": 60,
    "RESPONSE_TIMEOUT": 60,
    "KEEP_ALIVE_TIMEOUT": 5,
    "KEEP_ALIVE": True,
    "ACCESS_LOG": False,  # We handle access logging in middleware
    "AUTO_RELOAD": settings.DEBUG,
    "DEBUG": settings.DEBUG,
})


# Register middleware. The maintenance middleware is registered before the
# general request middleware; keep this order when adding new entries.
app.register_middleware(maintenance_middleware, "request")
app.register_middleware(request_middleware, "request")
app.register_middleware(response_middleware, "response")
|
|
|
|
|
|
# Global exception handler
@app.exception(Exception)
async def handle_exception(request: Request, exception: Exception):
    """Route any ``Exception`` raised while handling *request* to ``exception_middleware``.

    Returns whatever ``exception_middleware`` returns for the given request
    and exception.
    """
    error_response = await exception_middleware(request, exception)
    return error_response
|
|
|
|
|
|
# Health check endpoint
@app.get("/health")
async def health_check(request: Request):
    """Report whether the database and Redis cache are reachable.

    Runs ``SELECT 1`` through a database session and pings Redis.

    Returns:
        A JSON response with ``status``, ``version`` and ``timestamp`` on
        success, or a 503 JSON response with ``status``, ``error`` and
        ``timestamp`` when any check raises.
    """
    # ``datetime`` is not imported at module level; without this import both
    # the success path and the error path raised NameError, so the endpoint
    # could never answer. Imported locally, like the other lazy imports here.
    from datetime import datetime, timezone

    try:
        # Check database connection
        from app.core.database import db_manager
        async with db_manager.get_session() as session:
            # NOTE(review): if this is a SQLAlchemy 2.x session, raw SQL
            # strings must be wrapped in sqlalchemy.text() -- confirm.
            await session.execute("SELECT 1")

        # Check Redis connection
        from app.core.database import get_cache
        cache = await get_cache()
        await cache.redis.ping()

        return json_response({
            "status": "healthy",
            "version": settings.PROJECT_VERSION,
            # Timezone-aware UTC timestamp (ISO 8601 with "+00:00" offset).
            "timestamp": datetime.now(timezone.utc).isoformat()
        })

    except Exception as e:
        logger.error("Health check failed", error=str(e))
        return json_response({
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.now(timezone.utc).isoformat()
        }, status=503)
|
|
|
|
|
|
# Metrics endpoint (only registered when metrics are enabled in settings)
if settings.METRICS_ENABLED:

    @app.get("/metrics")
    async def metrics_endpoint(request: Request):
        """Return the output of ``prometheus_client.generate_latest()``.

        The response uses the content type exported by ``prometheus_client``.
        Any failure (including a missing ``prometheus_client`` package) is
        logged and answered with a 503 JSON error.
        """
        try:
            # Imported lazily so the module loads without prometheus_client.
            from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

            payload = generate_latest()
            response_headers = {"Content-Type": CONTENT_TYPE_LATEST}
            return HTTPResponse(body=payload, headers=response_headers, status=200)
        except Exception as exc:
            logger.error("Metrics generation failed", error=str(exc))
            return json_response({"error": "Metrics unavailable"}, status=503)
|
|
|
|
|
|
# System info endpoint
|
|
@app.get("/api/system/info")
|
|
async def system_info(request: Request):
|
|
"""System information endpoint"""
|
|
try:
|
|
import psutil
|
|
import sys
|
|
|
|
# Get system metrics
|
|
memory = psutil.virtual_memory()
|
|
disk = psutil.disk_usage('/')
|
|
|
|
info = {
|
|
"application": {
|
|
"name": settings.PROJECT_NAME,
|
|
"version": settings.PROJECT_VERSION,
|
|
"python_version": sys.version,
|
|
"debug": settings.DEBUG,
|
|
},
|
|
"system": {
|
|
"cpu_percent": psutil.cpu_percent(),
|
|
"memory": {
|
|
"total": memory.total,
|
|
"available": memory.available,
|
|
"percent": memory.percent
|
|
},
|
|
"disk": {
|
|
"total": disk.total,
|
|
"free": disk.free,
|
|
"percent": (disk.used / disk.total) * 100
|
|
}
|
|
},
|
|
"services": {
|
|
"database": "connected",
|
|
"redis": "connected",
|
|
"indexer": "running" if settings.INDEXER_ENABLED else "disabled",
|
|
"ton_daemon": "running" if settings.TON_DAEMON_ENABLED else "disabled",
|
|
}
|
|
}
|
|
|
|
return json_response(info)
|
|
|
|
except Exception as e:
|
|
logger.error("System info failed", error=str(e))
|
|
return json_response({"error": "System info unavailable"}, status=500)
|
|
|
|
|
|
# Register API routes
def register_routes():
    """Attach every route blueprint to the module-level ``app``.

    The core route modules are mandatory: an import failure propagates.
    The MY Network route modules are optional: an ``ImportError`` while
    loading them is logged as a warning and otherwise ignored.
    """
    from app.api.routes import (
        auth_routes,
        content_routes,
        storage_routes,
        blockchain_routes,
        admin_routes,
        user_routes,
        system_routes
    )

    # Register the core blueprints, in this order
    core_route_modules = (
        auth_routes,
        content_routes,
        storage_routes,
        blockchain_routes,
        admin_routes,
        user_routes,
        system_routes,
    )
    for route_module in core_route_modules:
        app.blueprint(route_module.bp)

    # Try to add the MY Network routes
    try:
        from app.api.routes import my_network_sanic, my_monitoring_sanic

        # Register the MY Network blueprints
        for optional_module in (my_network_sanic, my_monitoring_sanic):
            app.blueprint(optional_module.bp)

        logger.info("MY Network routes registered")

    except ImportError as exc:
        logger.warning("MY Network routes not available", error=str(exc))

    logger.info("API routes registered")
|
|
|
|
|
|
# Application lifecycle hooks
@app.before_server_start
async def before_server_start(app, loop):
    """Listener that awaits ``app.startup()`` before the server starts.

    ``app`` here is the instance passed to the listener (it shadows the
    module-level name); ``loop`` is accepted but not used.
    """
    await app.startup()
|
|
|
|
|
|
@app.after_server_stop
async def after_server_stop(app, loop):
    """Listener that awaits ``app.shutdown()`` after the server has stopped.

    ``app`` here is the instance passed to the listener (it shadows the
    module-level name); ``loop`` is accepted but not used.
    """
    await app.shutdown()
|
|
|
|
|
|
# Background task management
class BackgroundTaskManager:
    """Registry of named background services running as ``asyncio`` tasks.

    Each service is a coroutine scheduled through
    ``EnhancedSanic.add_background_task`` and stored in ``self.tasks`` under
    its name. A finished task removes its own entry via a done-callback.
    """

    def __init__(self, app: EnhancedSanic):
        """Bind the manager to *app*, which schedules the actual tasks."""
        self.app = app
        # service name -> running task
        self.tasks: Dict[str, asyncio.Task] = {}

    async def start_service(self, name: str, service_func, *args, **kwargs):
        """Start ``service_func(*args, **kwargs)`` as the background service *name*.

        Does nothing (apart from a warning) if a service with that name is
        already registered.
        """
        if name in self.tasks:
            logger.warning("Service already running", service=name)
            return

        logger.info("Starting background service", service=name)
        task = self.app.add_background_task(service_func(*args, **kwargs))
        self.tasks[name] = task

        # Log the outcome and unregister the service when the task finishes.
        task.add_done_callback(lambda t: self._handle_task_completion(name, t))

    def _handle_task_completion(self, name: str, task: asyncio.Task):
        """Done-callback: unregister *task* and log how it ended."""
        # Only remove the entry if it still refers to this task; a service
        # restarted under the same name must not lose its fresh registration.
        if self.tasks.get(name) is task:
            del self.tasks[name]

        # cancelled() must be checked first: exception() raises
        # CancelledError on a cancelled task.
        if task.cancelled():
            logger.info("Background service cancelled", service=name)
            return

        failure = task.exception()
        if failure is not None:
            logger.error("Background service failed", service=name, error=str(failure))
        else:
            logger.info("Background service completed", service=name)

    async def stop_service(self, name: str):
        """Cancel the service *name* and wait until its task has finished.

        Logs a warning and returns if no such service is registered.

        Raises:
            Exception: whatever the service task raised (other than
                ``asyncio.CancelledError``) while finishing.
        """
        if name not in self.tasks:
            logger.warning("Service not running", service=name)
            return

        logger.info("Stopping background service", service=name)
        task = self.tasks[name]
        task.cancel()

        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() above.
            pass

    async def stop_all_services(self):
        """Stop every registered service, continuing past individual failures."""
        # Iterate over a copy: entries disappear as tasks finish.
        for name in list(self.tasks):
            try:
                await self.stop_service(name)
            except Exception as e:
                # One service failing during teardown must not leave the
                # remaining services running.
                logger.error("Failed to stop background service", service=name, error=str(e))

    def get_service_status(self) -> Dict[str, str]:
        """Return ``{name: state}`` for every registered service.

        ``state`` is ``"running"``, ``"cancelled"``, ``"failed"`` or
        ``"completed"``.
        """
        status = {}
        for name, task in self.tasks.items():
            if not task.done():
                status[name] = "running"
            elif task.cancelled():
                status[name] = "cancelled"
            elif task.exception() is not None:
                status[name] = "failed"
            else:
                status[name] = "completed"
        return status
|
|
|
|
|
|
# Initialize the background task manager for the module-level app and expose
# it on the app context so it can be reached as ``app.ctx.task_manager``.
task_manager = BackgroundTaskManager(app)
app.ctx.task_manager = task_manager
|
|
|
|
|
|
# Service startup functions
async def start_background_services():
    """Start each background service whose settings flag is enabled.

    After the flag-controlled services, an attempt is made to start the
    MY Network service; any exception from that attempt is logged as a
    warning and does not propagate.
    """
    # uploader_service is imported but never started in this function.
    from app.core.background import (
        indexer_service,
        ton_service,
        license_service,
        convert_service,
        uploader_service
    )

    # (settings flag, service name, module exposing ``main_fn``); the flag
    # and ``main_fn`` are only read when the loop reaches the entry.
    optional_services = (
        ("INDEXER_ENABLED", "indexer", indexer_service),
        ("TON_DAEMON_ENABLED", "ton_daemon", ton_service),
        ("LICENSE_SERVICE_ENABLED", "license_service", license_service),
        ("CONVERT_SERVICE_ENABLED", "convert_service", convert_service),
    )
    for flag_name, service_name, service_module in optional_services:
        if getattr(settings, flag_name):
            await task_manager.start_service(service_name, service_module.main_fn)

    # Try to start the MY Network service
    try:
        await start_my_network_service()
    except Exception as exc:
        logger.warning("MY Network service not started", error=str(exc))

    logger.info("Background services started")
|
|
|
|
|
|
async def start_my_network_service():
    """Start the MY Network node service as a managed background task.

    If the MY Network modules cannot be imported, this is logged at info
    level and the function returns normally.

    Raises:
        Exception: any other failure while creating or registering the
            service is logged and re-raised.
    """
    try:
        from app.core.my_network.node_service import NodeService

        # Create the node service
        node_service = NodeService()

        # Coroutine function handed to the task manager as the background task
        async def keep_node_service_alive():
            await node_service.start()

            # Keep the service alive
            try:
                while True:
                    await asyncio.sleep(60)  # Check once a minute

                    # Check the service state
                    if node_service.is_running:
                        continue

                    logger.warning("MY Network service stopped unexpectedly")
                    break

            except asyncio.CancelledError:
                logger.info("MY Network service shutdown requested")
                await node_service.stop()
                raise
            except Exception as exc:
                logger.error("MY Network service error", error=str(exc))
                await node_service.stop()
                raise

        await task_manager.start_service("my_network", keep_node_service_alive)
        logger.info("MY Network service started")

    except ImportError as exc:
        logger.info("MY Network modules not available", error=str(exc))
    except Exception as exc:
        logger.error("Failed to start MY Network service", error=str(exc))
        raise
|
|
|
|
|
|
# Register the lifecycle tasks: background services are started by
# app.startup() and stopped by app.shutdown().
app.add_startup_task(start_background_services)
app.add_shutdown_task(task_manager.stop_all_services)

# Register routes at import time
register_routes()
|
|
|
|
|
|
# Main application factory
def create_app() -> EnhancedSanic:
    """Return the module-level ``app`` instance.

    No new application is built: every call returns the same object that
    was configured when this module was imported.
    """
    return app
|
|
|
|
|
|
# Development server runner
async def run_dev_server():
    """Create the development server on all interfaces at ``settings.SANIC_PORT``.

    Debug mode and auto-reload follow ``settings.DEBUG``; access logging is
    disabled.
    """
    # NOTE(review): verify against the installed Sanic version that
    # ``create_server`` accepts ``auto_reload`` and that the returned server
    # does not need an explicit startup/serve_forever call -- confirm.
    server_options = {
        "host": "0.0.0.0",
        "port": settings.SANIC_PORT,
        "debug": settings.DEBUG,
        "auto_reload": settings.DEBUG,
        "access_log": False,
    }
    await app.create_server(**server_options)
|
|
|
|
|
|
# Script entry point: run the development server when this module is
# executed directly rather than imported.
if __name__ == "__main__":
    logger.info("Starting development server")
    asyncio.run(run_dev_server())
|