uploader-bot/app/core/crypto/ed25519_manager.py

362 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MY Network v3.0 - Ed25519 Cryptographic Manager for uploader-bot
Модуль для работы с ed25519 ключами и подписями.
Все inter-node сообщения должны быть подписаны и проверены.
"""
import os
import base64
import json
import hashlib
from typing import Dict, Any, Optional, Tuple
from pathlib import Path
import logging
import time
try:
import ed25519
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519 as crypto_ed25519
CRYPTO_AVAILABLE = True
except ImportError as e:
logging.warning(f"Cryptographic libraries not found, using mock for testing: {e}")
CRYPTO_AVAILABLE = False
# Create mock classes for testing
class MockEd25519PrivateKey:
def sign(self, data): return b"mock_signature_64_bytes_long_for_testing_purposes_only_not_real"
def public_key(self): return MockEd25519PublicKey()
class MockEd25519PublicKey:
def verify(self, signature, data): pass # Always succeeds in mock
def public_bytes(self, encoding=None, format=None): return b"mock_public_key_32_bytes_for_testing"
@classmethod
def from_public_bytes(cls, data): return cls()
class MockSerialization:
class Encoding:
Raw = "raw"
class PublicFormat:
Raw = "raw"
@staticmethod
def load_pem_private_key(data, password=None): return MockEd25519PrivateKey()
serialization = MockSerialization()
crypto_ed25519 = type('MockEd25519', (), {'Ed25519PublicKey': MockEd25519PublicKey})()
logger = logging.getLogger(__name__)
class Ed25519Manager:
"""Менеджер для ed25519 криптографических операций в uploader-bot"""
def __init__(self, private_key_path: Optional[str] = None, public_key_path: Optional[str] = None):
"""
Инициализация Ed25519Manager
Args:
private_key_path: Путь к приватному ключу
public_key_path: Путь к публичному ключу
"""
self.private_key_path = private_key_path or os.getenv('NODE_PRIVATE_KEY_PATH')
self.public_key_path = public_key_path or os.getenv('NODE_PUBLIC_KEY_PATH')
self._private_key = None
self._public_key = None
self._node_id = None
# Загружаем ключи при инициализации
self._load_keys()
def _load_keys(self) -> None:
"""Загрузка ключей из файлов"""
try:
# Загрузка приватного ключа
if self.private_key_path and os.path.exists(self.private_key_path):
with open(self.private_key_path, 'rb') as f:
private_key_data = f.read()
# Загружаем PEM ключ
self._private_key = serialization.load_pem_private_key(
private_key_data,
password=None
)
# Получаем публичный ключ из приватного
self._public_key = self._private_key.public_key()
# Генерируем NODE_ID из публичного ключа
self._node_id = self._generate_node_id()
logger.info(f"Ed25519 ключи загружены. Node ID: {self._node_id}")
else:
# Генерируем заглушки для тестирования
if not CRYPTO_AVAILABLE:
logger.warning("Using mock keys for testing (crypto libraries not available)")
self._private_key = MockEd25519PrivateKey()
self._public_key = MockEd25519PublicKey()
self._node_id = "node-mock-testing-12345"
else:
logger.warning(f"Private key file not found: {self.private_key_path}")
# Создаем временные ключи для тестирования
from cryptography.hazmat.primitives.asymmetric import ed25519
self._private_key = ed25519.Ed25519PrivateKey.generate()
self._public_key = self._private_key.public_key()
self._node_id = self._generate_node_id()
logger.info(f"Generated temporary keys for testing. Node ID: {self._node_id}")
except Exception as e:
logger.error(f"Error loading Ed25519 keys: {e}")
# Для тестирования создаем заглушки
if not CRYPTO_AVAILABLE:
logger.warning("Using mock keys for testing due to error")
self._private_key = MockEd25519PrivateKey()
self._public_key = MockEd25519PublicKey()
self._node_id = "node-mock-error-fallback"
else:
raise
def _generate_node_id(self) -> str:
"""Генерация NODE_ID из публичного ключа"""
if not self._public_key:
raise ValueError("Public key not loaded")
try:
# Получаем raw bytes публичного ключа
public_key_bytes = self._public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
# Создаем упрощенный base58-подобный NODE_ID
# В реальной реализации здесь должен быть полный base58
hex_key = public_key_bytes.hex()
return f"node-{hex_key[:16]}"
except:
# Fallback для mock ключей
return f"node-mock-{hash(str(self._public_key)) % 1000000:06d}"
@property
def node_id(self) -> str:
"""Получить NODE_ID"""
if not self._node_id:
raise ValueError("Node ID not generated. Check if keys are loaded.")
return self._node_id
@property
def public_key_hex(self) -> str:
"""Получить публичный ключ в hex формате"""
if not self._public_key:
raise ValueError("Public key not loaded")
public_key_bytes = self._public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
return public_key_bytes.hex()
def sign_message(self, message: Dict[str, Any]) -> str:
"""
Подписать сообщение ed25519 ключом
Args:
message: Словарь с данными для подписи
Returns:
base64-encoded подпись
"""
if not self._private_key:
raise ValueError("Private key not loaded")
# Сериализуем сообщение в JSON для подписи
message_json = json.dumps(message, sort_keys=True, ensure_ascii=False)
message_bytes = message_json.encode('utf-8')
# Создаем хеш сообщения для подписи
message_hash = hashlib.sha256(message_bytes).digest()
# Подписываем хеш
signature = self._private_key.sign(message_hash)
# Возвращаем подпись в base64
return base64.b64encode(signature).decode('ascii')
def verify_signature(self, message: Dict[str, Any], signature: str, public_key_hex: str) -> bool:
"""
Проверить подпись сообщения
Args:
message: Словарь с данными
signature: base64-encoded подпись
public_key_hex: Публичный ключ в hex формате
Returns:
True если подпись валидна
"""
try:
# Восстанавливаем публичный ключ из hex
public_key_bytes = bytes.fromhex(public_key_hex)
public_key = crypto_ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes)
# Сериализуем сообщение так же как при подписи
message_json = json.dumps(message, sort_keys=True, ensure_ascii=False)
message_bytes = message_json.encode('utf-8')
message_hash = hashlib.sha256(message_bytes).digest()
# Декодируем подпись
signature_bytes = base64.b64decode(signature.encode('ascii'))
# Проверяем подпись
public_key.verify(signature_bytes, message_hash)
return True
except Exception as e:
logger.warning(f"Signature verification failed: {e}")
return False
def create_signed_message(self, message_type: str, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Создать подписанное сообщение для отправки
Args:
message_type: Тип сообщения (handshake, sync_request, etc.)
data: Данные сообщения
Returns:
Подписанное сообщение
"""
# Основная структура сообщения
message = {
"type": message_type,
"node_id": self.node_id,
"public_key": self.public_key_hex,
"timestamp": int(time.time()),
"data": data
}
# Подписываем сообщение
signature = self.sign_message(message)
# Добавляем подпись
signed_message = message.copy()
signed_message["signature"] = signature
return signed_message
def verify_incoming_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Проверить входящее подписанное сообщение
Args:
message: Входящее сообщение
Returns:
(is_valid, error_message)
"""
try:
# Проверяем обязательные поля
required_fields = ["type", "node_id", "public_key", "timestamp", "data", "signature"]
for field in required_fields:
if field not in message:
return False, f"Missing required field: {field}"
# Извлекаем подпись и создаем сообщение без подписи для проверки
signature = message.pop("signature")
# Проверяем подпись
is_valid = self.verify_signature(message, signature, message["public_key"])
if not is_valid:
return False, "Invalid signature"
# Проверяем временную метку (не старше 5 минут)
current_time = int(time.time())
if abs(current_time - message["timestamp"]) > 300:
return False, "Message timestamp too old"
return True, None
except Exception as e:
return False, f"Verification error: {str(e)}"
def create_handshake_message(self, target_node_id: str, additional_data: Optional[Dict] = None) -> Dict[str, Any]:
"""
Создать сообщение для handshake с другой нодой
Args:
target_node_id: ID целевой ноды
additional_data: Дополнительные данные
Returns:
Подписанное handshake сообщение
"""
handshake_data = {
"target_node_id": target_node_id,
"protocol_version": "3.0",
"node_type": os.getenv("NODE_TYPE", "uploader"),
"capabilities": ["upload", "content_streaming", "conversion", "storage"]
}
if additional_data:
handshake_data.update(additional_data)
return self.create_signed_message("handshake", handshake_data)
def create_upload_message(self, content_hash: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
"""
Создать подписанное сообщение для загрузки контента
Args:
content_hash: Хеш контента
metadata: Метаданные файла
Returns:
Подписанное upload сообщение
"""
upload_data = {
"content_hash": content_hash,
"metadata": metadata,
"uploader_node": self.node_id,
"upload_timestamp": int(time.time())
}
return self.create_signed_message("content_upload", upload_data)
def create_sync_message(self, content_list: list, operation: str = "announce") -> Dict[str, Any]:
"""
Создать сообщение для синхронизации контента
Args:
content_list: Список контента для синхронизации
operation: Тип операции (announce, request, response)
Returns:
Подписанное sync сообщение
"""
sync_data = {
"operation": operation,
"content_list": content_list,
"sync_id": hashlib.sha256(
(self.node_id + str(int(time.time()))).encode()
).hexdigest()[:16]
}
return self.create_signed_message("content_sync", sync_data)
# Глобальный экземпляр менеджера
_ed25519_manager = None
def get_ed25519_manager() -> Ed25519Manager:
"""Получить глобальный экземпляр Ed25519Manager"""
global _ed25519_manager
if _ed25519_manager is None:
_ed25519_manager = Ed25519Manager()
return _ed25519_manager
def init_ed25519_manager(private_key_path: str, public_key_path: str) -> Ed25519Manager:
"""Инициализировать Ed25519Manager с путями к ключам"""
global _ed25519_manager
_ed25519_manager = Ed25519Manager(private_key_path, public_key_path)
return _ed25519_manager