uploader-bot/app/api/routes/my_network_routes.py

653 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""MY Network API Routes - эндпоинты для работы с распределенной сетью."""
import asyncio
import logging
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Query
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy import select, and_, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import db_manager
from app.core.security import get_current_user_optional
from app.core.cache import cache
# Import content models directly to avoid circular imports
from app.core.models.content_models import StoredContent as Content, UserContent as ContentMetadata
logger = logging.getLogger(__name__)
# Create the router for the MY Network API
router = APIRouter(prefix="/api/my", tags=["MY Network"])
def get_node_service():
    """Return the MY Network node service.

    The underlying factory is imported lazily inside the function. Any
    failure while importing or calling it is logged and surfaced to the
    client as HTTP 503.

    Raises:
        HTTPException: 503 when the node service cannot be obtained.
    """
    try:
        # Imported under an alias so the local name does not shadow this
        # wrapper function.
        from app.core.my_network.node_service import get_node_service as _resolve_service
        service = _resolve_service()
    except Exception as e:
        logger.error(f"Error getting node service: {e}")
        raise HTTPException(status_code=503, detail="MY Network service unavailable")
    return service
@router.get("/node/info")
async def get_node_info():
    """Return information about the current node.

    Returns:
        A dict with ``success``, ``data`` (the value returned by the node
        service's ``get_node_info``) and a UTC ``timestamp``.

    Raises:
        HTTPException: 503 when no node service is available, 500 on any
            other failure.
    """
    try:
        service = get_node_service()
        if not service:
            raise HTTPException(status_code=503, detail="Node service not available")

        info = await service.get_node_info()
        payload = {
            "success": True,
            "data": info,
            "timestamp": datetime.utcnow().isoformat(),
        }
        return payload
    except HTTPException:
        # Pass deliberate HTTP errors through unchanged.
        raise
    except Exception as e:
        logger.error(f"Error getting node info: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/node/peers")
async def get_node_peers():
    """Return the list of connected peers.

    Returns:
        A dict with ``success``, ``data`` (``connected_peers``,
        ``peer_count`` and ``peers`` taken from the node service's
        ``get_peers_info`` result) and a UTC ``timestamp``.

    Raises:
        HTTPException: whatever ``get_node_service`` raises (503 when the
            service is unavailable), or 500 on any other failure.
    """
    try:
        node_service = get_node_service()
        peers_info = await node_service.get_peers_info()
        return {
            "success": True,
            "data": {
                "connected_peers": peers_info["connected_peers"],
                "peer_count": peers_info["peer_count"],
                "peers": peers_info["peers"],
            },
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        # Keep the original status code (e.g. 503 from get_node_service)
        # instead of rewrapping it as a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error getting peers: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/node/peers/connect")
async def connect_to_peer(peer_data: Dict[str, Any]):
    """Connect this node to a new peer.

    Args:
        peer_data: Request body; the ``address`` key holds the peer address.

    Returns:
        A dict with ``success``, a confirmation ``message`` and a UTC
        ``timestamp``.

    Raises:
        HTTPException: 400 when the address is missing or the connection
            attempt reports failure, 500 on any other failure.
    """
    try:
        address = peer_data.get("address")
        if not address:
            raise HTTPException(status_code=400, detail="Peer address is required")

        service = get_node_service()
        connected = await service.peer_manager.connect_to_peer(address)
        if not connected:
            raise HTTPException(status_code=400, detail="Failed to connect to peer")

        return {
            "success": True,
            "message": f"Successfully connected to peer: {address}",
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error connecting to peer: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/node/peers/{peer_id}")
async def disconnect_peer(peer_id: str):
    """Disconnect this node from a peer.

    Args:
        peer_id: Identifier of the peer to disconnect, taken from the URL.

    Returns:
        A dict with ``success``, a confirmation ``message`` and a UTC
        ``timestamp``.

    Raises:
        HTTPException: 404 when the peer manager reports that nothing was
            disconnected, 500 on any other failure.
    """
    try:
        service = get_node_service()
        disconnected = await service.peer_manager.disconnect_peer(peer_id)
        if not disconnected:
            raise HTTPException(status_code=404, detail="Peer not found or already disconnected")

        return {
            "success": True,
            "message": f"Successfully disconnected from peer: {peer_id}",
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error disconnecting peer: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/content/list")
async def get_content_list(
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    session: AsyncSession = Depends(get_db_session)
):
    """Return one page of the enabled content known to this node.

    The assembled response is cached as JSON for 5 minutes under a key
    derived from ``limit`` and ``offset``.

    Args:
        limit: Maximum number of items to return (1-1000).
        offset: Number of items to skip.
        session: Database session injected by FastAPI.

    Returns:
        A dict with ``success``, ``data`` (``content``, ``total``,
        ``limit``, ``offset``) and a UTC ``timestamp``.

    Raises:
        HTTPException: 500 on any failure.
    """
    # NOTE(review): get_db_session is not imported in the visible part of
    # this module -- confirm it is in scope before this def is evaluated.
    try:
        cache_key = f"my_network:content_list:{limit}:{offset}"
        cached_payload = await cache.get(cache_key)
        if cached_payload:
            return json.loads(cached_payload)

        # Newest content first; metadata is optional, hence the outer join.
        page_query = (
            select(Content, ContentMetadata)
            .outerjoin(ContentMetadata, Content.id == ContentMetadata.content_id)
            .where(Content.disabled == False)
            .order_by(Content.created_at.desc())
            .limit(limit)
            .offset(offset)
        )
        rows = await session.execute(page_query)
        items = [
            {
                "hash": content.hash,
                "filename": content.filename,
                "file_size": content.file_size,
                "content_type": content.content_type,
                "mime_type": content.mime_type,
                "created_at": content.created_at.isoformat(),
                "encrypted": content.encrypted,
                "metadata": metadata.to_dict() if metadata else {},
            }
            for content, metadata in rows
        ]

        # Total number of enabled items, independent of paging.
        total_query = select(func.count(Content.id)).where(Content.disabled == False)
        total_rows = await session.execute(total_query)
        total = total_rows.scalar()

        payload = {
            "success": True,
            "data": {
                "content": items,
                "total": total,
                "limit": limit,
                "offset": offset,
            },
            "timestamp": datetime.utcnow().isoformat(),
        }
        await cache.set(cache_key, json.dumps(payload), expire=300)
        return payload
    except Exception as e:
        logger.error(f"Error getting content list: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/content/{content_hash}/exists")
async def check_content_exists(
    content_hash: str,
    session: AsyncSession = Depends(get_db_session)
):
    """Check whether enabled content with the given hash exists.

    The boolean answer is cached for 30 minutes as the string ``"true"``
    or ``"false"``.

    Args:
        content_hash: Hash to look up, taken from the URL.
        session: Database session injected by FastAPI.

    Returns:
        A dict with ``exists``, ``hash`` and a UTC ``timestamp``. The
        same shape is returned for cached and uncached answers.

    Raises:
        HTTPException: 500 on any failure.
    """
    try:
        cache_key = f"my_network:content_exists:{content_hash}"
        cached_result = await cache.get(cache_key)
        if cached_result is not None:
            # NOTE(review): the cache implementation is not visible here;
            # some backends hand back bytes, which would never compare
            # equal to "true". Decode defensively.
            if isinstance(cached_result, bytes):
                cached_result = cached_result.decode("utf-8")
            return {
                "exists": cached_result == "true",
                "hash": content_hash,
                "timestamp": datetime.utcnow().isoformat(),
            }

        # Cache miss: look the hash up in the database.
        stmt = select(Content.id).where(
            and_(
                Content.disabled == False,
                Content.hash == content_hash,
            )
        )
        result = await session.execute(stmt)
        exists = result.scalar_one_or_none() is not None

        await cache.set(cache_key, "true" if exists else "false", expire=1800)
        return {
            "exists": exists,
            "hash": content_hash,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except Exception as e:
        logger.error(f"Error checking content existence: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/content/{content_hash}/metadata")
async def get_content_metadata(
    content_hash: str,
    session: AsyncSession = Depends(get_db_session)
):
    """Return the stored metadata for one piece of content.

    The assembled response is cached as JSON for 10 minutes.

    Args:
        content_hash: Hash of the content, taken from the URL.
        session: Database session injected by FastAPI.

    Returns:
        A dict with ``success``, ``data`` (file attributes plus the
        optional metadata record) and a UTC ``timestamp``.

    Raises:
        HTTPException: 404 when no enabled content has this hash, 500 on
            any other failure.
    """
    try:
        cache_key = f"my_network:content_metadata:{content_hash}"
        cached_payload = await cache.get(cache_key)
        if cached_payload:
            return json.loads(cached_payload)

        # Metadata is optional, hence the outer join.
        matches_hash = and_(
            Content.disabled == False,
            Content.hash == content_hash,
        )
        lookup = (
            select(Content, ContentMetadata)
            .outerjoin(ContentMetadata, Content.id == ContentMetadata.content_id)
            .where(matches_hash)
        )
        rows = await session.execute(lookup)
        row = rows.first()
        if not row:
            raise HTTPException(status_code=404, detail="Content not found")

        content, metadata = row
        updated_at = content.updated_at.isoformat() if content.updated_at else None
        details = {
            "hash": content_hash,
            "filename": content.filename,
            "file_size": content.file_size,
            "content_type": content.content_type,
            "mime_type": content.mime_type,
            "created_at": content.created_at.isoformat(),
            "updated_at": updated_at,
            "encrypted": content.encrypted,
            "processing_status": content.processing_status,
            "metadata": metadata.to_dict() if metadata else {},
        }
        payload = {
            "success": True,
            "data": details,
            "timestamp": datetime.utcnow().isoformat(),
        }
        await cache.set(cache_key, json.dumps(payload), expire=600)
        return payload
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error getting content metadata: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/content/{content_hash}/download")
async def download_content(
    content_hash: str,
    session: AsyncSession = Depends(get_db_session)
):
    """Send the stored file for the given content hash.

    Args:
        content_hash: Hash of the content, taken from the URL.
        session: Database session injected by FastAPI.

    Returns:
        A ``FileResponse`` for the stored file, using the recorded
        filename and MIME type (``application/octet-stream`` when none
        is recorded).

    Raises:
        HTTPException: 404 when the content row or the file on disk is
            missing, 500 on any other failure.
    """
    try:
        lookup = select(Content).where(
            and_(
                Content.disabled == False,
                Content.hash == content_hash,
            )
        )
        rows = await session.execute(lookup)
        record = rows.scalar_one_or_none()
        if not record:
            raise HTTPException(status_code=404, detail="Content not found")

        # The row can outlive the file, so check the disk as well.
        stored_file = Path(record.file_path)
        if not stored_file.exists():
            raise HTTPException(status_code=404, detail="File not found on disk")

        media_type = record.mime_type or "application/octet-stream"
        return FileResponse(
            path=str(stored_file),
            filename=record.filename,
            media_type=media_type,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error downloading content: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/content/{content_hash}/upload")
async def upload_content(
    content_hash: str,
    file: UploadFile = File(...),
    session: AsyncSession = Depends(get_db_session)
):
    """Store an uploaded file on this node after verifying its hash.

    The payload is hashed before anything is written to disk, so a
    mismatching upload never touches the storage directory. The row is
    stored under the SHA-256 digest even when the caller identified the
    content by its MD5 digest.

    Args:
        content_hash: Expected MD5 or SHA-256 hex digest of the payload.
        file: The uploaded file.
        session: Database session injected by FastAPI.

    Returns:
        A dict with ``success``, ``message`` and ``hash``; a fresh upload
        also carries ``content_id`` and a UTC ``timestamp``.

    Raises:
        HTTPException: 400 when the payload matches neither digest, 500
            on any other failure.
    """
    import hashlib

    def _already_stored_response():
        return {
            "success": True,
            "message": "Content already exists",
            "hash": content_hash,
        }

    try:
        # Cheap early exit before the payload is read.
        exists_stmt = select(Content.id).where(
            and_(
                Content.disabled == False,
                Content.hash == content_hash,
            )
        )
        exists_result = await session.execute(exists_stmt)
        if exists_result.scalar_one_or_none():
            return _already_stored_response()

        content_data = await file.read()

        # NOTE(review): MD5 is accepted for compatibility with callers but
        # is not collision resistant; prefer SHA-256 identifiers.
        md5_hash = hashlib.md5(content_data).hexdigest()
        sha256_hash = hashlib.sha256(content_data).hexdigest()
        if content_hash not in (md5_hash, sha256_hash):
            raise HTTPException(status_code=400, detail="Content hash mismatch")

        # Rows are keyed by SHA-256, so an upload identified by MD5 slips
        # past the first check; look it up by the stored digest as well.
        if content_hash != sha256_hash:
            sha_stmt = select(Content.id).where(
                and_(
                    Content.disabled == False,
                    Content.hash == sha256_hash,
                )
            )
            sha_result = await session.execute(sha_stmt)
            if sha_result.scalar_one_or_none():
                return _already_stored_response()

        # The client controls the filename: keep only its last path
        # component, and fall back to a fixed name when it is missing.
        raw_name = (file.filename or "").replace("\\", "/")
        safe_name = Path(raw_name).name or "upload"

        storage_path = Path("./storage/my-network/received")
        storage_path.mkdir(parents=True, exist_ok=True)
        file_path = storage_path / f"{content_hash}_{safe_name}"
        file_path.write_bytes(content_data)

        try:
            new_content = Content(
                filename=safe_name,
                hash=sha256_hash,  # SHA-256 is the primary hash
                file_size=len(content_data),
                content_type=safe_name.split('.')[-1] if '.' in safe_name else 'unknown',
                mime_type=file.content_type or "application/octet-stream",
                file_path=str(file_path),
                disabled=False,
                processing_status="ready"
            )
            session.add(new_content)
            await session.commit()
        except Exception:
            # Do not leave an orphaned file behind when the row was not saved.
            if file_path.exists():
                file_path.unlink()
            raise

        logger.info(f"Successfully uploaded content {content_hash}")
        return {
            "success": True,
            "message": "Content uploaded successfully",
            "hash": content_hash,
            "content_id": new_content.id,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error uploading content: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/content/replicate")
async def replicate_content(replication_request: Dict[str, Any]):
    """Accept or decline an incoming content replication request.

    Only ``content_hash`` and ``source_node`` are read from the request;
    no content is transferred here.

    Args:
        replication_request: Request body with ``content_hash`` (required)
            and ``source_node`` (used for logging only).

    Returns:
        A dict with ``success``, ``message`` and ``hash``. When the
        content is not stored yet it also carries
        ``ready_for_upload: True`` and a UTC ``timestamp``.

    Raises:
        HTTPException: 400 when ``content_hash`` is missing, 500 on any
            other failure.
    """
    try:
        content_hash = replication_request.get("content_hash")
        source_node = replication_request.get("source_node")
        if not content_hash:
            raise HTTPException(status_code=400, detail="Content hash is required")

        # Replication is pointless when the content is already stored.
        async with db_manager.get_session() as session:
            exists_stmt = select(Content.id).where(
                and_(
                    Content.disabled == False,
                    Content.hash == content_hash,
                )
            )
            exists_result = await session.execute(exists_stmt)
            if exists_result.scalar_one_or_none():
                return {
                    "success": True,
                    "message": "Content already exists, replication not needed",
                    "hash": content_hash,
                }

        logger.info(f"Accepting replication request for {content_hash} from {source_node}")
        return {
            "success": True,
            "message": "Replication request accepted",
            "hash": content_hash,
            "ready_for_upload": True,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error processing replication request: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/sync/status")
async def get_sync_status():
    """Return the current synchronization status.

    Returns:
        A dict with ``success``, ``data`` (the sync manager's
        ``get_sync_status`` result) and a UTC ``timestamp``.

    Raises:
        HTTPException: whatever ``get_node_service`` raises (503 when the
            service is unavailable), or 500 on any other failure.
    """
    try:
        node_service = get_node_service()
        sync_status = await node_service.sync_manager.get_sync_status()
        return {
            "success": True,
            "data": sync_status,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        # Keep the original status code instead of rewrapping it as 500.
        raise
    except Exception as e:
        logger.error(f"Error getting sync status: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/sync/start")
async def start_network_sync():
    """Start a synchronization run with the network.

    The call awaits the sync manager's ``sync_with_network`` and returns
    its result.

    Returns:
        A dict with ``success``, ``data`` (the sync result) and a UTC
        ``timestamp``.

    Raises:
        HTTPException: whatever ``get_node_service`` raises (503 when the
            service is unavailable), or 500 on any other failure.
    """
    try:
        node_service = get_node_service()
        sync_result = await node_service.sync_manager.sync_with_network()
        return {
            "success": True,
            "data": sync_result,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        # Keep the original status code instead of rewrapping it as 500.
        raise
    except Exception as e:
        logger.error(f"Error starting network sync: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/sync/content/{content_hash}")
async def get_content_sync_status(content_hash: str):
    """Return the synchronization status of one piece of content.

    Args:
        content_hash: Hash of the content, taken from the URL.

    Returns:
        A dict with ``success``, ``data`` (the sync manager's
        ``get_content_sync_status`` result) and a UTC ``timestamp``.

    Raises:
        HTTPException: whatever ``get_node_service`` raises (503 when the
            service is unavailable), or 500 on any other failure.
    """
    try:
        node_service = get_node_service()
        sync_status = await node_service.sync_manager.get_content_sync_status(content_hash)
        return {
            "success": True,
            "data": sync_status,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        # Keep the original status code instead of rewrapping it as 500.
        raise
    except Exception as e:
        logger.error(f"Error getting content sync status: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/content/{content_hash}/replicate")
async def replicate_content_to_nodes(
    content_hash: str,
    replication_config: Dict[str, Any]
):
    """Replicate one piece of content to the requested nodes.

    Args:
        content_hash: Hash of the content, taken from the URL.
        replication_config: Request body; ``target_nodes`` lists the
            nodes to replicate to.

    Returns:
        A dict with ``success``, ``data`` (the sync manager's replication
        result) and a UTC ``timestamp``.

    Raises:
        HTTPException: 400 when no target nodes are given, 500 on any
            other failure.
    """
    try:
        targets = replication_config.get("target_nodes", [])
        if not targets:
            raise HTTPException(status_code=400, detail="Target nodes are required")

        sync_manager = get_node_service().sync_manager
        outcome = await sync_manager.replicate_content_to_nodes(content_hash, targets)
        return {
            "success": True,
            "data": outcome,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error replicating content: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/network/stats")
async def get_network_stats():
    """Return aggregated node, network, content and sync statistics.

    Node, peer and sync figures come from the node service; content
    figures (count, total size, count per type) are computed over enabled
    content rows in the database.

    Returns:
        A dict with ``success``, ``data`` (``node_info``, ``network``,
        ``content`` and ``sync`` sections) and a UTC ``timestamp``.

    Raises:
        HTTPException: whatever ``get_node_service`` raises (503 when the
            service is unavailable), or 500 on any other failure.
    """
    try:
        node_service = get_node_service()

        node_info = await node_service.get_node_info()
        peers_info = await node_service.get_peers_info()
        sync_status = await node_service.sync_manager.get_sync_status()

        async with db_manager.get_session() as session:
            # Total number of enabled content items.
            content_count_stmt = select(func.count(Content.id)).where(Content.disabled == False)
            content_count_result = await session.execute(content_count_stmt)
            total_content = content_count_result.scalar()

            # Combined size; SUM over no rows yields NULL, hence the fallback.
            size_stmt = select(func.sum(Content.file_size)).where(Content.disabled == False)
            size_result = await session.execute(size_stmt)
            total_size = size_result.scalar() or 0

            # Item count per content type.
            type_stmt = (
                select(Content.content_type, func.count(Content.id))
                .where(Content.disabled == False)
                .group_by(Content.content_type)
            )
            type_result = await session.execute(type_stmt)
            content_by_type = {row[0]: row[1] for row in type_result}

        network_stats = {
            "node_info": {
                "node_id": node_info["node_id"],
                "uptime": node_info["uptime"],
                "version": node_info["version"],
                "status": node_info["status"],
            },
            "network": {
                "connected_peers": peers_info["peer_count"],
                "known_peers": len(peers_info["peers"]),
                "network_health": "good" if peers_info["peer_count"] > 0 else "isolated",
            },
            "content": {
                "total_items": total_content,
                "total_size_bytes": total_size,
                "total_size_mb": round(total_size / (1024 * 1024), 2),
                "content_by_type": content_by_type,
            },
            "sync": {
                "active_syncs": sync_status["active_syncs"],
                "queue_size": sync_status["queue_size"],
                "is_running": sync_status["is_running"],
            },
        }
        return {
            "success": True,
            "data": network_stats,
            "timestamp": datetime.utcnow().isoformat(),
        }
    except HTTPException:
        # Keep the original status code instead of rewrapping it as 500.
        raise
    except Exception as e:
        logger.error(f"Error getting network stats: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@router.get("/health")
async def health_check():
    """Report the health of the MY Network node.

    Never raises: any failure is logged and reported as an ``unhealthy``
    status in the response body.

    Returns:
        A dict with ``status`` (``healthy``, ``isolated`` or
        ``unhealthy``), a UTC ``timestamp``, a ``services`` map and, when
        a node service exists, a ``network`` section. On failure the dict
        holds ``status``, ``error`` and ``timestamp`` instead.
    """
    try:
        service = get_node_service()

        # NOTE(review): no database query is made here, so the "database"
        # flag is hard-coded rather than measured.
        services = {
            "node_service": service is not None,
            "peer_manager": hasattr(service, 'peer_manager') if service else False,
            "sync_manager": hasattr(service, 'sync_manager') if service else False,
            "database": True,
        }
        report = {
            "status": "healthy",
            "timestamp": datetime.utcnow().isoformat(),
            "services": services,
        }

        peer_count = None
        if service:
            peers = await service.get_peers_info()
            peer_count = peers["peer_count"]
            report["network"] = {
                "connected_peers": peer_count,
                "status": "connected" if peer_count > 0 else "isolated",
            }

        # A failed service outranks isolation when picking the overall status.
        if not all(services.values()):
            report["status"] = "unhealthy"
        elif service and peer_count == 0:
            report["status"] = "isolated"
        return report
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return {
            "status": "unhealthy",
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        }