27 KiB
27 KiB
MY Network v3.0 - Новый протокол децентрализованной синхронизации
🎯 Обзор протокола v3.0
MY Network v3.0 полностью отказывается от кворумной системы в пользу децентрализованной архитектуры, где каждая нода принимает независимые решения о принятии и хранении контента.
🔄 Протокол индивидуальной синхронизации
Принципы новой системы
- Автономность решений: Каждая нода самостоятельно решает принимать контент или нет
- Отсутствие консенсуса: Нет необходимости в голосовании других нод
- Гибкая фильтрация: Настраиваемые правила принятия контента на уровне ноды
- Устойчивость к цензуре: Контент доступен пока есть хотя бы одна нода
Новый алгоритм синхронизации
class DecentralizedSyncProtocol:
"""Протокол децентрализованной синхронизации v3.0"""
def __init__(self, node_config: dict):
self.node_id = node_config["node_id"]
self.content_filter = IndividualContentFilter(node_config)
self.sync_manager = SyncManager()
self.active_peers = {}
async def handle_content_announcement(self, peer_id: str, announcement: dict) -> bool:
"""
Обработка анонса нового контента от пира
Новая логика v3.0:
1. Каждая нода принимает решение индивидуально
2. Нет консенсуса или голосования
3. Простая заглушка для фильтрации (пока всегда True)
"""
content_hash = announcement.get("content_hash")
content_metadata = announcement.get("metadata", {})
logger.info(f"Received content announcement from {peer_id}: {content_hash}")
# Проверка - не дубликат ли это
if await self._content_already_exists(content_hash):
logger.debug(f"Content {content_hash} already exists locally")
return False
# Индивидуальное решение о принятии контента
should_accept = await self.content_filter.should_accept_content(
content_hash, content_metadata, peer_id
)
if should_accept:
logger.info(f"Accepting content {content_hash} from {peer_id}")
# Добавление в очередь синхронизации
await self.sync_manager.queue_content_sync(
peer_id=peer_id,
content_hash=content_hash,
metadata=content_metadata
)
return True
else:
logger.info(f"Rejecting content {content_hash} from {peer_id}")
return False
async def sync_content_from_peer(self, peer_id: str, content_hash: str,
metadata: dict) -> bool:
"""Синхронизация контента с пира (без консенсуса)"""
try:
# Запрос метаданных контента
content_info = await self._request_content_info(peer_id, content_hash)
if not content_info:
logger.error(f"Failed to get content info for {content_hash}")
return False
# Проверка возможности получения контента от пира
access_granted = await self._request_content_access(peer_id, content_hash)
if not access_granted:
logger.warning(f"Access denied for content {content_hash} from {peer_id}")
return False
# Загрузка контента
content_data = await self._download_content(peer_id, content_hash)
if not content_data:
logger.error(f"Failed to download content {content_hash}")
return False
# Проверка целостности
if not await self._verify_content_integrity(content_hash, content_data):
logger.error(f"Content integrity check failed: {content_hash}")
return False
# Сохранение контента локально
stored_content = await self._store_content_locally(
content_hash, content_data, metadata
)
if stored_content:
logger.info(f"Successfully synced content {content_hash} from {peer_id}")
# Анонс нового контента другим пирам (кроме источника)
await self._announce_content_to_peers(stored_content, exclude_peer=peer_id)
return True
except Exception as e:
logger.error(f"Failed to sync content {content_hash} from {peer_id}: {e}")
return False
class IndividualContentFilter:
"""Система индивидуальной фильтрации контента"""
def __init__(self, node_config: dict):
self.node_config = node_config
self.whitelist_authors = set(node_config.get("whitelist_authors", []))
self.blacklist_authors = set(node_config.get("blacklist_authors", []))
self.allowed_content_types = set(node_config.get("allowed_content_types", ["*"]))
self.max_file_size = node_config.get("max_file_size", 100 * 1024 * 1024) # 100MB
async def should_accept_content(self, content_hash: str, metadata: dict,
peer_id: str) -> bool:
"""
Решение о принятии контента (заглушка для будущего расширения)
В текущей версии v3.0 всегда возвращает True.
В будущем здесь будут фильтры по:
- Авторам (whitelist/blacklist)
- Типам контента
- Размерам файлов
- Тегам и категориям
- Репутации пира
"""
# TODO: Добавить реальную логику фильтрации
# Пока принимаем весь контент
return True
# Пример будущей логики фильтрации:
"""
author_id = metadata.get("author_id")
content_type = metadata.get("content_type")
file_size = metadata.get("file_size", 0)
# Проверка черного списка авторов
if author_id in self.blacklist_authors:
return False
# Проверка белого списка (если не пустой)
if self.whitelist_authors and author_id not in self.whitelist_authors:
return False
# Проверка типа контента
if "*" not in self.allowed_content_types:
if content_type not in self.allowed_content_types:
return False
# Проверка размера файла
if file_size > self.max_file_size:
return False
return True
"""
🔐 Новая система шифрования и безопасности
Архитектура безопасности v3.0
class ContentSecurityManager:
"""Управление безопасностью контента в MY Network v3.0"""
def __init__(self):
self.hash_algorithm = "sha256"
self.encryption_algorithm = "AES-256-GCM"
async def create_encrypted_content(self, content_data: bytes,
content_metadata: dict) -> dict:
"""
Создание зашифрованного контента с единым хэшем
Система v3.0:
1. Симметричное шифрование с уникальным ключом для каждого контента
2. Единый хэш encrypted_content на всех нодах
3. Отдельные preview_id не связанные с основным хэшем
"""
# Генерация уникального ключа шифрования для этого контента
encryption_key = self._generate_content_key()
# Шифрование контента
encrypted_data = await self._encrypt_content(content_data, encryption_key)
# Вычисление единого хэша зашифрованного контента
encrypted_content_hash = self._calculate_deterministic_hash(encrypted_data)
# Создание отдельного preview_id (не связанного с хэшем)
preview_id = str(uuid4())
return {
"encrypted_content_hash": encrypted_content_hash, # Одинаковый на всех нодах
"encrypted_data": encrypted_data,
"encryption_key": encryption_key, # Передается при продаже
"preview_id": preview_id, # Для публичного доступа к preview
"metadata": content_metadata
}
def _generate_content_key(self) -> str:
"""Генерация уникального ключа для контента"""
return secrets.token_hex(32) # 256-bit key
async def _encrypt_content(self, content_data: bytes, encryption_key: str) -> bytes:
"""Симметричное шифрование контента"""
# Преобразование ключа в байты
key_bytes = bytes.fromhex(encryption_key)
# Создание шифратора AES-256-GCM
cipher = AES.new(key_bytes, AES.MODE_GCM)
# Шифрование
ciphertext, auth_tag = cipher.encrypt_and_digest(content_data)
# Объединение nonce, auth_tag и ciphertext для детерминированного хэша
encrypted_data = cipher.nonce + auth_tag + ciphertext
return encrypted_data
def _calculate_deterministic_hash(self, encrypted_data: bytes) -> str:
"""
Вычисление детерминированного хэша зашифрованного контента
ВАЖНО: Этот хэш будет одинаковым на всех нодах для одного контента,
но через него нельзя получить доступ к самому контенту
"""
return hashlib.sha256(encrypted_data).hexdigest()
async def decrypt_content(self, encrypted_data: bytes, encryption_key: str) -> bytes:
"""Расшифровка контента с помощью ключа"""
# Извлечение компонентов
nonce = encrypted_data[:16] # AES-GCM nonce: 16 bytes
auth_tag = encrypted_data[16:32] # Auth tag: 16 bytes
ciphertext = encrypted_data[32:] # Остальное - зашифрованные данные
# Создание дешифратора
key_bytes = bytes.fromhex(encryption_key)
cipher = AES.new(key_bytes, AES.MODE_GCM, nonce=nonce)
# Расшифровка с проверкой подлинности
decrypted_data = cipher.decrypt_and_verify(ciphertext, auth_tag)
return decrypted_data
async def create_preview_content(self, original_content: bytes,
content_type: str) -> tuple[bytes, str]:
"""
Создание preview контента (не зашифрованного)
Preview:
- Не связан с основным хэшем контента
- Имеет собственный уникальный ID
- Доступен публично без ключей
"""
# Создание preview в зависимости от типа контента
if content_type.startswith("audio/"):
# Для аудио - первые 30 секунд
preview_content = await self._create_audio_preview(original_content)
elif content_type.startswith("video/"):
# Для видео - первые 30 секунд + watermark
preview_content = await self._create_video_preview(original_content)
elif content_type.startswith("image/"):
# Для изображений - уменьшенная версия с watermark
preview_content = await self._create_image_preview(original_content)
else:
# Для других типов - описание или thumbnail
preview_content = await self._create_generic_preview(original_content)
# Уникальный ID для preview (НЕ хэш!)
preview_id = str(uuid4())
return preview_content, preview_id
class ContentAccessControl:
"""Система контроля доступа к контенту"""
async def can_provide_content(self, requesting_node: str, content_hash: str,
access_type: str = "full") -> dict:
"""
Проверка возможности предоставления контента
В v3.0 пока разрешаем всем (заглушка для будущего расширения)
"""
# TODO: Добавить реальную логику контроля доступа
# Пока разрешаем всем
return {
"access_granted": True,
"access_type": access_type,
"reason": "Open access policy"
}
# Пример будущей логики:
"""
# Проверка репутации ноды
node_reputation = await self._get_node_reputation(requesting_node)
if node_reputation < 0.5:
return {"access_granted": False, "reason": "Low node reputation"}
# Проверка коммерческих лицензий
content = await StoredContent.get_by_hash(content_hash)
if content and content.commercial_license:
license_valid = await self._check_commercial_license(
requesting_node, content_hash
)
if not license_valid:
return {"access_granted": False, "reason": "No valid license"}
# Проверка региональных ограничений
node_region = await self._get_node_region(requesting_node)
if node_region in content.restricted_regions:
return {"access_granted": False, "reason": "Regional restrictions"}
return {"access_granted": True, "access_type": access_type}
"""
async def get_content_by_preview_id(self, preview_id: str) -> Optional[bytes]:
"""
Получение preview контента по preview_id
Preview доступен всем без ограничений
"""
# Поиск preview по ID в базе данных
preview_content = await PreviewContent.get_by_preview_id(preview_id)
if preview_content:
return preview_content.data
return None
async def request_content_key(self, content_hash: str, user_license: dict) -> Optional[str]:
"""
Запрос ключа расшифровки контента
Ключ выдается только при наличии соответствующих прав
"""
# Проверка лицензии пользователя
if await self._validate_user_license(content_hash, user_license):
# Получение ключа расшифровки
content = await StoredContent.get_by_hash(content_hash)
if content:
return content.encryption_key
return None
🌐 Обновленный протокол обмена сообщениями
Новые типы сообщений v3.0
class P2PMessageTypes:
"""Типы сообщений протокола MY Network v3.0"""
# Базовые сообщения
HANDSHAKE = "handshake"
HANDSHAKE_RESPONSE = "handshake_response"
# Объявления о контенте (без консенсуса)
CONTENT_ANNOUNCEMENT = "content_announcement"
CONTENT_REQUEST = "content_request"
CONTENT_RESPONSE = "content_response"
# Синхронизация
SYNC_REQUEST = "sync_request"
SYNC_RESPONSE = "sync_response"
SYNC_DATA = "sync_data"
# Доступ к контенту
ACCESS_REQUEST = "access_request"
ACCESS_RESPONSE = "access_response"
# Версионирование
VERSION_INFO = "version_info"
VERSION_WARNING = "version_warning"
class P2PMessageHandler:
"""Обработчик P2P сообщений протокола v3.0"""
async def handle_content_announcement(self, peer_id: str, message: dict):
"""Обработка анонса контента (без консенсуса)"""
content_hash = message.get("content_hash")
metadata = message.get("metadata", {})
# Индивидуальное решение о принятии
should_accept = await self.sync_protocol.handle_content_announcement(
peer_id, message
)
# Отправка ответа пиру
response = {
"type": P2PMessageTypes.CONTENT_RESPONSE,
"content_hash": content_hash,
"accepted": should_accept,
"node_id": self.node_id,
"timestamp": datetime.utcnow().isoformat()
}
await self.send_message_to_peer(peer_id, response)
async def handle_sync_request(self, peer_id: str, message: dict):
"""Обработка запроса синхронизации"""
content_hash = message.get("content_hash")
# Проверка доступа к контенту
access_check = await self.access_control.can_provide_content(
peer_id, content_hash
)
if access_check["access_granted"]:
# Предоставление контента
content_data = await self._get_content_data(content_hash)
response = {
"type": P2PMessageTypes.SYNC_RESPONSE,
"content_hash": content_hash,
"status": "approved",
"data_size": len(content_data) if content_data else 0
}
else:
# Отказ в доступе
response = {
"type": P2PMessageTypes.SYNC_RESPONSE,
"content_hash": content_hash,
"status": "denied",
"reason": access_check.get("reason", "Access denied")
}
await self.send_message_to_peer(peer_id, response)
# Если доступ разрешен, отправляем данные
if access_check["access_granted"] and content_data:
await self._send_content_data(peer_id, content_hash, content_data)
📋 Новые API эндпоинты v3.0
Обновленные API для децентрализованной архитектуры
# Индивидуальная синхронизация (без консенсуса)
POST /api/v3/sync/announce # Анонс контента в сеть
GET /api/v3/sync/pending # Ожидающие синхронизации
POST /api/v3/sync/accept/{hash} # Принять конкретный контент
POST /api/v3/sync/reject/{hash} # Отклонить конкретный контент
# Управление фильтрами контента
GET /api/v3/filters/content # Текущие фильтры
PUT /api/v3/filters/content # Обновить фильтры
POST /api/v3/filters/test # Тестирование фильтров
# Безопасность контента
GET /api/v3/content/{hash}/preview/{preview_id} # Получение preview
POST /api/v3/content/{hash}/request-key # Запрос ключа расшифровки
GET /api/v3/content/{hash}/access-info # Информация о доступе
# Статистика децентрализованной сети
GET /api/v3/network/nodes # Известные ноды в сети
GET /api/v3/network/content/distribution # Распределение контента
GET /api/v3/network/sync/stats # Статистика синхронизации
🔄 Алгоритм удаления неиспользуемого контента
Система очистки контента
class ContentCleanupManager:
"""Управление очисткой контента через 7 дней"""
def __init__(self):
self.retention_days = 7
self.cleanup_interval = 3600 # 1 час
async def start_cleanup_scheduler(self):
"""Запуск планировщика очистки"""
while True:
try:
await self.cleanup_expired_content()
await asyncio.sleep(self.cleanup_interval)
except Exception as e:
logger.error(f"Cleanup scheduler error: {e}")
await asyncio.sleep(60) # Retry in 1 minute
async def cleanup_expired_content(self):
"""Очистка просроченного контента"""
cutoff_date = datetime.utcnow() - timedelta(days=self.retention_days)
# Поиск контента без blockchain регистрации
expired_content = await self._find_expired_content(cutoff_date)
for content in expired_content:
try:
# Проверка отсутствия blockchain записи
blockchain_exists = await self._check_blockchain_registration(content.hash)
if not blockchain_exists:
# Удаление файла и записи
await self._remove_content(content)
logger.info(f"Removed expired content: {content.hash}")
else:
# Обновление статуса - контент зарегистрирован
content.blockchain_registered = True
await content.save()
except Exception as e:
logger.error(f"Failed to cleanup content {content.hash}: {e}")
async def _find_expired_content(self, cutoff_date: datetime) -> List[StoredContent]:
"""Поиск просроченного контента"""
return await StoredContent.filter(
created_at__lt=cutoff_date,
blockchain_registered=False
).all()
async def _check_blockchain_registration(self, content_hash: str) -> bool:
"""Проверка регистрации контента в блокчейне"""
# TODO: Реализовать проверку TON блокчейна
# Пока возвращаем False для тестирования
return False
async def _remove_content(self, content: StoredContent):
"""Удаление контента и связанных файлов"""
# Удаление основного файла
if os.path.exists(content.file_path):
os.remove(content.file_path)
# Удаление preview файлов
preview_files = await PreviewContent.filter(parent_content_id=content.id).all()
for preview in preview_files:
if os.path.exists(preview.file_path):
os.remove(preview.file_path)
await preview.delete()
# Удаление записи из базы данных
await content.delete()
📊 Мониторинг новой системы
Расширенная система мониторинга
class DecentralizedNetworkMonitor:
"""Мониторинг децентрализованной сети v3.0"""
async def get_network_health(self) -> dict:
"""Здоровье децентрализованной сети"""
return {
"decentralization_metrics": {
"total_known_nodes": await self._count_known_nodes(),
"active_connections": len(self.active_peers),
"content_distribution": await self._analyze_content_distribution(),
"network_redundancy": await self._calculate_network_redundancy()
},
"sync_metrics": {
"pending_sync_items": self.sync_manager.get_pending_count(),
"successful_syncs_24h": await self._get_sync_count(hours=24),
"failed_syncs_24h": await self._get_failed_sync_count(hours=24),
"avg_sync_time": await self._get_avg_sync_time()
},
"content_metrics": {
"total_content_items": await self._count_local_content(),
"content_accepted_rate": await self._get_acceptance_rate(),
"content_cleanup_stats": await self._get_cleanup_stats(),
"storage_usage": await self._get_storage_usage()
},
"security_metrics": {
"access_requests_24h": await self._count_access_requests(hours=24),
"denied_access_rate": await self._get_denied_access_rate(),
"encryption_status": "AES-256-GCM",
"version_compatibility": await self._get_version_compatibility_stats()
}
}
Этот новый протокол обеспечивает полную децентрализацию MY Network при сохранении безопасности и эффективности системы.