uploader-bot/docs/NEW_PROTOCOL_V3.md

602 lines
27 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# MY Network v3.0 - Новый протокол децентрализованной синхронизации
## 🎯 Обзор протокола v3.0
MY Network v3.0 полностью отказывается от кворумной системы в пользу децентрализованной архитектуры, где каждая нода принимает независимые решения о принятии и хранении контента.
## 🔄 Протокол индивидуальной синхронизации
### Принципы новой системы
1. **Автономность решений**: Каждая нода самостоятельно решает принимать контент или нет
2. **Отсутствие консенсуса**: Нет необходимости в голосовании других нод
3. **Гибкая фильтрация**: Настраиваемые правила принятия контента на уровне ноды
4. **Устойчивость к цензуре**: Контент доступен пока есть хотя бы одна нода
### Новый алгоритм синхронизации
```python
class DecentralizedSyncProtocol:
"""Протокол децентрализованной синхронизации v3.0"""
def __init__(self, node_config: dict):
self.node_id = node_config["node_id"]
self.content_filter = IndividualContentFilter(node_config)
self.sync_manager = SyncManager()
self.active_peers = {}
async def handle_content_announcement(self, peer_id: str, announcement: dict) -> bool:
"""
Обработка анонса нового контента от пира
Новая логика v3.0:
1. Каждая нода принимает решение индивидуально
2. Нет консенсуса или голосования
3. Простая заглушка для фильтрации (пока всегда True)
"""
content_hash = announcement.get("content_hash")
content_metadata = announcement.get("metadata", {})
logger.info(f"Received content announcement from {peer_id}: {content_hash}")
# Проверка - не дубликат ли это
if await self._content_already_exists(content_hash):
logger.debug(f"Content {content_hash} already exists locally")
return False
# Индивидуальное решение о принятии контента
should_accept = await self.content_filter.should_accept_content(
content_hash, content_metadata, peer_id
)
if should_accept:
logger.info(f"Accepting content {content_hash} from {peer_id}")
# Добавление в очередь синхронизации
await self.sync_manager.queue_content_sync(
peer_id=peer_id,
content_hash=content_hash,
metadata=content_metadata
)
return True
else:
logger.info(f"Rejecting content {content_hash} from {peer_id}")
return False
async def sync_content_from_peer(self, peer_id: str, content_hash: str,
metadata: dict) -> bool:
"""Синхронизация контента с пира (без консенсуса)"""
try:
# Запрос метаданных контента
content_info = await self._request_content_info(peer_id, content_hash)
if not content_info:
logger.error(f"Failed to get content info for {content_hash}")
return False
# Проверка возможности получения контента от пира
access_granted = await self._request_content_access(peer_id, content_hash)
if not access_granted:
logger.warning(f"Access denied for content {content_hash} from {peer_id}")
return False
# Загрузка контента
content_data = await self._download_content(peer_id, content_hash)
if not content_data:
logger.error(f"Failed to download content {content_hash}")
return False
# Проверка целостности
if not await self._verify_content_integrity(content_hash, content_data):
logger.error(f"Content integrity check failed: {content_hash}")
return False
# Сохранение контента локально
stored_content = await self._store_content_locally(
content_hash, content_data, metadata
)
if stored_content:
logger.info(f"Successfully synced content {content_hash} from {peer_id}")
# Анонс нового контента другим пирам (кроме источника)
await self._announce_content_to_peers(stored_content, exclude_peer=peer_id)
return True
except Exception as e:
logger.error(f"Failed to sync content {content_hash} from {peer_id}: {e}")
return False
class IndividualContentFilter:
"""Система индивидуальной фильтрации контента"""
def __init__(self, node_config: dict):
self.node_config = node_config
self.whitelist_authors = set(node_config.get("whitelist_authors", []))
self.blacklist_authors = set(node_config.get("blacklist_authors", []))
self.allowed_content_types = set(node_config.get("allowed_content_types", ["*"]))
self.max_file_size = node_config.get("max_file_size", 100 * 1024 * 1024) # 100MB
async def should_accept_content(self, content_hash: str, metadata: dict,
peer_id: str) -> bool:
"""
Решение о принятии контента (заглушка для будущего расширения)
В текущей версии v3.0 всегда возвращает True.
В будущем здесь будут фильтры по:
- Авторам (whitelist/blacklist)
- Типам контента
- Размерам файлов
- Тегам и категориям
- Репутации пира
"""
# TODO: Добавить реальную логику фильтрации
# Пока принимаем весь контент
return True
# Пример будущей логики фильтрации:
"""
author_id = metadata.get("author_id")
content_type = metadata.get("content_type")
file_size = metadata.get("file_size", 0)
# Проверка черного списка авторов
if author_id in self.blacklist_authors:
return False
# Проверка белого списка (если не пустой)
if self.whitelist_authors and author_id not in self.whitelist_authors:
return False
# Проверка типа контента
if "*" not in self.allowed_content_types:
if content_type not in self.allowed_content_types:
return False
# Проверка размера файла
if file_size > self.max_file_size:
return False
return True
"""
```
## 🔐 Новая система шифрования и безопасности
### Архитектура безопасности v3.0
```python
class ContentSecurityManager:
"""Управление безопасностью контента в MY Network v3.0"""
def __init__(self):
self.hash_algorithm = "sha256"
self.encryption_algorithm = "AES-256-GCM"
async def create_encrypted_content(self, content_data: bytes,
content_metadata: dict) -> dict:
"""
Создание зашифрованного контента с единым хэшем
Система v3.0:
1. Симметричное шифрование с уникальным ключом для каждого контента
2. Единый хэш encrypted_content на всех нодах
3. Отдельные preview_id не связанные с основным хэшем
"""
# Генерация уникального ключа шифрования для этого контента
encryption_key = self._generate_content_key()
# Шифрование контента
encrypted_data = await self._encrypt_content(content_data, encryption_key)
# Вычисление единого хэша зашифрованного контента
encrypted_content_hash = self._calculate_deterministic_hash(encrypted_data)
# Создание отдельного preview_id (не связанного с хэшем)
preview_id = str(uuid4())
return {
"encrypted_content_hash": encrypted_content_hash, # Одинаковый на всех нодах
"encrypted_data": encrypted_data,
"encryption_key": encryption_key, # Передается при продаже
"preview_id": preview_id, # Для публичного доступа к preview
"metadata": content_metadata
}
def _generate_content_key(self) -> str:
"""Генерация уникального ключа для контента"""
return secrets.token_hex(32) # 256-bit key
async def _encrypt_content(self, content_data: bytes, encryption_key: str) -> bytes:
"""Симметричное шифрование контента"""
# Преобразование ключа в байты
key_bytes = bytes.fromhex(encryption_key)
# Создание шифратора AES-256-GCM
cipher = AES.new(key_bytes, AES.MODE_GCM)
# Шифрование
ciphertext, auth_tag = cipher.encrypt_and_digest(content_data)
# Объединение nonce, auth_tag и ciphertext для детерминированного хэша
encrypted_data = cipher.nonce + auth_tag + ciphertext
return encrypted_data
def _calculate_deterministic_hash(self, encrypted_data: bytes) -> str:
"""
Вычисление детерминированного хэша зашифрованного контента
ВАЖНО: Этот хэш будет одинаковым на всех нодах для одного контента,
но через него нельзя получить доступ к самому контенту
"""
return hashlib.sha256(encrypted_data).hexdigest()
async def decrypt_content(self, encrypted_data: bytes, encryption_key: str) -> bytes:
"""Расшифровка контента с помощью ключа"""
# Извлечение компонентов
nonce = encrypted_data[:16] # AES-GCM nonce: 16 bytes
auth_tag = encrypted_data[16:32] # Auth tag: 16 bytes
ciphertext = encrypted_data[32:] # Остальное - зашифрованные данные
# Создание дешифратора
key_bytes = bytes.fromhex(encryption_key)
cipher = AES.new(key_bytes, AES.MODE_GCM, nonce=nonce)
# Расшифровка с проверкой подлинности
decrypted_data = cipher.decrypt_and_verify(ciphertext, auth_tag)
return decrypted_data
async def create_preview_content(self, original_content: bytes,
content_type: str) -> tuple[bytes, str]:
"""
Создание preview контента (не зашифрованного)
Preview:
- Не связан с основным хэшем контента
- Имеет собственный уникальный ID
- Доступен публично без ключей
"""
# Создание preview в зависимости от типа контента
if content_type.startswith("audio/"):
# Для аудио - первые 30 секунд
preview_content = await self._create_audio_preview(original_content)
elif content_type.startswith("video/"):
# Для видео - первые 30 секунд + watermark
preview_content = await self._create_video_preview(original_content)
elif content_type.startswith("image/"):
# Для изображений - уменьшенная версия с watermark
preview_content = await self._create_image_preview(original_content)
else:
# Для других типов - описание или thumbnail
preview_content = await self._create_generic_preview(original_content)
# Уникальный ID для preview (НЕ хэш!)
preview_id = str(uuid4())
return preview_content, preview_id
class ContentAccessControl:
"""Система контроля доступа к контенту"""
async def can_provide_content(self, requesting_node: str, content_hash: str,
access_type: str = "full") -> dict:
"""
Проверка возможности предоставления контента
В v3.0 пока разрешаем всем (заглушка для будущего расширения)
"""
# TODO: Добавить реальную логику контроля доступа
# Пока разрешаем всем
return {
"access_granted": True,
"access_type": access_type,
"reason": "Open access policy"
}
# Пример будущей логики:
"""
# Проверка репутации ноды
node_reputation = await self._get_node_reputation(requesting_node)
if node_reputation < 0.5:
return {"access_granted": False, "reason": "Low node reputation"}
# Проверка коммерческих лицензий
content = await StoredContent.get_by_hash(content_hash)
if content and content.commercial_license:
license_valid = await self._check_commercial_license(
requesting_node, content_hash
)
if not license_valid:
return {"access_granted": False, "reason": "No valid license"}
# Проверка региональных ограничений
node_region = await self._get_node_region(requesting_node)
if node_region in content.restricted_regions:
return {"access_granted": False, "reason": "Regional restrictions"}
return {"access_granted": True, "access_type": access_type}
"""
async def get_content_by_preview_id(self, preview_id: str) -> Optional[bytes]:
"""
Получение preview контента по preview_id
Preview доступен всем без ограничений
"""
# Поиск preview по ID в базе данных
preview_content = await PreviewContent.get_by_preview_id(preview_id)
if preview_content:
return preview_content.data
return None
async def request_content_key(self, content_hash: str, user_license: dict) -> Optional[str]:
"""
Запрос ключа расшифровки контента
Ключ выдается только при наличии соответствующих прав
"""
# Проверка лицензии пользователя
if await self._validate_user_license(content_hash, user_license):
# Получение ключа расшифровки
content = await StoredContent.get_by_hash(content_hash)
if content:
return content.encryption_key
return None
```
## 🌐 Обновленный протокол обмена сообщениями
### Новые типы сообщений v3.0
```python
class P2PMessageTypes:
"""Типы сообщений протокола MY Network v3.0"""
# Базовые сообщения
HANDSHAKE = "handshake"
HANDSHAKE_RESPONSE = "handshake_response"
# Объявления о контенте (без консенсуса)
CONTENT_ANNOUNCEMENT = "content_announcement"
CONTENT_REQUEST = "content_request"
CONTENT_RESPONSE = "content_response"
# Синхронизация
SYNC_REQUEST = "sync_request"
SYNC_RESPONSE = "sync_response"
SYNC_DATA = "sync_data"
# Доступ к контенту
ACCESS_REQUEST = "access_request"
ACCESS_RESPONSE = "access_response"
# Версионирование
VERSION_INFO = "version_info"
VERSION_WARNING = "version_warning"
class P2PMessageHandler:
"""Обработчик P2P сообщений протокола v3.0"""
async def handle_content_announcement(self, peer_id: str, message: dict):
"""Обработка анонса контента (без консенсуса)"""
content_hash = message.get("content_hash")
metadata = message.get("metadata", {})
# Индивидуальное решение о принятии
should_accept = await self.sync_protocol.handle_content_announcement(
peer_id, message
)
# Отправка ответа пиру
response = {
"type": P2PMessageTypes.CONTENT_RESPONSE,
"content_hash": content_hash,
"accepted": should_accept,
"node_id": self.node_id,
"timestamp": datetime.utcnow().isoformat()
}
await self.send_message_to_peer(peer_id, response)
async def handle_sync_request(self, peer_id: str, message: dict):
"""Обработка запроса синхронизации"""
content_hash = message.get("content_hash")
# Проверка доступа к контенту
access_check = await self.access_control.can_provide_content(
peer_id, content_hash
)
if access_check["access_granted"]:
# Предоставление контента
content_data = await self._get_content_data(content_hash)
response = {
"type": P2PMessageTypes.SYNC_RESPONSE,
"content_hash": content_hash,
"status": "approved",
"data_size": len(content_data) if content_data else 0
}
else:
# Отказ в доступе
response = {
"type": P2PMessageTypes.SYNC_RESPONSE,
"content_hash": content_hash,
"status": "denied",
"reason": access_check.get("reason", "Access denied")
}
await self.send_message_to_peer(peer_id, response)
# Если доступ разрешен, отправляем данные
if access_check["access_granted"] and content_data:
await self._send_content_data(peer_id, content_hash, content_data)
```
## 📋 Новые API эндпоинты v3.0
### Обновленные API для децентрализованной архитектуры
```python
# Индивидуальная синхронизация (без консенсуса)
POST /api/v3/sync/announce # Анонс контента в сеть
GET /api/v3/sync/pending # Ожидающие синхронизации
POST /api/v3/sync/accept/{hash} # Принять конкретный контент
POST /api/v3/sync/reject/{hash} # Отклонить конкретный контент
# Управление фильтрами контента
GET /api/v3/filters/content # Текущие фильтры
PUT /api/v3/filters/content # Обновить фильтры
POST /api/v3/filters/test # Тестирование фильтров
# Безопасность контента
GET /api/v3/content/{hash}/preview/{preview_id} # Получение preview
POST /api/v3/content/{hash}/request-key # Запрос ключа расшифровки
GET /api/v3/content/{hash}/access-info # Информация о доступе
# Статистика децентрализованной сети
GET /api/v3/network/nodes # Известные ноды в сети
GET /api/v3/network/content/distribution # Распределение контента
GET /api/v3/network/sync/stats # Статистика синхронизации
```
## 🔄 Алгоритм удаления неиспользуемого контента
### Система очистки контента
```python
class ContentCleanupManager:
"""Управление очисткой контента через 7 дней"""
def __init__(self):
self.retention_days = 7
self.cleanup_interval = 3600 # 1 час
async def start_cleanup_scheduler(self):
"""Запуск планировщика очистки"""
while True:
try:
await self.cleanup_expired_content()
await asyncio.sleep(self.cleanup_interval)
except Exception as e:
logger.error(f"Cleanup scheduler error: {e}")
await asyncio.sleep(60) # Retry in 1 minute
async def cleanup_expired_content(self):
"""Очистка просроченного контента"""
cutoff_date = datetime.utcnow() - timedelta(days=self.retention_days)
# Поиск контента без blockchain регистрации
expired_content = await self._find_expired_content(cutoff_date)
for content in expired_content:
try:
# Проверка отсутствия blockchain записи
blockchain_exists = await self._check_blockchain_registration(content.hash)
if not blockchain_exists:
# Удаление файла и записи
await self._remove_content(content)
logger.info(f"Removed expired content: {content.hash}")
else:
# Обновление статуса - контент зарегистрирован
content.blockchain_registered = True
await content.save()
except Exception as e:
logger.error(f"Failed to cleanup content {content.hash}: {e}")
async def _find_expired_content(self, cutoff_date: datetime) -> List[StoredContent]:
"""Поиск просроченного контента"""
return await StoredContent.filter(
created_at__lt=cutoff_date,
blockchain_registered=False
).all()
async def _check_blockchain_registration(self, content_hash: str) -> bool:
"""Проверка регистрации контента в блокчейне"""
# TODO: Реализовать проверку TON блокчейна
# Пока возвращаем False для тестирования
return False
async def _remove_content(self, content: StoredContent):
"""Удаление контента и связанных файлов"""
# Удаление основного файла
if os.path.exists(content.file_path):
os.remove(content.file_path)
# Удаление preview файлов
preview_files = await PreviewContent.filter(parent_content_id=content.id).all()
for preview in preview_files:
if os.path.exists(preview.file_path):
os.remove(preview.file_path)
await preview.delete()
# Удаление записи из базы данных
await content.delete()
```
## 📊 Мониторинг новой системы
### Расширенная система мониторинга
```python
class DecentralizedNetworkMonitor:
"""Мониторинг децентрализованной сети v3.0"""
async def get_network_health(self) -> dict:
"""Здоровье децентрализованной сети"""
return {
"decentralization_metrics": {
"total_known_nodes": await self._count_known_nodes(),
"active_connections": len(self.active_peers),
"content_distribution": await self._analyze_content_distribution(),
"network_redundancy": await self._calculate_network_redundancy()
},
"sync_metrics": {
"pending_sync_items": self.sync_manager.get_pending_count(),
"successful_syncs_24h": await self._get_sync_count(hours=24),
"failed_syncs_24h": await self._get_failed_sync_count(hours=24),
"avg_sync_time": await self._get_avg_sync_time()
},
"content_metrics": {
"total_content_items": await self._count_local_content(),
"content_accepted_rate": await self._get_acceptance_rate(),
"content_cleanup_stats": await self._get_cleanup_stats(),
"storage_usage": await self._get_storage_usage()
},
"security_metrics": {
"access_requests_24h": await self._count_access_requests(hours=24),
"denied_access_rate": await self._get_denied_access_rate(),
"encryption_status": "AES-256-GCM",
"version_compatibility": await self._get_version_compatibility_stats()
}
}
```
Этот новый протокол обеспечивает полную децентрализацию MY Network при сохранении безопасности и эффективности системы.