uploader-bot/docs/MY_NODE_SPEC.md

30 KiB
Raw Permalink Blame History

MY Network Node Specification

Обзор MY Network Node

MY Network Node представляет собой расширение существующего my-uploader-bot, добавляющее возможности участия в децентрализованной сети без нарушения текущей функциональности.

Процесс Запуска и Инициализации Ноды

1. Startup Sequence

class MYNetworkNode:
    async def initialize(self):
        """Полная инициализация MY Network ноды"""
        
        # Phase 1: Core System Check
        await self.verify_core_system()
        
        # Phase 2: Load Configuration
        await self.load_configuration()
        
        # Phase 3: Initialize Cryptography
        await self.initialize_crypto_system()
        
        # Phase 4: Start Network Services
        await self.start_network_services()
        
        # Phase 5: Connect to Bootstrap Nodes
        await self.connect_to_bootstrap_nodes()
        
        # Phase 6: Discover Peers
        await self.start_peer_discovery()
        
        # Phase 7: Sync Content
        await self.start_initial_sync()
        
        # Phase 8: Start Background Services
        await self.start_background_services()
        
        logger.info("MY Network Node successfully initialized")

2. Configuration Loading

async def load_configuration(self):
    """Загрузка конфигурации ноды"""
    
    # Загрузка bootstrap.json
    self.bootstrap_config = await self.load_bootstrap_config()
    
    # Загрузка локальной конфигурации
    self.node_config = await self.load_node_config()
    
    # Объединение конфигураций
    self.config = {
        **self.bootstrap_config,
        **self.node_config,
        "node_id": self.generate_or_load_node_id(),
        "startup_time": datetime.utcnow().isoformat()
    }

3. Crypto System Initialization

async def initialize_crypto_system(self):
    """Инициализация криптографической системы"""
    
    # Загрузка или генерация ключевой пары ноды
    self.node_keypair = await self.load_or_generate_keypair()
    
    # Создание подписывающего объекта
    self.signer = Ed25519Signer(self.node_keypair.private_key)
    
    # Инициализация шифрования транспорта
    self.transport_crypto = TransportCrypto(self.node_keypair)
    
    # Проверка криптографической системы
    await self.verify_crypto_system()

Подключение к Bootstrap Нодам

1. Bootstrap Discovery Process

class BootstrapConnector:
    async def connect_to_bootstrap_nodes(self):
        """Подключение к bootstrap нодам"""
        
        bootstrap_nodes = self.config["bootstrap_nodes"]
        connected_nodes = []
        
        for node_info in bootstrap_nodes:
            try:
                connection = await self.establish_connection(node_info)
                if connection:
                    connected_nodes.append(connection)
                    logger.info(f"Connected to bootstrap node: {node_info['id']}")
                    
            except Exception as e:
                logger.warning(f"Failed to connect to bootstrap node {node_info['id']}: {e}")
        
        if len(connected_nodes) == 0:
            raise BootstrapConnectionError("Failed to connect to any bootstrap nodes")
        
        # Сохранение подключений
        self.bootstrap_connections = connected_nodes
        
        # Запрос информации о сети
        await self.request_network_info()

2. Network Information Exchange

async def request_network_info(self):
    """Запрос информации о сети от bootstrap нод"""
    
    network_info = {}
    
    for connection in self.bootstrap_connections:
        try:
            # Запрос списка активных нод
            peers_response = await connection.request("get_active_peers")
            network_info["peers"] = peers_response.get("peers", [])
            
            # Запрос статистики сети
            stats_response = await connection.request("get_network_stats")
            network_info["stats"] = stats_response.get("stats", {})
            
            # Запрос популярного контента
            content_response = await connection.request("get_popular_content")
            network_info["popular_content"] = content_response.get("content", [])
            
            break  # Достаточно получить от одной ноды
            
        except Exception as e:
            logger.warning(f"Failed to get network info from bootstrap: {e}")
            continue
    
    self.network_info = network_info

Механизм Обнаружения Других Нод

1. Peer Discovery Algorithm

