486 lines
17 KiB
Python
486 lines
17 KiB
Python
"""
|
||
Клиент для межузлового общения с ed25519 подписями
|
||
"""
|
||
import asyncio
|
||
import json
|
||
import aiohttp
|
||
from typing import Dict, Any, Optional, List
|
||
from datetime import datetime
|
||
from urllib.parse import urljoin
|
||
|
||
from app.core.crypto import get_ed25519_manager
|
||
from app.core.logging import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class NodeClient:
|
||
"""Клиент для подписанного межузлового общения"""
|
||
|
||
def __init__(self, timeout: int = 30):
|
||
self.timeout = aiohttp.ClientTimeout(total=timeout)
|
||
self.session: Optional[aiohttp.ClientSession] = None
|
||
|
||
async def __aenter__(self):
|
||
"""Async context manager entry"""
|
||
self.session = aiohttp.ClientSession(timeout=self.timeout)
|
||
return self
|
||
|
||
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
||
"""Async context manager exit"""
|
||
if self.session:
|
||
await self.session.close()
|
||
|
||
async def _create_signed_request(
|
||
self,
|
||
action: str,
|
||
data: Dict[str, Any],
|
||
target_url: str
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
Создать подписанный запрос для межузлового общения
|
||
|
||
Args:
|
||
action: Тип действия (handshake, content_sync, ping, etc.)
|
||
data: Данные сообщения
|
||
target_url: URL целевой ноды
|
||
|
||
Returns:
|
||
Заголовки и тело запроса
|
||
"""
|
||
crypto_manager = get_ed25519_manager()
|
||
|
||
# Создаем сообщение
|
||
message = {
|
||
"action": action,
|
||
"timestamp": datetime.utcnow().isoformat(),
|
||
**data
|
||
}
|
||
|
||
# Подписываем сообщение
|
||
signature = crypto_manager.sign_message(message)
|
||
|
||
# Создаем заголовки
|
||
headers = {
|
||
"Content-Type": "application/json",
|
||
"X-Node-Communication": "true",
|
||
"X-Node-ID": crypto_manager.node_id,
|
||
"X-Node-Public-Key": crypto_manager.public_key_hex,
|
||
"X-Node-Signature": signature
|
||
}
|
||
|
||
return {
|
||
"headers": headers,
|
||
"json": message
|
||
}
|
||
|
||
async def send_handshake(
|
||
self,
|
||
target_url: str,
|
||
our_node_info: Dict[str, Any]
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
Отправить хэндшейк ноде
|
||
|
||
Args:
|
||
target_url: URL целевой ноды (например, "http://node.example.com:8000")
|
||
our_node_info: Информация о нашей ноде
|
||
|
||
Returns:
|
||
Ответ от ноды или информация об ошибке
|
||
"""
|
||
endpoint_url = urljoin(target_url, "/api/node/handshake")
|
||
|
||
try:
|
||
request_data = await self._create_signed_request(
|
||
"handshake",
|
||
{"node_info": our_node_info},
|
||
target_url
|
||
)
|
||
|
||
logger.info(f"Sending handshake to {target_url}")
|
||
|
||
async with self.session.post(endpoint_url, **request_data) as response:
|
||
response_data = await response.json()
|
||
|
||
if response.status == 200:
|
||
logger.info(f"Handshake successful with {target_url}")
|
||
return {
|
||
"success": True,
|
||
"data": response_data,
|
||
"node_url": target_url
|
||
}
|
||
else:
|
||
logger.warning(f"Handshake failed with {target_url}: {response.status}")
|
||
return {
|
||
"success": False,
|
||
"error": f"HTTP {response.status}",
|
||
"data": response_data,
|
||
"node_url": target_url
|
||
}
|
||
|
||
except asyncio.TimeoutError:
|
||
logger.warning(f"Handshake timeout with {target_url}")
|
||
return {
|
||
"success": False,
|
||
"error": "timeout",
|
||
"node_url": target_url
|
||
}
|
||
except Exception as e:
|
||
logger.error(f"Handshake error with {target_url}: {e}")
|
||
return {
|
||
"success": False,
|
||
"error": str(e),
|
||
"node_url": target_url
|
||
}
|
||
|
||
async def send_content_sync(
|
||
self,
|
||
target_url: str,
|
||
sync_type: str,
|
||
content_info: Dict[str, Any]
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
Отправить запрос синхронизации контента
|
||
|
||
Args:
|
||
target_url: URL целевой ноды
|
||
sync_type: Тип синхронизации (new_content, content_list, content_request)
|
||
content_info: Информация о контенте
|
||
|
||
Returns:
|
||
Ответ от ноды
|
||
"""
|
||
endpoint_url = urljoin(target_url, "/api/node/content/sync")
|
||
|
||
try:
|
||
request_data = await self._create_signed_request(
|
||
"content_sync",
|
||
{
|
||
"sync_type": sync_type,
|
||
"content_info": content_info
|
||
},
|
||
target_url
|
||
)
|
||
|
||
logger.info(f"Sending content sync ({sync_type}) to {target_url}")
|
||
|
||
async with self.session.post(endpoint_url, **request_data) as response:
|
||
response_data = await response.json()
|
||
|
||
if response.status == 200:
|
||
logger.debug(f"Content sync successful with {target_url}")
|
||
return {
|
||
"success": True,
|
||
"data": response_data,
|
||
"node_url": target_url
|
||
}
|
||
else:
|
||
logger.warning(f"Content sync failed with {target_url}: {response.status}")
|
||
return {
|
||
"success": False,
|
||
"error": f"HTTP {response.status}",
|
||
"data": response_data,
|
||
"node_url": target_url
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Content sync error with {target_url}: {e}")
|
||
return {
|
||
"success": False,
|
||
"error": str(e),
|
||
"node_url": target_url
|
||
}
|
||
|
||
async def send_ping(self, target_url: str) -> Dict[str, Any]:
|
||
"""
|
||
Отправить пинг ноде
|
||
|
||
Args:
|
||
target_url: URL целевой ноды
|
||
|
||
Returns:
|
||
Ответ от ноды (pong)
|
||
"""
|
||
endpoint_url = urljoin(target_url, "/api/node/network/ping")
|
||
|
||
try:
|
||
request_data = await self._create_signed_request(
|
||
"ping",
|
||
{"data": {"test": True}},
|
||
target_url
|
||
)
|
||
|
||
start_time = datetime.utcnow()
|
||
|
||
async with self.session.post(endpoint_url, **request_data) as response:
|
||
end_time = datetime.utcnow()
|
||
duration = (end_time - start_time).total_seconds() * 1000 # ms
|
||
|
||
response_data = await response.json()
|
||
|
||
if response.status == 200:
|
||
return {
|
||
"success": True,
|
||
"data": response_data,
|
||
"latency_ms": round(duration, 2),
|
||
"node_url": target_url
|
||
}
|
||
else:
|
||
return {
|
||
"success": False,
|
||
"error": f"HTTP {response.status}",
|
||
"data": response_data,
|
||
"node_url": target_url
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ping error with {target_url}: {e}")
|
||
return {
|
||
"success": False,
|
||
"error": str(e),
|
||
"node_url": target_url
|
||
}
|
||
|
||
async def get_node_status(self, target_url: str) -> Dict[str, Any]:
|
||
"""
|
||
Получить статус ноды (GET запрос без подписи)
|
||
|
||
Args:
|
||
target_url: URL целевой ноды
|
||
|
||
Returns:
|
||
Статус ноды
|
||
"""
|
||
endpoint_url = urljoin(target_url, "/api/node/network/status")
|
||
|
||
try:
|
||
async with self.session.get(endpoint_url) as response:
|
||
response_data = await response.json()
|
||
|
||
if response.status == 200:
|
||
return {
|
||
"success": True,
|
||
"data": response_data,
|
||
"node_url": target_url
|
||
}
|
||
else:
|
||
return {
|
||
"success": False,
|
||
"error": f"HTTP {response.status}",
|
||
"data": response_data,
|
||
"node_url": target_url
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Status request error with {target_url}: {e}")
|
||
return {
|
||
"success": False,
|
||
"error": str(e),
|
||
"node_url": target_url
|
||
}
|
||
|
||
async def send_discovery(
|
||
self,
|
||
target_url: str,
|
||
known_nodes: List[Dict[str, Any]]
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
Отправить запрос обнаружения нод
|
||
|
||
Args:
|
||
target_url: URL целевой ноды
|
||
known_nodes: Список известных нам нод
|
||
|
||
Returns:
|
||
Список нод от целевой ноды
|
||
"""
|
||
endpoint_url = urljoin(target_url, "/api/node/network/discover")
|
||
|
||
try:
|
||
request_data = await self._create_signed_request(
|
||
"discover",
|
||
{"known_nodes": known_nodes},
|
||
target_url
|
||
)
|
||
|
||
logger.info(f"Sending discovery request to {target_url}")
|
||
|
||
async with self.session.post(endpoint_url, **request_data) as response:
|
||
response_data = await response.json()
|
||
|
||
if response.status == 200:
|
||
logger.debug(f"Discovery successful with {target_url}")
|
||
return {
|
||
"success": True,
|
||
"data": response_data,
|
||
"node_url": target_url
|
||
}
|
||
else:
|
||
logger.warning(f"Discovery failed with {target_url}: {response.status}")
|
||
return {
|
||
"success": False,
|
||
"error": f"HTTP {response.status}",
|
||
"data": response_data,
|
||
"node_url": target_url
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"Discovery error with {target_url}: {e}")
|
||
return {
|
||
"success": False,
|
||
"error": str(e),
|
||
"node_url": target_url
|
||
}
|
||
|
||
|
||
class NodeNetworkManager:
|
||
"""Менеджер для работы с сетью нод"""
|
||
|
||
def __init__(self):
|
||
self.known_nodes: List[str] = []
|
||
self.active_nodes: List[str] = []
|
||
|
||
async def discover_nodes(self, bootstrap_nodes: List[str]) -> List[str]:
|
||
"""
|
||
Обнаружить ноды в сети через bootstrap ноды
|
||
|
||
Args:
|
||
bootstrap_nodes: Список bootstrap нод для начального подключения
|
||
|
||
Returns:
|
||
Список обнаруженных активных нод
|
||
"""
|
||
discovered_nodes = set()
|
||
|
||
async with NodeClient() as client:
|
||
# Получить информацию о нашей ноде
|
||
crypto_manager = get_ed25519_manager()
|
||
our_node_info = {
|
||
"node_id": crypto_manager.node_id,
|
||
"version": "3.0.0",
|
||
"capabilities": [
|
||
"content_upload",
|
||
"content_sync",
|
||
"decentralized_filtering",
|
||
"ed25519_signatures"
|
||
],
|
||
"network_info": {
|
||
"public_key": crypto_manager.public_key_hex,
|
||
"protocol_version": "1.0"
|
||
}
|
||
}
|
||
|
||
# Попробовать подключиться к bootstrap нодам
|
||
for node_url in bootstrap_nodes:
|
||
try:
|
||
# Выполнить хэндшейк
|
||
handshake_result = await client.send_handshake(node_url, our_node_info)
|
||
|
||
if handshake_result["success"]:
|
||
discovered_nodes.add(node_url)
|
||
|
||
# Запросить список известных нод
|
||
discovery_result = await client.send_discovery(node_url, list(discovered_nodes))
|
||
|
||
if discovery_result["success"]:
|
||
# Добавить ноды из ответа
|
||
known_nodes = discovery_result["data"]["data"]["known_nodes"]
|
||
for node_info in known_nodes:
|
||
if "url" in node_info:
|
||
discovered_nodes.add(node_info["url"])
|
||
|
||
except Exception as e:
|
||
logger.warning(f"Failed to discover through {node_url}: {e}")
|
||
|
||
self.known_nodes = list(discovered_nodes)
|
||
return self.known_nodes
|
||
|
||
async def check_node_health(self, nodes: List[str]) -> Dict[str, Dict[str, Any]]:
|
||
"""
|
||
Проверить состояние нод
|
||
|
||
Args:
|
||
nodes: Список нод для проверки
|
||
|
||
Returns:
|
||
Словарь с результатами проверки для каждой ноды
|
||
"""
|
||
results = {}
|
||
|
||
async with NodeClient() as client:
|
||
# Создаем задачи для параллельной проверки
|
||
tasks = []
|
||
for node_url in nodes:
|
||
task = asyncio.create_task(client.send_ping(node_url))
|
||
tasks.append((node_url, task))
|
||
|
||
# Ждем завершения всех задач
|
||
for node_url, task in tasks:
|
||
try:
|
||
result = await task
|
||
results[node_url] = result
|
||
except Exception as e:
|
||
results[node_url] = {
|
||
"success": False,
|
||
"error": str(e),
|
||
"node_url": node_url
|
||
}
|
||
|
||
# Обновляем список активных нод
|
||
self.active_nodes = [
|
||
node_url for node_url, result in results.items()
|
||
if result.get("success", False)
|
||
]
|
||
|
||
return results
|
||
|
||
async def broadcast_content(
|
||
self,
|
||
content_info: Dict[str, Any],
|
||
target_nodes: Optional[List[str]] = None
|
||
) -> Dict[str, Dict[str, Any]]:
|
||
"""
|
||
Транслировать информацию о новом контенте всем активным нодам
|
||
|
||
Args:
|
||
content_info: Информация о контенте
|
||
target_nodes: Список целевых нод (по умолчанию все активные)
|
||
|
||
Returns:
|
||
Результаты трансляции для каждой ноды
|
||
"""
|
||
nodes = target_nodes or self.active_nodes
|
||
results = {}
|
||
|
||
async with NodeClient() as client:
|
||
# Создаем задачи для параллельной отправки
|
||
tasks = []
|
||
for node_url in nodes:
|
||
task = asyncio.create_task(
|
||
client.send_content_sync(node_url, "new_content", content_info)
|
||
)
|
||
tasks.append((node_url, task))
|
||
|
||
# Ждем завершения всех задач
|
||
for node_url, task in tasks:
|
||
try:
|
||
result = await task
|
||
results[node_url] = result
|
||
except Exception as e:
|
||
results[node_url] = {
|
||
"success": False,
|
||
"error": str(e),
|
||
"node_url": node_url
|
||
}
|
||
|
||
return results
|
||
|
||
|
||
# Глобальный экземпляр менеджера сети
|
||
network_manager = NodeNetworkManager()
|
||
|
||
|
||
async def get_network_manager() -> NodeNetworkManager:
|
||
"""Получить глобальный экземпляр менеджера сети"""
|
||
return network_manager |