uploader-bot/docs/NEW_PROTOCOL_V3.md

27 KiB
Raw Permalink Blame History

MY Network v3.0 - Новый протокол децентрализованной синхронизации

🎯 Обзор протокола v3.0

MY Network v3.0 полностью отказывается от кворумной системы в пользу децентрализованной архитектуры, где каждая нода принимает независимые решения о принятии и хранении контента.

🔄 Протокол индивидуальной синхронизации

Принципы новой системы

  1. Автономность решений: Каждая нода самостоятельно решает принимать контент или нет
  2. Отсутствие консенсуса: Нет необходимости в голосовании других нод
  3. Гибкая фильтрация: Настраиваемые правила принятия контента на уровне ноды
  4. Устойчивость к цензуре: Контент доступен пока есть хотя бы одна нода

Новый алгоритм синхронизации

class DecentralizedSyncProtocol:
    """Протокол децентрализованной синхронизации v3.0"""
    
    def __init__(self, node_config: dict):
        self.node_id = node_config["node_id"]
        self.content_filter = IndividualContentFilter(node_config)
        self.sync_manager = SyncManager()
        self.active_peers = {}
        
    async def handle_content_announcement(self, peer_id: str, announcement: dict) -> bool:
        """
        Обработка анонса нового контента от пира
        
        Новая логика v3.0:
        1. Каждая нода принимает решение индивидуально
        2. Нет консенсуса или голосования
        3. Простая заглушка для фильтрации (пока всегда True)
        """
        
        content_hash = announcement.get("content_hash")
        content_metadata = announcement.get("metadata", {})
        
        logger.info(f"Received content announcement from {peer_id}: {content_hash}")
        
        # Проверка - не дубликат ли это
        if await self._content_already_exists(content_hash):
            logger.debug(f"Content {content_hash} already exists locally")
            return False
        
        # Индивидуальное решение о принятии контента
        should_accept = await self.content_filter.should_accept_content(
            content_hash, content_metadata, peer_id
        )
        
        if should_accept:
            logger.info(f"Accepting content {content_hash} from {peer_id}")
            
            # Добавление в очередь синхронизации
            await self.sync_manager.queue_content_sync(
                peer_id=peer_id,
                content_hash=content_hash,
                metadata=content_metadata
            )
            return True
        else:
            logger.info(f"Rejecting content {content_hash} from {peer_id}")
            return False
    
    async def sync_content_from_peer(self, peer_id: str, content_hash: str, 
                                   metadata: dict) -> bool:
        """Синхронизация контента с пира (без консенсуса)"""
        
        try:
            # Запрос метаданных контента
            content_info = await self._request_content_info(peer_id, content_hash)
            if not content_info:
                logger.error(f"Failed to get content info for {content_hash}")
                return False
            
            # Проверка возможности получения контента от пира
            access_granted = await self._request_content_access(peer_id, content_hash)
            if not access_granted:
                logger.warning(f"Access denied for content {content_hash} from {peer_id}")
                return False
            
            # Загрузка контента
            content_data = await self._download_content(peer_id, content_hash)
            if not content_data:
                logger.error(f"Failed to download content {content_hash}")
                return False
            
            # Проверка целостности
            if not await self._verify_content_integrity(content_hash, content_data):
                logger.error(f"Content integrity check failed: {content_hash}")
                return False
            
            # Сохранение контента локально
            stored_content = await self._store_content_locally(
                content_hash, content_data, metadata
            )
            
            if stored_content:
                logger.info(f"Successfully synced content {content_hash} from {peer_id}")
                
                # Анонс нового контента другим пирам (кроме источника)
                await self._announce_content_to_peers(stored_content, exclude_peer=peer_id)
                
                return True
            
        except Exception as e:
            logger.error(f"Failed to sync content {content_hash} from {peer_id}: {e}")
        
        return False