class PeerDiscovery:
    def __init__(self, node: MYNetworkNode):
        self.node = node
        self.discovered_peers = {}
        self.discovery_interval = 60  # секунды
        
    async def start_discovery(self):
        """Запуск процесса обнаружения пиров"""
        
        # Обнаружение через bootstrap ноды
        await self.discover_through_bootstrap()
        
        # DHT-based discovery
        await self.discover_through_dht()
        
        # Multicast discovery (для локальной сети)
        await self.discover_through_multicast()
        
        # Запуск периодического обнаружения
        asyncio.create_task(self.periodic_discovery())
    
    async def discover_through_bootstrap(self):
        """Обнаружение через bootstrap ноды"""
        
        for connection in self.node.bootstrap_connections:
            try:
                # Запрос списка пиров
                response = await connection.request("discover_peers", {
                    "requesting_node": self.node.node_id,
                    "max_peers": 50,
                    "exclude_nodes": list(self.discovered_peers.keys())
                })
                
                # Обработка найденных пиров
                for peer_info in response.get("peers", []):
                    await self.process_discovered_peer(peer_info)
                    
            except Exception as e:
                logger.error(f"Peer discovery through bootstrap failed: {e}")
    
    async def discover_through_dht(self):
        """Обнаружение через DHT"""
        
        # Поиск нод ответственных за наш ключ
        our_key_hash = self.node.calculate_key_hash(self.node.node_id)
        responsible_nodes = await self.find_responsible_nodes(our_key_hash)
        
        for node_info in responsible_nodes:
            await self.process_discovered_peer(node_info)
    
    async def process_discovered_peer(self, peer_info: dict):
        """Обработка обнаруженного пира"""
        
        peer_id = peer_info["node_id"]
        
        # Проверка, что это не мы сами
        if peer_id == self.node.node_id:
            return
        
        # Проверка, что пир не в черном списке
        if peer_id in self.node.blacklisted_peers:
            return
        
        # Попытка подключения
        try:
            connection = await self.establish_peer_connection(peer_info)
            if connection:
                self.discovered_peers[peer_id] = {
                    "info": peer_info,
                    "connection": connection,
                    "discovered_at": datetime.utcnow(),
                    "status": "connected"
                }
                
                logger.info(f"Successfully connected to peer: {peer_id}")
                
        except Exception as e:
            logger.warning(f"Failed to connect to peer {peer_id}: {e}")

2. Peer Connection Management

class PeerConnectionManager:
    def __init__(self):
        self.connections = {}
        self.connection_limits = {
            "max_outbound": 50,
            "max_inbound": 100,
            "max_per_ip": 3
        }
    
    async def establish_peer_connection(self, peer_info: dict) -> Optional[PeerConnection]:
        """Установка соединения с пиром"""
        
        peer_id = peer_info["node_id"]
        peer_address = peer_info["address"]
        
        # Проверка лимитов соединений
        if not await self.check_connection_limits(peer_info):
            return None
        
        try:
            # Создание TCP соединения
            connection = await self.create_tcp_connection(peer_address)
            
            # MY Network handshake
            handshake_success = await self.perform_handshake(connection, peer_info)
            if not handshake_success:
                await connection.close()
                return None
            
            # Создание MY Network peer connection
            peer_connection = PeerConnection(
                peer_id=peer_id,
                connection=connection,
                peer_info=peer_info
            )
            
            # Регистрация соединения
            self.connections[peer_id] = peer_connection
            
            # Запуск обработчика сообщений
            asyncio.create_task(self.handle_peer_messages(peer_connection))
            
            return peer_connection
            
        except Exception as e:
            logger.error(f"Failed to establish connection to {peer_id}: {e}")
            return None
    
    async def perform_handshake(self, connection: Connection, peer_info: dict) -> bool:
        """Выполнение MY Network handshake"""
        
        # Отправка handshake сообщения
        handshake_msg = {
            "type": "handshake",
            "node_id": self.node.node_id,
            "public_key": self.node.public_key_hex,
            "network_version": "1.0.0",
            "supported_features": ["content_sync", "consensus", "dht"],
            "timestamp": datetime.utcnow().isoformat()
        }
        
        # Подписание сообщения
        handshake_msg["signature"] = await self.node.sign_message(handshake_msg)
        
        # Отправка handshake
        await connection.send_message(handshake_msg)
        
        # Ожидание ответа
        response = await connection.receive_message(timeout=10)
        
        # Проверка ответа
        if response.get("type") != "handshake_response":
            return False
        
        # Проверка подписи пира
        peer_signature = response.get("signature")
        if not await self.verify_peer_signature(peer_info, response, peer_signature):
            return False
        
        return True

Протокол Синхронизации Контента

1. Content Sync Protocol

