# MY Network v3.0 - Децентрализованная архитектура контента ## 🎯 Обзор MY Network v3.0 представляет собой полностью децентрализованную платформу дистрибьюции контента, где каждая нода принимает независимые решения о принятии и хранении контента. Система исключает централизованное управление и кворумное принятие решений. ## 🏗️ Принципы децентрализации ### 1. Автономность нод - **Независимое принятие решений**: Каждая нода самостоятельно решает принимать контент или нет - **Локальные правила**: Настройки фильтрации и политики хранения на уровне ноды - **Отсутствие консенсуса**: Нет необходимости в голосовании или кворуме ### 2. Доступность без цензуры - **Множественные источники**: Контент может быть доступен на разных нодах - **Отсутствие единой точки отказа**: Недоступность одной ноды не влияет на сеть - **Устойчивость к цензуре**: Контент остается доступным пока есть хотя бы одна нода ### 3. Гибкая топология сети - **Публичные ноды**: С открытыми портами для входящих соединений - **Приватные ноды**: Только исходящие соединения, получение контента - **Bootstrap ноды**: Точки входа в сеть для новых участников ## 🔄 Архитектура синхронизации ### Замена кворумной системы ```python class ContentAcceptanceManager: """Управление принятием контента без консенсуса""" def __init__(self, node_config: dict): self.node_config = node_config self.filters = ContentFilters(node_config) async def should_accept_content(self, content_metadata: dict) -> bool: """ Решение о принятии контента (заглушка для будущего расширения) В текущей версии всегда возвращает True. В будущем здесь будут фильтры по: - Авторам (whitelist/blacklist) - Типам контента - Размерам файлов - Тегам и категориям """ # TODO: Добавить логику фильтрации return True async def process_content_announcement(self, peer_id: str, announcement: dict): """Обработка анонса нового контента от пира""" content_hash = announcement.get("content_hash") # Проверка - нужен ли нам этот контент if await self.should_accept_content(announcement): # Добавляем в очередь синхронизации await self.sync_manager.queue_content_sync(peer_id, content_hash) else: logger.info(f"Content {content_hash} rejected by local filters") ``` ### Протокол синхронизации P2P ```python class P2PSyncProtocol: """Децентрализованный протокол синхронизации""" async def announce_new_content(self, content: StoredContent): """Анонс нового контента всем подключенным пирам""" announcement = { "type": "content_announcement", "content_hash": content.hash, "file_size": content.file_size, "content_type": content.content_type, "encrypted": content.encrypted, "timestamp": datetime.utcnow().isoformat(), "author_id": content.author_id, "tags": content.tags } # Рассылаем анонс всем активным пирам for peer_id in self.active_peers: try: await self.send_to_peer(peer_id, announcement) except Exception as e: logger.warning(f"Failed to announce to peer {peer_id}: {e}") async def request_content_sync(self, peer_id: str, content_hash: str): """Запрос синхронизации конкретного контента""" request = { "type": "sync_request", "content_hash": content_hash, "requesting_node": self.node_id, "timestamp": datetime.utcnow().isoformat() } response = await self.send_request_to_peer(peer_id, request) if response.get("status") == "approved": await self._sync_content_from_peer(peer_id, content_hash) else: logger.info(f"Sync request for {content_hash} denied by {peer_id}") ``` ## 🔐 Система шифрования и хэширования ### Единые хэши encrypted_content ```python class ContentHashManager: """Управление хэшированием контента для децентрализованной сети""" @staticmethod def calculate_content_hash(content_data: bytes) -> str: """ Вычисление единого хэша для encrypted_content Использует SHA-256 для обеспечения одинакового хэша на всех нодах для одного и того же контента """ return hashlib.sha256(content_data).hexdigest() @staticmethod def encrypt_content_for_storage(content_data: bytes) -> tuple[bytes, str]: """ Шифрование контента для хранения Returns: tuple: (encrypted_data, encryption_key) """ # Генерируем уникальный ключ для этого контента encryption_key = secrets.token_hex(32) # 256-bit key # Шифруем контент симметричным алгоритмом cipher = AES.new( key=bytes.fromhex(encryption_key), mode=AES.MODE_GCM ) encrypted_data, auth_tag = cipher.encrypt_and_digest(content_data) # Добавляем nonce и auth_tag к зашифрованным данным final_encrypted = cipher.nonce + auth_tag + encrypted_data return final_encrypted, encryption_key @staticmethod def decrypt_content(encrypted_data: bytes, encryption_key: str) -> bytes: """Расшифровка контента""" # Извлекаем nonce, auth_tag и данные nonce = encrypted_data[:16] auth_tag = encrypted_data[16:32] ciphertext = encrypted_data[32:] # Расшифровываем cipher = AES.new( key=bytes.fromhex(encryption_key), mode=AES.MODE_GCM, nonce=nonce ) return cipher.decrypt_and_verify(ciphertext, auth_tag) ``` ### Контроль доступа к контенту ```python class ContentAccessControl: """Система контроля доступа к контенту""" async def can_provide_content(self, requester_node: str, content_hash: str) -> bool: """ Решение о предоставлении контента запрашивающей ноде В текущей версии разрешает доступ всем. В будущем здесь будут проверки: - Доверенность ноды - Коммерческая лицензия - Региональные ограничения """ # TODO: Добавить логику контроля доступа return True async def generate_preview_id(self, content_hash: str) -> str: """ Генерация ID для preview, не связанного с основным хэшем Preview ID намеренно не связан с encrypted_content хэшем для обеспечения безопасности """ # Используем случайный UUID для preview preview_id = str(uuid4()) # Сохраняем связь в локальной базе await self._store_preview_mapping(content_hash, preview_id) return preview_id async def get_content_by_preview_id(self, preview_id: str) -> Optional[bytes]: """Получение preview контента по preview_id""" content_hash = await self._get_content_hash_by_preview_id(preview_id) if not content_hash: return None # Загружаем preview версию (не основной контент!) preview_content = await StoredContent.get_preview_by_hash(content_hash) return preview_content.data if preview_content else None ``` ## 🌐 Топология сети ### Типы нод ```python class NodeTypes: """Типы нод в MY Network""" BOOTSTRAP = "bootstrap" # Точки входа в сеть PUBLIC = "public" # Открытые ноды с входящими соединениями PRIVATE = "private" # Закрытые ноды, только исходящие соединения SEED = "seed" # Ноды с большим объемом контента для новых участников class NodeConfiguration: """Конфигурация ноды""" def __init__(self, config: dict): self.node_type = config.get("node_type", NodeTypes.PUBLIC) self.allow_incoming = config.get("allow_incoming", True) self.max_connections = config.get("max_connections", 50) self.bootstrap_nodes = config.get("bootstrap_nodes", []) def is_private_node(self) -> bool: """Проверка является ли нода приватной""" return self.node_type == NodeTypes.PRIVATE or not self.allow_incoming def can_accept_incoming_connections(self) -> bool: """Может ли нода принимать входящие соединения""" return self.allow_incoming and self.node_type != NodeTypes.PRIVATE ``` ### Обнаружение и подключение к нодам ```python class NetworkDiscovery: """Обнаружение нод в децентрализованной сети""" async def discover_network_nodes(self) -> List[NodeInfo]: """Обнаружение доступных нод в сети""" discovered_nodes = [] # Получаем список нод от bootstrap серверов for bootstrap_node in self.config.bootstrap_nodes: try: nodes = await self._get_nodes_from_bootstrap(bootstrap_node) discovered_nodes.extend(nodes) except Exception as e: logger.warning(f"Failed to get nodes from bootstrap {bootstrap_node}: {e}") # Получаем список нод от подключенных пиров for peer_id in self.connected_peers: try: nodes = await self._get_nodes_from_peer(peer_id) discovered_nodes.extend(nodes) except Exception as e: logger.warning(f"Failed to get nodes from peer {peer_id}: {e}") # Убираем дубликаты и себя unique_nodes = self._deduplicate_nodes(discovered_nodes) return [node for node in unique_nodes if node.node_id != self.node_id] async def connect_to_network(self): """Подключение к сети MY Network""" if not self.config.bootstrap_nodes: logger.error("No bootstrap nodes configured") return connected_count = 0 target_connections = min(10, len(self.config.bootstrap_nodes) * 2) # Подключаемся к bootstrap нодам for bootstrap_node in self.config.bootstrap_nodes: try: if await self._connect_to_node(bootstrap_node): connected_count += 1 logger.info(f"Connected to bootstrap node: {bootstrap_node}") except Exception as e: logger.warning(f"Failed to connect to bootstrap {bootstrap_node}: {e}") # Если недостаточно соединений, ищем дополнительные ноды if connected_count < target_connections: additional_nodes = await self.discover_network_nodes() for node_info in additional_nodes: if connected_count >= target_connections: break try: if await self._connect_to_node(node_info): connected_count += 1 logger.info(f"Connected to discovered node: {node_info.node_id}") except Exception as e: logger.warning(f"Failed to connect to node {node_info.node_id}: {e}") logger.info(f"Network connection complete. Connected to {connected_count} nodes") ``` ## 📊 Мониторинг децентрализованной сети ### Статистика сети ```python class NetworkStatistics: """Сбор статистики децентрализованной сети""" async def collect_network_stats(self) -> dict: """Сбор статистики о состоянии сети""" return { "node_info": { "node_id": self.node_id, "node_type": self.config.node_type, "version": self.version, "uptime": self.get_uptime_seconds() }, "connections": { "total_peers": len(self.connected_peers), "bootstrap_peers": len(self.bootstrap_connections), "incoming_connections": self.get_incoming_connection_count(), "outgoing_connections": self.get_outgoing_connection_count() }, "content": { "total_items": await self.get_local_content_count(), "encrypted_items": await self.get_encrypted_content_count(), "public_items": await self.get_public_content_count(), "storage_used": await self.get_storage_usage_bytes() }, "sync_activity": { "content_synced_24h": await self.get_sync_count_24h(), "pending_sync_items": self.sync_manager.get_pending_count(), "failed_sync_attempts": self.sync_manager.get_failed_count(), "last_sync_time": self.sync_manager.last_sync_time }, "network_health": { "reachable_nodes": await self.count_reachable_nodes(), "network_latency_avg": await self.calculate_avg_network_latency(), "content_availability": await self.calculate_content_availability() } } ``` ## 🔧 API для децентрализованной сети ### Новые эндпоинты ```python # Управление нодой GET /api/v1/node/status # Статус ноды и подключений GET /api/v1/node/peers # Список подключенных пиров POST /api/v1/node/connect # Подключение к новому пиру POST /api/v1/node/disconnect # Отключение от пира # Обнаружение сети GET /api/v1/network/discover # Обнаружение доступных нод GET /api/v1/network/stats # Статистика сети GET /api/v1/network/topology # Топология подключений # Синхронизация контента GET /api/v1/sync/status # Статус синхронизации POST /api/v1/sync/request # Запрос синхронизации контента GET /api/v1/sync/pending # Список ожидающей синхронизации POST /api/v1/sync/cancel # Отмена синхронизации # Поиск контента в сети GET /api/v1/content/search # Поиск контента по хэшу в сети GET /api/v1/content/{hash}/nodes # Список нод с конкретным контентом POST /api/v1/content/announce # Анонс нового контента в сеть # Управление доступом POST /api/v1/access/request # Запрос доступа к контенту GET /api/v1/access/policies # Политики доступа к контенту PUT /api/v1/access/policies # Обновление политик доступа ``` Эта архитектура обеспечивает полную децентрализацию при сохранении гибкости и безопасности системы.