class IndividualContentFilter:
    """Система индивидуальной фильтрации контента"""
    
    def __init__(self, node_config: dict):
        self.node_config = node_config
        self.whitelist_authors = set(node_config.get("whitelist_authors", []))
        self.blacklist_authors = set(node_config.get("blacklist_authors", []))
        self.allowed_content_types = set(node_config.get("allowed_content_types", ["*"]))
        self.max_file_size = node_config.get("max_file_size", 100 * 1024 * 1024)  # 100MB
        
    async def should_accept_content(self, content_hash: str, metadata: dict, 
                                  peer_id: str) -> bool:
        """
        Решение о принятии контента (заглушка для будущего расширения)
        
        В текущей версии v3.0 всегда возвращает True.
        В будущем здесь будут фильтры по:
        - Авторам (whitelist/blacklist)
        - Типам контента
        - Размерам файлов
        - Тегам и категориям
        - Репутации пира
        """
        
        # TODO: Добавить реальную логику фильтрации
        # Пока принимаем весь контент
        return True
        
        # Пример будущей логики фильтрации:
        """
        author_id = metadata.get("author_id")
        content_type = metadata.get("content_type")
        file_size = metadata.get("file_size", 0)
        
        # Проверка черного списка авторов
        if author_id in self.blacklist_authors:
            return False
        
        # Проверка белого списка (если не пустой)
        if self.whitelist_authors and author_id not in self.whitelist_authors:
            return False
        
        # Проверка типа контента
        if "*" not in self.allowed_content_types:
            if content_type not in self.allowed_content_types:
                return False
        
        # Проверка размера файла
        if file_size > self.max_file_size:
            return False
        
        return True
        """

🔐 Новая система шифрования и безопасности

Архитектура безопасности v3.0

class ContentSecurityManager:
    """Управление безопасностью контента в MY Network v3.0"""
    
    def __init__(self):
        self.hash_algorithm = "sha256"
        self.encryption_algorithm = "AES-256-GCM"
        
    async def create_encrypted_content(self, content_data: bytes, 
                                     content_metadata: dict) -> dict:
        """
        Создание зашифрованного контента с единым хэшем
        
        Система v3.0:
        1. Симметричное шифрование с уникальным ключом для каждого контента
        2. Единый хэш encrypted_content на всех нодах
        3. Отдельные preview_id не связанные с основным хэшем
        """
        
        # Генерация уникального ключа шифрования для этого контента
        encryption_key = self._generate_content_key()
        
        # Шифрование контента
        encrypted_data = await self._encrypt_content(content_data, encryption_key)
        
        # Вычисление единого хэша зашифрованного контента
        encrypted_content_hash = self._calculate_deterministic_hash(encrypted_data)
        
        # Создание отдельного preview_id (не связанного с хэшем)
        preview_id = str(uuid4())
        
        return {
            "encrypted_content_hash": encrypted_content_hash,  # Одинаковый на всех нодах
            "encrypted_data": encrypted_data,
            "encryption_key": encryption_key,  # Передается при продаже
            "preview_id": preview_id,  # Для публичного доступа к preview
            "metadata": content_metadata
        }
    
    def _generate_content_key(self) -> str:
        """Генерация уникального ключа для контента"""
        return secrets.token_hex(32)  # 256-bit key
    
    async def _encrypt_content(self, content_data: bytes, encryption_key: str) -> bytes:
        """Симметричное шифрование контента"""
        
        # Преобразование ключа в байты
        key_bytes = bytes.fromhex(encryption_key)
        
        # Создание шифратора AES-256-GCM
        cipher = AES.new(key_bytes, AES.MODE_GCM)
        
        # Шифрование
        ciphertext, auth_tag = cipher.encrypt_and_digest(content_data)
        
        # Объединение nonce, auth_tag и ciphertext для детерминированного хэша
        encrypted_data = cipher.nonce + auth_tag + ciphertext
        
        return encrypted_data
    
    def _calculate_deterministic_hash(self, encrypted_data: bytes) -> str:
        """
        Вычисление детерминированного хэша зашифрованного контента
        
        ВАЖНО: Этот хэш будет одинаковым на всех нодах для одного контента,
        но через него нельзя получить доступ к самому контенту
        """
        return hashlib.sha256(encrypted_data).hexdigest()
    
    async def decrypt_content(self, encrypted_data: bytes, encryption_key: str) -> bytes:
        """Расшифровка контента с помощью ключа"""
        
        # Извлечение компонентов
        nonce = encrypted_data[:16]  # AES-GCM nonce: 16 bytes
        auth_tag = encrypted_data[16:32]  # Auth tag: 16 bytes
        ciphertext = encrypted_data[32:]  # Остальное - зашифрованные данные
        
        # Создание дешифратора
        key_bytes = bytes.fromhex(encryption_key)
        cipher = AES.new(key_bytes, AES.MODE_GCM, nonce=nonce)
        
        # Расшифровка с проверкой подлинности
        decrypted_data = cipher.decrypt_and_verify(ciphertext, auth_tag)
        
        return decrypted_data
    
    async def create_preview_content(self, original_content: bytes, 
                                   content_type: str) -> tuple[bytes, str]:
        """
        Создание preview контента (не зашифрованного)
        
        Preview:
        - Не связан с основным хэшем контента
        - Имеет собственный уникальный ID
        - Доступен публично без ключей
        """
        
        # Создание preview в зависимости от типа контента
        if content_type.startswith("audio/"):
            # Для аудио - первые 30 секунд
            preview_content = await self._create_audio_preview(original_content)
        elif content_type.startswith("video/"):
            # Для видео - первые 30 секунд + watermark
            preview_content = await self._create_video_preview(original_content)
        elif content_type.startswith("image/"):
            # Для изображений - уменьшенная версия с watermark
            preview_content = await self._create_image_preview(original_content)
        else:
            # Для других типов - описание или thumbnail
            preview_content = await self._create_generic_preview(original_content)
        
        # Уникальный ID для preview (НЕ хэш!)
        preview_id = str(uuid4())
        
        return preview_content, preview_id