class ContentSyncProtocol:
    def __init__(self, node: MYNetworkNode):
        self.node = node
        self.sync_queue = asyncio.Queue()
        self.sync_stats = {
            "synced_content": 0,
            "failed_syncs": 0,
            "bytes_synced": 0
        }
    
    async def start_initial_sync(self):
        """Начальная синхронизация контента"""
        
        logger.info("Starting initial content synchronization")
        
        # Получение списка доступного контента от пиров
        available_content = await self.discover_available_content()
        
        # Определение контента для синхронизации
        content_to_sync = await self.select_content_for_sync(available_content)
        
        # Добавление в очередь синхронизации
        for content_hash in content_to_sync:
            await self.sync_queue.put(content_hash)
        
        # Запуск worker'ов синхронизации
        workers = []
        for i in range(3):  # 3 параллельных worker'а
            worker = asyncio.create_task(self.sync_worker(f"worker-{i}"))
            workers.append(worker)
        
        # Ожидание завершения начальной синхронизации
        await self.sync_queue.join()
        
        # Остановка worker'ов
        for worker in workers:
            worker.cancel()
        
        logger.info(f"Initial sync completed. Synced {self.sync_stats['synced_content']} items")
    
    async def sync_worker(self, worker_id: str):
        """Worker для синхронизации контента"""
        
        while True:
            try:
                # Получение контента для синхронизации
                content_hash = await self.sync_queue.get()
                
                # Синхронизация контента
                success = await self.sync_single_content(content_hash)
                
                if success:
                    self.sync_stats["synced_content"] += 1
                else:
                    self.sync_stats["failed_syncs"] += 1
                
                # Отметка о завершении задачи
                self.sync_queue.task_done()
                
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Sync worker {worker_id} error: {e}")
                self.sync_queue.task_done()
    
    async def sync_single_content(self, content_hash: str) -> bool:
        """Синхронизация одного элемента контента"""
        
        try:
            # Проверка, есть ли уже этот контент локально
            local_content = await StoredContent.get_by_hash(self.node.db_session, content_hash)
            if local_content:
                return True  # Уже есть
            
            # Поиск пиров с этим контентом
            peers_with_content = await self.find_peers_with_content(content_hash)
            if not peers_with_content:
                logger.warning(f"No peers found with content: {content_hash}")
                return False
            
            # Выбор оптимального пира для загрузки
            selected_peer = await self.select_optimal_peer(peers_with_content)
            
            # Загрузка метаданных контента
            content_metadata = await self.download_content_metadata(selected_peer, content_hash)
            if not content_metadata:
                return False
            
            # Загрузка данных контента
            content_data = await self.download_content_data(selected_peer, content_hash)
            if not content_data:
                return False
            
            # Проверка целостности
            if not await self.verify_content_integrity(content_hash, content_data):
                logger.error(f"Content integrity check failed: {content_hash}")
                return False
            
            # Сохранение контента локально
            await self.store_content_locally(content_metadata, content_data)
            
            # Обновление статистики
            self.sync_stats["bytes_synced"] += len(content_data)
            
            logger.info(f"Successfully synced content: {content_hash}")
            return True
            
        except Exception as e:
            logger.error(f"Failed to sync content {content_hash}: {e}")
            return False

2. Differential Sync Algorithm

class DifferentialSync:
    async def perform_differential_sync(self, peer_id: str):
        """Дифференциальная синхронизация с пиром"""
        
        # Получение локального state
        local_state = await self.get_local_content_state()
        
        # Запрос state от пира
        peer_state = await self.request_peer_content_state(peer_id)
        
        # Вычисление различий
        diff = await self.calculate_state_diff(local_state, peer_state)
        
        # Синхронизация различий
        if diff["missing_locally"]:
            await self.sync_missing_content(peer_id, diff["missing_locally"])
        
        if diff["missing_on_peer"]:
            await self.send_missing_content(peer_id, diff["missing_on_peer"])
        
        # Разрешение конфликтов
        if diff["conflicts"]:
            await self.resolve_content_conflicts(peer_id, diff["conflicts"])
    
    async def calculate_state_diff(self, local_state: dict, peer_state: dict) -> dict:
        """Вычисление различий между состояниями"""
        
        local_hashes = set(local_state.keys())
        peer_hashes = set(peer_state.keys())
        
        missing_locally = peer_hashes - local_hashes
        missing_on_peer = local_hashes - peer_hashes
        common_content = local_hashes & peer_hashes
        
        # Проверка конфликтов в общем контенте
        conflicts = []
        for content_hash in common_content:
            local_meta = local_state[content_hash]
            peer_meta = peer_state[content_hash]
            
            if local_meta["version"] != peer_meta["version"]:
                conflicts.append({
                    "content_hash": content_hash,
                    "local_version": local_meta["version"],
                    "peer_version": peer_meta["version"]
                })
        
        return {
            "missing_locally": list(missing_locally),
            "missing_on_peer": list(missing_on_peer),
            "conflicts": conflicts
        }

