# MY Network Node Specification ## Обзор MY Network Node MY Network Node представляет собой расширение существующего my-uploader-bot, добавляющее возможности участия в децентрализованной сети без нарушения текущей функциональности. ## Процесс Запуска и Инициализации Ноды ### 1. Startup Sequence ```python class MYNetworkNode: async def initialize(self): """Полная инициализация MY Network ноды""" # Phase 1: Core System Check await self.verify_core_system() # Phase 2: Load Configuration await self.load_configuration() # Phase 3: Initialize Cryptography await self.initialize_crypto_system() # Phase 4: Start Network Services await self.start_network_services() # Phase 5: Connect to Bootstrap Nodes await self.connect_to_bootstrap_nodes() # Phase 6: Discover Peers await self.start_peer_discovery() # Phase 7: Sync Content await self.start_initial_sync() # Phase 8: Start Background Services await self.start_background_services() logger.info("MY Network Node successfully initialized") ``` ### 2. Configuration Loading ```python async def load_configuration(self): """Загрузка конфигурации ноды""" # Загрузка bootstrap.json self.bootstrap_config = await self.load_bootstrap_config() # Загрузка локальной конфигурации self.node_config = await self.load_node_config() # Объединение конфигураций self.config = { **self.bootstrap_config, **self.node_config, "node_id": self.generate_or_load_node_id(), "startup_time": datetime.utcnow().isoformat() } ``` ### 3. Crypto System Initialization ```python async def initialize_crypto_system(self): """Инициализация криптографической системы""" # Загрузка или генерация ключевой пары ноды self.node_keypair = await self.load_or_generate_keypair() # Создание подписывающего объекта self.signer = Ed25519Signer(self.node_keypair.private_key) # Инициализация шифрования транспорта self.transport_crypto = TransportCrypto(self.node_keypair) # Проверка криптографической системы await self.verify_crypto_system() ``` ## Подключение к Bootstrap Нодам ### 1. Bootstrap Discovery Process ```python class BootstrapConnector: async def connect_to_bootstrap_nodes(self): """Подключение к bootstrap нодам""" bootstrap_nodes = self.config["bootstrap_nodes"] connected_nodes = [] for node_info in bootstrap_nodes: try: connection = await self.establish_connection(node_info) if connection: connected_nodes.append(connection) logger.info(f"Connected to bootstrap node: {node_info['id']}") except Exception as e: logger.warning(f"Failed to connect to bootstrap node {node_info['id']}: {e}") if len(connected_nodes) == 0: raise BootstrapConnectionError("Failed to connect to any bootstrap nodes") # Сохранение подключений self.bootstrap_connections = connected_nodes # Запрос информации о сети await self.request_network_info() ``` ### 2. Network Information Exchange ```python async def request_network_info(self): """Запрос информации о сети от bootstrap нод""" network_info = {} for connection in self.bootstrap_connections: try: # Запрос списка активных нод peers_response = await connection.request("get_active_peers") network_info["peers"] = peers_response.get("peers", []) # Запрос статистики сети stats_response = await connection.request("get_network_stats") network_info["stats"] = stats_response.get("stats", {}) # Запрос популярного контента content_response = await connection.request("get_popular_content") network_info["popular_content"] = content_response.get("content", []) break # Достаточно получить от одной ноды except Exception as e: logger.warning(f"Failed to get network info from bootstrap: {e}") continue self.network_info = network_info ``` ## Механизм Обнаружения Других Нод ### 1. Peer Discovery Algorithm ```python class PeerDiscovery: def __init__(self, node: MYNetworkNode): self.node = node self.discovered_peers = {} self.discovery_interval = 60 # секунды async def start_discovery(self): """Запуск процесса обнаружения пиров""" # Обнаружение через bootstrap ноды await self.discover_through_bootstrap() # DHT-based discovery await self.discover_through_dht() # Multicast discovery (для локальной сети) await self.discover_through_multicast() # Запуск периодического обнаружения asyncio.create_task(self.periodic_discovery()) async def discover_through_bootstrap(self): """Обнаружение через bootstrap ноды""" for connection in self.node.bootstrap_connections: try: # Запрос списка пиров response = await connection.request("discover_peers", { "requesting_node": self.node.node_id, "max_peers": 50, "exclude_nodes": list(self.discovered_peers.keys()) }) # Обработка найденных пиров for peer_info in response.get("peers", []): await self.process_discovered_peer(peer_info) except Exception as e: logger.error(f"Peer discovery through bootstrap failed: {e}") async def discover_through_dht(self): """Обнаружение через DHT""" # Поиск нод ответственных за наш ключ our_key_hash = self.node.calculate_key_hash(self.node.node_id) responsible_nodes = await self.find_responsible_nodes(our_key_hash) for node_info in responsible_nodes: await self.process_discovered_peer(node_info) async def process_discovered_peer(self, peer_info: dict): """Обработка обнаруженного пира""" peer_id = peer_info["node_id"] # Проверка, что это не мы сами if peer_id == self.node.node_id: return # Проверка, что пир не в черном списке if peer_id in self.node.blacklisted_peers: return # Попытка подключения try: connection = await self.establish_peer_connection(peer_info) if connection: self.discovered_peers[peer_id] = { "info": peer_info, "connection": connection, "discovered_at": datetime.utcnow(), "status": "connected" } logger.info(f"Successfully connected to peer: {peer_id}") except Exception as e: logger.warning(f"Failed to connect to peer {peer_id}: {e}") ``` ### 2. Peer Connection Management ```python class PeerConnectionManager: def __init__(self): self.connections = {} self.connection_limits = { "max_outbound": 50, "max_inbound": 100, "max_per_ip": 3 } async def establish_peer_connection(self, peer_info: dict) -> Optional[PeerConnection]: """Установка соединения с пиром""" peer_id = peer_info["node_id"] peer_address = peer_info["address"] # Проверка лимитов соединений if not await self.check_connection_limits(peer_info): return None try: # Создание TCP соединения connection = await self.create_tcp_connection(peer_address) # MY Network handshake handshake_success = await self.perform_handshake(connection, peer_info) if not handshake_success: await connection.close() return None # Создание MY Network peer connection peer_connection = PeerConnection( peer_id=peer_id, connection=connection, peer_info=peer_info ) # Регистрация соединения self.connections[peer_id] = peer_connection # Запуск обработчика сообщений asyncio.create_task(self.handle_peer_messages(peer_connection)) return peer_connection except Exception as e: logger.error(f"Failed to establish connection to {peer_id}: {e}") return None async def perform_handshake(self, connection: Connection, peer_info: dict) -> bool: """Выполнение MY Network handshake""" # Отправка handshake сообщения handshake_msg = { "type": "handshake", "node_id": self.node.node_id, "public_key": self.node.public_key_hex, "network_version": "1.0.0", "supported_features": ["content_sync", "consensus", "dht"], "timestamp": datetime.utcnow().isoformat() } # Подписание сообщения handshake_msg["signature"] = await self.node.sign_message(handshake_msg) # Отправка handshake await connection.send_message(handshake_msg) # Ожидание ответа response = await connection.receive_message(timeout=10) # Проверка ответа if response.get("type") != "handshake_response": return False # Проверка подписи пира peer_signature = response.get("signature") if not await self.verify_peer_signature(peer_info, response, peer_signature): return False return True ``` ## Протокол Синхронизации Контента ### 1. Content Sync Protocol ```python class ContentSyncProtocol: def __init__(self, node: MYNetworkNode): self.node = node self.sync_queue = asyncio.Queue() self.sync_stats = { "synced_content": 0, "failed_syncs": 0, "bytes_synced": 0 } async def start_initial_sync(self): """Начальная синхронизация контента""" logger.info("Starting initial content synchronization") # Получение списка доступного контента от пиров available_content = await self.discover_available_content() # Определение контента для синхронизации content_to_sync = await self.select_content_for_sync(available_content) # Добавление в очередь синхронизации for content_hash in content_to_sync: await self.sync_queue.put(content_hash) # Запуск worker'ов синхронизации workers = [] for i in range(3): # 3 параллельных worker'а worker = asyncio.create_task(self.sync_worker(f"worker-{i}")) workers.append(worker) # Ожидание завершения начальной синхронизации await self.sync_queue.join() # Остановка worker'ов for worker in workers: worker.cancel() logger.info(f"Initial sync completed. Synced {self.sync_stats['synced_content']} items") async def sync_worker(self, worker_id: str): """Worker для синхронизации контента""" while True: try: # Получение контента для синхронизации content_hash = await self.sync_queue.get() # Синхронизация контента success = await self.sync_single_content(content_hash) if success: self.sync_stats["synced_content"] += 1 else: self.sync_stats["failed_syncs"] += 1 # Отметка о завершении задачи self.sync_queue.task_done() except asyncio.CancelledError: break except Exception as e: logger.error(f"Sync worker {worker_id} error: {e}") self.sync_queue.task_done() async def sync_single_content(self, content_hash: str) -> bool: """Синхронизация одного элемента контента""" try: # Проверка, есть ли уже этот контент локально local_content = await StoredContent.get_by_hash(self.node.db_session, content_hash) if local_content: return True # Уже есть # Поиск пиров с этим контентом peers_with_content = await self.find_peers_with_content(content_hash) if not peers_with_content: logger.warning(f"No peers found with content: {content_hash}") return False # Выбор оптимального пира для загрузки selected_peer = await self.select_optimal_peer(peers_with_content) # Загрузка метаданных контента content_metadata = await self.download_content_metadata(selected_peer, content_hash) if not content_metadata: return False # Загрузка данных контента content_data = await self.download_content_data(selected_peer, content_hash) if not content_data: return False # Проверка целостности if not await self.verify_content_integrity(content_hash, content_data): logger.error(f"Content integrity check failed: {content_hash}") return False # Сохранение контента локально await self.store_content_locally(content_metadata, content_data) # Обновление статистики self.sync_stats["bytes_synced"] += len(content_data) logger.info(f"Successfully synced content: {content_hash}") return True except Exception as e: logger.error(f"Failed to sync content {content_hash}: {e}") return False ``` ### 2. Differential Sync Algorithm ```python class DifferentialSync: async def perform_differential_sync(self, peer_id: str): """Дифференциальная синхронизация с пиром""" # Получение локального state local_state = await self.get_local_content_state() # Запрос state от пира peer_state = await self.request_peer_content_state(peer_id) # Вычисление различий diff = await self.calculate_state_diff(local_state, peer_state) # Синхронизация различий if diff["missing_locally"]: await self.sync_missing_content(peer_id, diff["missing_locally"]) if diff["missing_on_peer"]: await self.send_missing_content(peer_id, diff["missing_on_peer"]) # Разрешение конфликтов if diff["conflicts"]: await self.resolve_content_conflicts(peer_id, diff["conflicts"]) async def calculate_state_diff(self, local_state: dict, peer_state: dict) -> dict: """Вычисление различий между состояниями""" local_hashes = set(local_state.keys()) peer_hashes = set(peer_state.keys()) missing_locally = peer_hashes - local_hashes missing_on_peer = local_hashes - peer_hashes common_content = local_hashes & peer_hashes # Проверка конфликтов в общем контенте conflicts = [] for content_hash in common_content: local_meta = local_state[content_hash] peer_meta = peer_state[content_hash] if local_meta["version"] != peer_meta["version"]: conflicts.append({ "content_hash": content_hash, "local_version": local_meta["version"], "peer_version": peer_meta["version"] }) return { "missing_locally": list(missing_locally), "missing_on_peer": list(missing_on_peer), "conflicts": conflicts } ``` ## Обработка Всех Состояний Контента ### 1. Multi-State Content Handler ```python class MultiStateContentHandler: def __init__(self, node: MYNetworkNode): self.node = node self.state_processors = { "encrypted": EncryptedContentProcessor(), "decrypted": DecryptedContentProcessor(), "preview": PreviewContentProcessor() } async def process_content_group(self, content_group_id: str): """Обработка группы связанных состояний контента""" # Обнаружение всех состояний в группе content_states = await self.discover_content_states(content_group_id) # Обработка каждого состояния for state in content_states: processor = self.state_processors.get(state.type) if processor: await processor.process_state(state) # Проверка целостности связей между состояниями await self.verify_state_relationships(content_states) # Репликация всех состояний await self.replicate_content_group(content_states) async def discover_content_states(self, content_group_id: str) -> List[ContentState]: """Обнаружение всех состояний контента в группе""" # Поиск корневого контента root_content = await StoredContent.get_by_id( self.node.db_session, content_group_id ) if not root_content: return [] states = [ContentState( content=root_content, type=self.determine_content_state_type(root_content) )] # Поиск связанных состояний через relationships relationships = await self.get_content_relationships(content_group_id) for rel in relationships: related_content = await StoredContent.get_by_id( self.node.db_session, rel.child_content_id ) if related_content: states.append(ContentState( content=related_content, type=rel.relationship_type, parent_id=content_group_id )) return states def determine_content_state_type(self, content: StoredContent) -> str: """Определение типа состояния контента""" if content.encrypted: return "encrypted" elif "preview" in content.tags: return "preview" else: return "decrypted" ``` ### 2. State-Specific Processors ```python class EncryptedContentProcessor: async def process_state(self, state: ContentState): """Обработка зашифрованного состояния""" content = state.content # Проверка наличия ключа шифрования if not content.encryption_key_id: logger.warning(f"Encrypted content without key: {content.id}") return # Проверка доступности ключа encryption_key = await EncryptionKey.get_by_id( self.node.db_session, content.encryption_key_id ) if not encryption_key or not encryption_key.is_valid: logger.warning(f"Invalid encryption key for content: {content.id}") return # Обработка зашифрованного контента await self.process_encrypted_content(content, encryption_key) class DecryptedContentProcessor: async def process_state(self, state: ContentState): """Обработка расшифрованного состояния""" content = state.content # Проверка связи с зашифрованной версией if state.parent_id: encrypted_content = await StoredContent.get_by_id( self.node.db_session, state.parent_id ) if encrypted_content and encrypted_content.encrypted: # Проверка соответствия с зашифрованной версией await self.verify_encrypted_decrypted_relationship( encrypted_content, content ) # Обработка расшифрованного контента await self.process_decrypted_content(content) class PreviewContentProcessor: async def process_state(self, state: ContentState): """Обработка preview состояния""" content = state.content # Проверка, что это действительно preview if "preview" not in content.tags: content.add_tag("preview") # Проверка размера (preview должен быть меньше оригинала) if state.parent_id: original_content = await StoredContent.get_by_id( self.node.db_session, state.parent_id ) if original_content and content.file_size >= original_content.file_size: logger.warning(f"Preview size is not smaller than original: {content.id}") # Обработка preview контента await self.process_preview_content(content) ``` ## Node Status Monitoring ### 1. Real-time Status Monitoring ```python class NodeStatusMonitor: def __init__(self, node: MYNetworkNode): self.node = node self.status_data = {} self.monitoring_interval = 30 # секунды async def start_monitoring(self): """Запуск мониторинга статуса ноды""" # Запуск периодического сбора метрик asyncio.create_task(self.collect_metrics_loop()) # Запуск health check'ов asyncio.create_task(self.health_check_loop()) # Запуск отправки статуса в сеть asyncio.create_task(self.broadcast_status_loop()) async def collect_current_status(self) -> dict: """Сбор текущего статуса ноды""" return { "node_id": self.node.node_id, "uptime": self.get_uptime_seconds(), "peer_count": len(self.node.peer_connections), "content_count": await self.get_local_content_count(), "storage_usage": await self.get_storage_usage(), "network_stats": await self.get_network_stats(), "sync_status": await self.get_sync_status(), "resource_usage": await self.get_resource_usage(), "timestamp": datetime.utcnow().isoformat() } async def get_sync_status(self) -> dict: """Получение статуса синхронизации""" return { "is_syncing": self.node.sync_manager.is_syncing, "sync_progress": self.node.sync_manager.get_progress(), "last_sync": self.node.sync_manager.last_sync_time, "pending_sync_items": self.node.sync_manager.get_pending_count(), "sync_errors": self.node.sync_manager.get_error_count() } ``` ### 2. Node Performance Metrics ```python class NodePerformanceMetrics: def __init__(self): self.metrics_history = [] self.max_history_size = 1000 async def collect_performance_metrics(self) -> dict: """Сбор метрик производительности""" metrics = { "cpu_usage": psutil.cpu_percent(), "memory_usage": psutil.virtual_memory().percent, "disk_usage": psutil.disk_usage('/').percent, "network_io": await self.get_network_io_stats(), "content_operations": await self.get_content_operations_stats(), "p2p_performance": await self.get_p2p_performance_stats(), "consensus_performance": await self.get_consensus_performance_stats() } # Сохранение в историю self.metrics_history.append({ "timestamp": datetime.utcnow(), "metrics": metrics }) # Ограничение размера истории if len(self.metrics_history) > self.max_history_size: self.metrics_history.pop(0) return metrics async def get_p2p_performance_stats(self) -> dict: """Статистика производительности P2P""" return { "avg_latency": await self.calculate_avg_peer_latency(), "message_throughput": await self.get_message_throughput(), "connection_success_rate": await self.get_connection_success_rate(), "bandwidth_usage": await self.get_bandwidth_usage() } ``` ## Configuration Management ### 1. Dynamic Configuration ```python class NodeConfigManager: def __init__(self, node: MYNetworkNode): self.node = node self.config_file = "node_config.json" self.config_watchers = [] async def load_configuration(self) -> dict: """Загрузка конфигурации ноды""" # Загрузка базовой конфигурации base_config = await self.load_base_config() # Загрузка bootstrap конфигурации bootstrap_config = await self.load_bootstrap_config() # Загрузка локальных настроек local_config = await self.load_local_config() # Объединение конфигураций merged_config = { **base_config, **bootstrap_config, **local_config } # Валидация конфигурации await self.validate_configuration(merged_config) return merged_config async def update_configuration(self, new_config: dict): """Обновление конфигурации на лету""" # Валидация новой конфигурации await self.validate_configuration(new_config) # Создание backup старой конфигурации old_config = self.node.config.copy() try: # Применение новой конфигурации self.node.config.update(new_config) # Уведомление watchers об изменениях for watcher in self.config_watchers: await watcher.on_config_changed(old_config, new_config) # Сохранение конфигурации await self.save_configuration(new_config) except Exception as e: # Откат к старой конфигурации в случае ошибки self.node.config = old_config raise ConfigurationError(f"Failed to update configuration: {e}") ``` Эта спецификация определяет полный жизненный цикл MY Network ноды от инициализации до мониторинга производительности, обеспечивая надежную работу в децентрализованной сети.