class ContentAccessControl:
    """Система контроля доступа к контенту"""
    
    async def can_provide_content(self, requesting_node: str, content_hash: str, 
                                access_type: str = "full") -> dict:
        """
        Проверка возможности предоставления контента
        
        В v3.0 пока разрешаем всем (заглушка для будущего расширения)
        """
        
        # TODO: Добавить реальную логику контроля доступа
        # Пока разрешаем всем
        return {
            "access_granted": True,
            "access_type": access_type,
            "reason": "Open access policy"
        }
        
        # Пример будущей логики:
        """
        # Проверка репутации ноды
        node_reputation = await self._get_node_reputation(requesting_node)
        if node_reputation < 0.5:
            return {"access_granted": False, "reason": "Low node reputation"}
        
        # Проверка коммерческих лицензий
        content = await StoredContent.get_by_hash(content_hash)
        if content and content.commercial_license:
            license_valid = await self._check_commercial_license(
                requesting_node, content_hash
            )
            if not license_valid:
                return {"access_granted": False, "reason": "No valid license"}
        
        # Проверка региональных ограничений
        node_region = await self._get_node_region(requesting_node)
        if node_region in content.restricted_regions:
            return {"access_granted": False, "reason": "Regional restrictions"}
        
        return {"access_granted": True, "access_type": access_type}
        """
    
    async def get_content_by_preview_id(self, preview_id: str) -> Optional[bytes]:
        """
        Получение preview контента по preview_id
        
        Preview доступен всем без ограничений
        """
        
        # Поиск preview по ID в базе данных
        preview_content = await PreviewContent.get_by_preview_id(preview_id)
        
        if preview_content:
            return preview_content.data
        
        return None
    
    async def request_content_key(self, content_hash: str, user_license: dict) -> Optional[str]:
        """
        Запрос ключа расшифровки контента
        
        Ключ выдается только при наличии соответствующих прав
        """
        
        # Проверка лицензии пользователя
        if await self._validate_user_license(content_hash, user_license):
            # Получение ключа расшифровки
            content = await StoredContent.get_by_hash(content_hash)
            if content:
                return content.encryption_key
        
        return None