Обработка Всех Состояний Контента

1. Multi-State Content Handler

class MultiStateContentHandler:
    def __init__(self, node: MYNetworkNode):
        self.node = node
        self.state_processors = {
            "encrypted": EncryptedContentProcessor(),
            "decrypted": DecryptedContentProcessor(),  
            "preview": PreviewContentProcessor()
        }
    
    async def process_content_group(self, content_group_id: str):
        """Обработка группы связанных состояний контента"""
        
        # Обнаружение всех состояний в группе
        content_states = await self.discover_content_states(content_group_id)
        
        # Обработка каждого состояния
        for state in content_states:
            processor = self.state_processors.get(state.type)
            if processor:
                await processor.process_state(state)
        
        # Проверка целостности связей между состояниями
        await self.verify_state_relationships(content_states)
        
        # Репликация всех состояний
        await self.replicate_content_group(content_states)
    
    async def discover_content_states(self, content_group_id: str) -> List[ContentState]:
        """Обнаружение всех состояний контента в группе"""
        
        # Поиск корневого контента
        root_content = await StoredContent.get_by_id(
            self.node.db_session, content_group_id
        )
        
        if not root_content:
            return []
        
        states = [ContentState(
            content=root_content,
            type=self.determine_content_state_type(root_content)
        )]
        
        # Поиск связанных состояний через relationships
        relationships = await self.get_content_relationships(content_group_id)
        
        for rel in relationships:
            related_content = await StoredContent.get_by_id(
                self.node.db_session, rel.child_content_id
            )
            
            if related_content:
                states.append(ContentState(
                    content=related_content,
                    type=rel.relationship_type,
                    parent_id=content_group_id
                ))
        
        return states
    
    def determine_content_state_type(self, content: StoredContent) -> str:
        """Определение типа состояния контента"""
        
        if content.encrypted:
            return "encrypted"
        elif "preview" in content.tags:
            return "preview"  
        else:
            return "decrypted"

2. State-Specific Processors

class EncryptedContentProcessor:
    async def process_state(self, state: ContentState):
        """Обработка зашифрованного состояния"""
        
        content = state.content
        
        # Проверка наличия ключа шифрования
        if not content.encryption_key_id:
            logger.warning(f"Encrypted content without key: {content.id}")
            return
        
        # Проверка доступности ключа
        encryption_key = await EncryptionKey.get_by_id(
            self.node.db_session, content.encryption_key_id
        )
        
        if not encryption_key or not encryption_key.is_valid:
            logger.warning(f"Invalid encryption key for content: {content.id}")
            return
        
        # Обработка зашифрованного контента
        await self.process_encrypted_content(content, encryption_key)

class DecryptedContentProcessor:
    async def process_state(self, state: ContentState):
        """Обработка расшифрованного состояния"""
        
        content = state.content
        
        # Проверка связи с зашифрованной версией
        if state.parent_id:
            encrypted_content = await StoredContent.get_by_id(
                self.node.db_session, state.parent_id
            )
            
            if encrypted_content and encrypted_content.encrypted:
                # Проверка соответствия с зашифрованной версией
                await self.verify_encrypted_decrypted_relationship(
                    encrypted_content, content
                )
        
        # Обработка расшифрованного контента
        await self.process_decrypted_content(content)

class PreviewContentProcessor:
    async def process_state(self, state: ContentState):
        """Обработка preview состояния"""
        
        content = state.content
        
        # Проверка, что это действительно preview
        if "preview" not in content.tags:
            content.add_tag("preview")
        
        # Проверка размера (preview должен быть меньше оригинала)
        if state.parent_id:
            original_content = await StoredContent.get_by_id(
                self.node.db_session, state.parent_id
            )
            
            if original_content and content.file_size >= original_content.file_size:
                logger.warning(f"Preview size is not smaller than original: {content.id}")
        
        # Обработка preview контента
        await self.process_preview_content(content)

Node Status Monitoring

1. Real-time Status Monitoring

