""" MY Network v3.0 - Ed25519 Cryptographic Manager for uploader-bot Модуль для работы с ed25519 ключами и подписями. Все inter-node сообщения должны быть подписаны и проверены. """ import os import base64 import json import hashlib from typing import Dict, Any, Optional, Tuple from pathlib import Path import logging import time try: import ed25519 from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import ed25519 as crypto_ed25519 except ImportError as e: logging.error(f"Required cryptographic libraries not found: {e}") raise ImportError("Please install: pip install ed25519 cryptography") logger = logging.getLogger(__name__) class Ed25519Manager: """Менеджер для ed25519 криптографических операций в uploader-bot""" def __init__(self, private_key_path: Optional[str] = None, public_key_path: Optional[str] = None): """ Инициализация Ed25519Manager Args: private_key_path: Путь к приватному ключу public_key_path: Путь к публичному ключу """ self.private_key_path = private_key_path or os.getenv('NODE_PRIVATE_KEY_PATH') self.public_key_path = public_key_path or os.getenv('NODE_PUBLIC_KEY_PATH') self._private_key = None self._public_key = None self._node_id = None # Загружаем ключи при инициализации self._load_keys() def _load_keys(self) -> None: """Загрузка ключей из файлов""" try: # Загрузка приватного ключа if self.private_key_path and os.path.exists(self.private_key_path): with open(self.private_key_path, 'rb') as f: private_key_data = f.read() # Загружаем PEM ключ self._private_key = serialization.load_pem_private_key( private_key_data, password=None ) # Получаем публичный ключ из приватного self._public_key = self._private_key.public_key() # Генерируем NODE_ID из публичного ключа self._node_id = self._generate_node_id() logger.info(f"Ed25519 ключи загружены. Node ID: {self._node_id}") else: logger.warning(f"Private key file not found: {self.private_key_path}") except Exception as e: logger.error(f"Error loading Ed25519 keys: {e}") raise def _generate_node_id(self) -> str: """Генерация NODE_ID из публичного ключа""" if not self._public_key: raise ValueError("Public key not loaded") # Получаем raw bytes публичного ключа public_key_bytes = self._public_key.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) # Создаем упрощенный base58-подобный NODE_ID # В реальной реализации здесь должен быть полный base58 hex_key = public_key_bytes.hex() return f"node-{hex_key[:16]}" @property def node_id(self) -> str: """Получить NODE_ID""" if not self._node_id: raise ValueError("Node ID not generated. Check if keys are loaded.") return self._node_id @property def public_key_hex(self) -> str: """Получить публичный ключ в hex формате""" if not self._public_key: raise ValueError("Public key not loaded") public_key_bytes = self._public_key.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) return public_key_bytes.hex() def sign_message(self, message: Dict[str, Any]) -> str: """ Подписать сообщение ed25519 ключом Args: message: Словарь с данными для подписи Returns: base64-encoded подпись """ if not self._private_key: raise ValueError("Private key not loaded") # Сериализуем сообщение в JSON для подписи message_json = json.dumps(message, sort_keys=True, ensure_ascii=False) message_bytes = message_json.encode('utf-8') # Создаем хеш сообщения для подписи message_hash = hashlib.sha256(message_bytes).digest() # Подписываем хеш signature = self._private_key.sign(message_hash) # Возвращаем подпись в base64 return base64.b64encode(signature).decode('ascii') def verify_signature(self, message: Dict[str, Any], signature: str, public_key_hex: str) -> bool: """ Проверить подпись сообщения Args: message: Словарь с данными signature: base64-encoded подпись public_key_hex: Публичный ключ в hex формате Returns: True если подпись валидна """ try: # Восстанавливаем публичный ключ из hex public_key_bytes = bytes.fromhex(public_key_hex) public_key = crypto_ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes) # Сериализуем сообщение так же как при подписи message_json = json.dumps(message, sort_keys=True, ensure_ascii=False) message_bytes = message_json.encode('utf-8') message_hash = hashlib.sha256(message_bytes).digest() # Декодируем подпись signature_bytes = base64.b64decode(signature.encode('ascii')) # Проверяем подпись public_key.verify(signature_bytes, message_hash) return True except Exception as e: logger.warning(f"Signature verification failed: {e}") return False def create_signed_message(self, message_type: str, data: Dict[str, Any]) -> Dict[str, Any]: """ Создать подписанное сообщение для отправки Args: message_type: Тип сообщения (handshake, sync_request, etc.) data: Данные сообщения Returns: Подписанное сообщение """ # Основная структура сообщения message = { "type": message_type, "node_id": self.node_id, "public_key": self.public_key_hex, "timestamp": int(time.time()), "data": data } # Подписываем сообщение signature = self.sign_message(message) # Добавляем подпись signed_message = message.copy() signed_message["signature"] = signature return signed_message def verify_incoming_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: """ Проверить входящее подписанное сообщение Args: message: Входящее сообщение Returns: (is_valid, error_message) """ try: # Проверяем обязательные поля required_fields = ["type", "node_id", "public_key", "timestamp", "data", "signature"] for field in required_fields: if field not in message: return False, f"Missing required field: {field}" # Извлекаем подпись и создаем сообщение без подписи для проверки signature = message.pop("signature") # Проверяем подпись is_valid = self.verify_signature(message, signature, message["public_key"]) if not is_valid: return False, "Invalid signature" # Проверяем временную метку (не старше 5 минут) current_time = int(time.time()) if abs(current_time - message["timestamp"]) > 300: return False, "Message timestamp too old" return True, None except Exception as e: return False, f"Verification error: {str(e)}" def create_handshake_message(self, target_node_id: str, additional_data: Optional[Dict] = None) -> Dict[str, Any]: """ Создать сообщение для handshake с другой нодой Args: target_node_id: ID целевой ноды additional_data: Дополнительные данные Returns: Подписанное handshake сообщение """ handshake_data = { "target_node_id": target_node_id, "protocol_version": "3.0", "node_type": os.getenv("NODE_TYPE", "uploader"), "capabilities": ["upload", "content_streaming", "conversion", "storage"] } if additional_data: handshake_data.update(additional_data) return self.create_signed_message("handshake", handshake_data) def create_upload_message(self, content_hash: str, metadata: Dict[str, Any]) -> Dict[str, Any]: """ Создать подписанное сообщение для загрузки контента Args: content_hash: Хеш контента metadata: Метаданные файла Returns: Подписанное upload сообщение """ upload_data = { "content_hash": content_hash, "metadata": metadata, "uploader_node": self.node_id, "upload_timestamp": int(time.time()) } return self.create_signed_message("content_upload", upload_data) def create_sync_message(self, content_list: list, operation: str = "announce") -> Dict[str, Any]: """ Создать сообщение для синхронизации контента Args: content_list: Список контента для синхронизации operation: Тип операции (announce, request, response) Returns: Подписанное sync сообщение """ sync_data = { "operation": operation, "content_list": content_list, "sync_id": hashlib.sha256( (self.node_id + str(int(time.time()))).encode() ).hexdigest()[:16] } return self.create_signed_message("content_sync", sync_data) # Глобальный экземпляр менеджера _ed25519_manager = None def get_ed25519_manager() -> Ed25519Manager: """Получить глобальный экземпляр Ed25519Manager""" global _ed25519_manager if _ed25519_manager is None: _ed25519_manager = Ed25519Manager() return _ed25519_manager def init_ed25519_manager(private_key_path: str, public_key_path: str) -> Ed25519Manager: """Инициализировать Ed25519Manager с путями к ключам""" global _ed25519_manager _ed25519_manager = Ed25519Manager(private_key_path, public_key_path) return _ed25519_manager