🌐 Обновленный протокол обмена сообщениями

Новые типы сообщений v3.0

class P2PMessageTypes:
    """Типы сообщений протокола MY Network v3.0"""
    
    # Базовые сообщения
    HANDSHAKE = "handshake"
    HANDSHAKE_RESPONSE = "handshake_response"
    
    # Объявления о контенте (без консенсуса)
    CONTENT_ANNOUNCEMENT = "content_announcement"
    CONTENT_REQUEST = "content_request"
    CONTENT_RESPONSE = "content_response"
    
    # Синхронизация
    SYNC_REQUEST = "sync_request"
    SYNC_RESPONSE = "sync_response"
    SYNC_DATA = "sync_data"
    
    # Доступ к контенту
    ACCESS_REQUEST = "access_request"
    ACCESS_RESPONSE = "access_response"
    
    # Версионирование
    VERSION_INFO = "version_info"
    VERSION_WARNING = "version_warning"

class P2PMessageHandler:
    """Обработчик P2P сообщений протокола v3.0"""
    
    async def handle_content_announcement(self, peer_id: str, message: dict):
        """Обработка анонса контента (без консенсуса)"""
        
        content_hash = message.get("content_hash")
        metadata = message.get("metadata", {})
        
        # Индивидуальное решение о принятии
        should_accept = await self.sync_protocol.handle_content_announcement(
            peer_id, message
        )
        
        # Отправка ответа пиру
        response = {
            "type": P2PMessageTypes.CONTENT_RESPONSE,
            "content_hash": content_hash,
            "accepted": should_accept,
            "node_id": self.node_id,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        await self.send_message_to_peer(peer_id, response)
    
    async def handle_sync_request(self, peer_id: str, message: dict):
        """Обработка запроса синхронизации"""
        
        content_hash = message.get("content_hash")
        
        # Проверка доступа к контенту
        access_check = await self.access_control.can_provide_content(
            peer_id, content_hash
        )
        
        if access_check["access_granted"]:
            # Предоставление контента
            content_data = await self._get_content_data(content_hash)
            
            response = {
                "type": P2PMessageTypes.SYNC_RESPONSE,
                "content_hash": content_hash,
                "status": "approved",
                "data_size": len(content_data) if content_data else 0
            }
        else:
            # Отказ в доступе
            response = {
                "type": P2PMessageTypes.SYNC_RESPONSE,
                "content_hash": content_hash,
                "status": "denied",
                "reason": access_check.get("reason", "Access denied")
            }
        
        await self.send_message_to_peer(peer_id, response)
        
        # Если доступ разрешен, отправляем данные
        if access_check["access_granted"] and content_data:
            await self._send_content_data(peer_id, content_hash, content_data)

📋 Новые API эндпоинты v3.0

Обновленные API для децентрализованной архитектуры

# Индивидуальная синхронизация (без консенсуса)
POST /api/v3/sync/announce          # Анонс контента в сеть
GET  /api/v3/sync/pending           # Ожидающие синхронизации
POST /api/v3/sync/accept/{hash}     # Принять конкретный контент
POST /api/v3/sync/reject/{hash}     # Отклонить конкретный контент

# Управление фильтрами контента
GET  /api/v3/filters/content        # Текущие фильтры
PUT  /api/v3/filters/content        # Обновить фильтры
POST /api/v3/filters/test           # Тестирование фильтров

# Безопасность контента
GET  /api/v3/content/{hash}/preview/{preview_id}  # Получение preview
POST /api/v3/content/{hash}/request-key           # Запрос ключа расшифровки
GET  /api/v3/content/{hash}/access-info           # Информация о доступе

# Статистика децентрализованной сети
GET  /api/v3/network/nodes          # Известные ноды в сети
GET  /api/v3/network/content/distribution  # Распределение контента
GET  /api/v3/network/sync/stats     # Статистика синхронизации

🔄 Алгоритм удаления неиспользуемого контента

Система очистки контента

class ContentCleanupManager:
    """Управление очисткой контента через 7 дней"""
    
    def __init__(self):
        self.retention_days = 7
        self.cleanup_interval = 3600  # 1 час
        
    async def start_cleanup_scheduler(self):
        """Запуск планировщика очистки"""
        
        while True:
            try:
                await self.cleanup_expired_content()
                await asyncio.sleep(self.cleanup_interval)
            except Exception as e:
                logger.error(f"Cleanup scheduler error: {e}")
                await asyncio.sleep(60)  # Retry in 1 minute
    
    async def cleanup_expired_content(self):
        """Очистка просроченного контента"""
        
        cutoff_date = datetime.utcnow() - timedelta(days=self.retention_days)
        
        # Поиск контента без blockchain регистрации
        expired_content = await self._find_expired_content(cutoff_date)
        
        for content in expired_content:
            try:
                # Проверка отсутствия blockchain записи
                blockchain_exists = await self._check_blockchain_registration(content.hash)
                
                if not blockchain_exists:
                    # Удаление файла и записи
                    await self._remove_content(content)
                    
                    logger.info(f"Removed expired content: {content.hash}")
                else:
                    # Обновление статуса - контент зарегистрирован
                    content.blockchain_registered = True
                    await content.save()
                    
            except Exception as e:
                logger.error(f"Failed to cleanup content {content.hash}: {e}")
    
    async def _find_expired_content(self, cutoff_date: datetime) -> List[StoredContent]:
        """Поиск просроченного контента"""
        
        return await StoredContent.filter(
            created_at__lt=cutoff_date,
            blockchain_registered=False
        ).all()
    
    async def _check_blockchain_registration(self, content_hash: str) -> bool:
        """Проверка регистрации контента в блокчейне"""
        
        # TODO: Реализовать проверку TON блокчейна
        # Пока возвращаем False для тестирования
        return False
    
    async def _remove_content(self, content: StoredContent):
        """Удаление контента и связанных файлов"""
        
        # Удаление основного файла
        if os.path.exists(content.file_path):
            os.remove(content.file_path)
        
        # Удаление preview файлов
        preview_files = await PreviewContent.filter(parent_content_id=content.id).all()
        for preview in preview_files:
            if os.path.exists(preview.file_path):
                os.remove(preview.file_path)
            await preview.delete()
        
        # Удаление записи из базы данных
        await content.delete()

📊 Мониторинг новой системы

Расширенная система мониторинга

class DecentralizedNetworkMonitor:
    """Мониторинг децентрализованной сети v3.0"""
    
    async def get_network_health(self) -> dict:
        """Здоровье децентрализованной сети"""
        
        return {
            "decentralization_metrics": {
                "total_known_nodes": await self._count_known_nodes(),
                "active_connections": len(self.active_peers),
                "content_distribution": await self._analyze_content_distribution(),
                "network_redundancy": await self._calculate_network_redundancy()
            },
            "sync_metrics": {
                "pending_sync_items": self.sync_manager.get_pending_count(),
                "successful_syncs_24h": await self._get_sync_count(hours=24),
                "failed_syncs_24h": await self._get_failed_sync_count(hours=24),
                "avg_sync_time": await self._get_avg_sync_time()
            },
            "content_metrics": {
                "total_content_items": await self._count_local_content(),
                "content_accepted_rate": await self._get_acceptance_rate(),
                "content_cleanup_stats": await self._get_cleanup_stats(),
                "storage_usage": await self._get_storage_usage()
            },
            "security_metrics": {
                "access_requests_24h": await self._count_access_requests(hours=24),
                "denied_access_rate": await self._get_denied_access_rate(),
                "encryption_status": "AES-256-GCM",
                "version_compatibility": await self._get_version_compatibility_stats()
            }
        }

Этот новый протокол обеспечивает полную децентрализацию MY Network при сохранении безопасности и эффективности системы.