class NodeStatusMonitor:
    def __init__(self, node: MYNetworkNode):
        self.node = node
        self.status_data = {}
        self.monitoring_interval = 30  # секунды
        
    async def start_monitoring(self):
        """Запуск мониторинга статуса ноды"""
        
        # Запуск периодического сбора метрик
        asyncio.create_task(self.collect_metrics_loop())
        
        # Запуск health check'ов
        asyncio.create_task(self.health_check_loop())
        
        # Запуск отправки статуса в сеть
        asyncio.create_task(self.broadcast_status_loop())
    
    async def collect_current_status(self) -> dict:
        """Сбор текущего статуса ноды"""
        
        return {
            "node_id": self.node.node_id,
            "uptime": self.get_uptime_seconds(),
            "peer_count": len(self.node.peer_connections),
            "content_count": await self.get_local_content_count(),
            "storage_usage": await self.get_storage_usage(),
            "network_stats": await self.get_network_stats(),
            "sync_status": await self.get_sync_status(),
            "resource_usage": await self.get_resource_usage(),
            "timestamp": datetime.utcnow().isoformat()
        }
    
    async def get_sync_status(self) -> dict:
        """Получение статуса синхронизации"""
        
        return {
            "is_syncing": self.node.sync_manager.is_syncing,
            "sync_progress": self.node.sync_manager.get_progress(),
            "last_sync": self.node.sync_manager.last_sync_time,
            "pending_sync_items": self.node.sync_manager.get_pending_count(),
            "sync_errors": self.node.sync_manager.get_error_count()
        }

2. Node Performance Metrics

class NodePerformanceMetrics:
    def __init__(self):
        self.metrics_history = []
        self.max_history_size = 1000
    
    async def collect_performance_metrics(self) -> dict:
        """Сбор метрик производительности"""
        
        metrics = {
            "cpu_usage": psutil.cpu_percent(),
            "memory_usage": psutil.virtual_memory().percent,
            "disk_usage": psutil.disk_usage('/').percent,
            "network_io": await self.get_network_io_stats(),
            "content_operations": await self.get_content_operations_stats(),
            "p2p_performance": await self.get_p2p_performance_stats(),
            "consensus_performance": await self.get_consensus_performance_stats()
        }
        
        # Сохранение в историю
        self.metrics_history.append({
            "timestamp": datetime.utcnow(),
            "metrics": metrics
        })
        
        # Ограничение размера истории
        if len(self.metrics_history) > self.max_history_size:
            self.metrics_history.pop(0)
        
        return metrics
    
    async def get_p2p_performance_stats(self) -> dict:
        """Статистика производительности P2P"""
        
        return {
            "avg_latency": await self.calculate_avg_peer_latency(),
            "message_throughput": await self.get_message_throughput(),
            "connection_success_rate": await self.get_connection_success_rate(),
            "bandwidth_usage": await self.get_bandwidth_usage()
        }

Configuration Management

1. Dynamic Configuration

class NodeConfigManager:
    def __init__(self, node: MYNetworkNode):
        self.node = node
        self.config_file = "node_config.json"
        self.config_watchers = []
        
    async def load_configuration(self) -> dict:
        """Загрузка конфигурации ноды"""
        
        # Загрузка базовой конфигурации
        base_config = await self.load_base_config()
        
        # Загрузка bootstrap конфигурации
        bootstrap_config = await self.load_bootstrap_config()
        
        # Загрузка локальных настроек
        local_config = await self.load_local_config()
        
        # Объединение конфигураций
        merged_config = {
            **base_config,
            **bootstrap_config,
            **local_config
        }
        
        # Валидация конфигурации
        await self.validate_configuration(merged_config)
        
        return merged_config
    
    async def update_configuration(self, new_config: dict):
        """Обновление конфигурации на лету"""
        
        # Валидация новой конфигурации
        await self.validate_configuration(new_config)
        
        # Создание backup старой конфигурации
        old_config = self.node.config.copy()
        
        try:
            # Применение новой конфигурации
            self.node.config.update(new_config)
            
            # Уведомление watchers об изменениях
            for watcher in self.config_watchers:
                await watcher.on_config_changed(old_config, new_config)
            
            # Сохранение конфигурации
            await self.save_configuration(new_config)
            
        except Exception as e:
            # Откат к старой конфигурации в случае ошибки
            self.node.config = old_config
            raise ConfigurationError(f"Failed to update configuration: {e}")

Эта спецификация определяет полный жизненный цикл MY Network ноды от инициализации до мониторинга производительности, обеспечивая надежную работу в децентрализованной сети.