uploader-bot/app/core/content/chunk_manager.py

233 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import asyncio
import base64
import logging
import math
from dataclasses import asdict
from hashlib import sha256
from typing import List, Iterable, Optional, Dict, Any, Tuple
from app.core.crypto.content_cipher import ContentCipher
from app.core.crypto import get_ed25519_manager
from app.core.models.content.chunk import ContentChunk
logger = logging.getLogger(__name__)


class ChunkManager:
    """
    Split content into encrypted, signed chunks and reassemble it back.

    Behaviour implemented here:
    - Chunk size: ``CHUNK_SIZE`` bytes of plaintext (8 MiB).
    - Each chunk stores the SHA-256 hex digest of its encrypted bytes
      (``ciphertext || tag || nonce``), which can be used for deduplication.
    - Each chunk payload is signed with Ed25519 via the global manager returned
      by ``get_ed25519_manager()``.
    - Encryption/decryption is delegated to ``ContentCipher``.
    """

    CHUNK_SIZE = 8 * 1024 * 1024  # 8 MiB of plaintext per chunk
    # Authentication-tag length assumed when splitting the stored
    # ciphertext||tag||nonce layout back apart (16 bytes = full AES-GCM tag).
    TAG_SIZE = 16

    def __init__(self, cipher: Optional[ContentCipher] = None):
        """
        Args:
            cipher: Cipher used for chunk encryption; a fresh ``ContentCipher``
                is created when omitted.
        """
        self.cipher = cipher if cipher is not None else ContentCipher()
        logger.debug("ChunkManager initialized with CHUNK_SIZE=%d", self.CHUNK_SIZE)

    @staticmethod
    def calculate_chunk_hash(data: bytes) -> str:
        """Return the SHA-256 hex digest of ``data``."""
        h = sha256(data).hexdigest()
        logger.debug("Calculated chunk SHA-256: %s", h)
        return h

    @staticmethod
    def _build_signing_payload(
        *,
        chunk_id: str,
        content_id: str,
        chunk_index: int,
        chunk_hash: str,
        encrypted_data: str,
        created_at: Any,
    ) -> Dict[str, Any]:
        """
        Build the dict that is signed (split) and verified (verify).

        Keys whose value is ``None`` are dropped. Both the signer and the
        verifier go through this helper, so they always produce the same
        payload for the same chunk.
        """
        payload = {
            "chunk_id": chunk_id,
            "content_id": content_id,
            "chunk_index": int(chunk_index),
            "chunk_hash": chunk_hash,
            "encrypted_data": encrypted_data,
            "created_at": created_at,
        }
        return {k: v for k, v in payload.items() if v is not None}

    def _sign_chunk_payload(self, payload: Dict[str, Any]) -> Optional[str]:
        """
        Sign ``payload`` with the global Ed25519 manager.

        This is best-effort. On any error, the failure is logged and ``None``
        is returned, so the chunk is produced unsigned. ``verify_chunk_integrity``
        later reports such chunks as "missing chunk signature".
        """
        try:
            crypto_mgr = get_ed25519_manager()
            return crypto_mgr.sign_message(payload)
        except Exception as e:
            logger.error("Failed to sign chunk payload: %s", e)
            return None

    def split_content(
        self,
        content_id: str,
        plaintext: bytes,
        content_key: bytes,
        metadata: Optional[Dict[str, Any]] = None,
        associated_data: Optional[bytes] = None,
    ) -> List[ContentChunk]:
        """
        Split ``plaintext`` into encrypted and signed chunks.

        Algorithm:
        1) Slice the plaintext into pieces of ``CHUNK_SIZE`` bytes. An empty
           plaintext still yields exactly one (empty) chunk.
        2) Encrypt each piece with ``ContentCipher.encrypt_content``.
        3) Derive ``chunk_id`` as
           hex(SHA-256(content_id || str(chunk_index) || chunk_hash)).
        4) Sign the chunk payload (every field except the signature).
        5) Return the ``ContentChunk`` objects in index order.

        Args:
            content_id: Identifier of the whole content; copied into every chunk.
            plaintext: Raw content bytes.
            content_key: Symmetric key; must be ``self.cipher.KEY_SIZE`` bytes.
            metadata: Extra metadata forwarded to the cipher. It cannot override
                the per-chunk ``content_id``/``chunk_index`` entries.
            associated_data: Optional AEAD associated data forwarded to the cipher.

        Returns:
            List of ``ContentChunk``, one per slice.

        Raises:
            TypeError: ``plaintext`` or ``content_key`` is not bytes/bytearray.
            ValueError: ``content_key`` has the wrong length.
        """
        # Explicit checks instead of ``assert`` so they survive ``python -O``.
        if not isinstance(plaintext, (bytes, bytearray)):
            raise TypeError("plaintext must be bytes")
        if not isinstance(content_key, (bytes, bytearray)):
            raise TypeError("content_key must be bytes")
        if len(content_key) != self.cipher.KEY_SIZE:
            raise ValueError(f"content_key must be {self.cipher.KEY_SIZE} bytes")

        total_size = len(plaintext)
        # Slice start offsets; an empty input still gets one (empty) chunk.
        offsets = range(0, total_size, self.CHUNK_SIZE) if total_size else range(1)
        chunks_count = len(offsets)  # == ceil(total_size / CHUNK_SIZE), min 1
        logger.info(
            "Splitting content_id=%s into chunks: total_size=%d, chunk_size=%d, chunks=%d",
            content_id, total_size, self.CHUNK_SIZE, chunks_count
        )

        result: List[ContentChunk] = []
        for index, offset in enumerate(offsets):
            part = plaintext[offset:offset + self.CHUNK_SIZE]
            logger.debug("Processing chunk index=%d, part_size=%d", index, len(part))

            enc_obj = self.cipher.encrypt_content(
                plaintext=part,
                key=content_key,
                # Caller metadata first so it cannot clobber the per-chunk identifiers.
                metadata={**(metadata or {}), "content_id": content_id, "chunk_index": index},
                associated_data=associated_data,
                sign_with_ed25519=False,  # each chunk is signed separately below
            )

            # Stored layout is ciphertext||tag||nonce; reassemble_content relies on it.
            raw_encrypted_chunk = (
                base64.b64decode(enc_obj["ciphertext_b64"])
                + base64.b64decode(enc_obj["tag_b64"])
                + base64.b64decode(enc_obj["nonce_b64"])
            )
            chunk_hash = self.calculate_chunk_hash(raw_encrypted_chunk)

            # Deterministic chunk id derived from content id, position and hash.
            chunk_id = sha256(
                (content_id + str(index) + chunk_hash).encode("utf-8")
            ).hexdigest()

            payload = self._build_signing_payload(
                chunk_id=chunk_id,
                content_id=content_id,
                chunk_index=index,
                chunk_hash=chunk_hash,
                encrypted_data=base64.b64encode(raw_encrypted_chunk).decode("ascii"),
                created_at=enc_obj.get("created_at") or enc_obj.get("timestamp") or None,
            )
            signature = self._sign_chunk_payload(payload)

            chunk = ContentChunk(
                chunk_id=payload["chunk_id"],
                content_id=payload["content_id"],
                chunk_index=payload["chunk_index"],
                chunk_hash=payload["chunk_hash"],
                encrypted_data=payload["encrypted_data"],
                signature=signature,
                created_at=payload.get("created_at"),
            )
            result.append(chunk)
            logger.debug("Chunk created: index=%d, chunk_id=%s", index, chunk.chunk_id)

        logger.info("Split completed: content_id=%s, chunks=%d", content_id, len(result))
        return result

    @staticmethod
    def _check_chunk_sequence(chunks_list: List[ContentChunk], content_id: str) -> None:
        """
        Validate an index-sorted, non-empty chunk list before decryption.

        Every chunk must carry ``content_id``. The ``chunk_index`` values must be
        consecutive, with no gaps and no duplicates. They may start at any index.

        Raises:
            ValueError: on mixed content ids or a non-contiguous index sequence.
        """
        start = int(chunks_list[0].chunk_index)
        for position, c in enumerate(chunks_list):
            if c.content_id != content_id:
                raise ValueError("mixed content_id detected during reassembly")
            if int(c.chunk_index) != start + position:
                raise ValueError(
                    f"non-contiguous chunk_index sequence: expected {start + position}, "
                    f"got {c.chunk_index}"
                )

    def reassemble_content(
        self,
        chunks: Iterable[ContentChunk],
        content_key: bytes,
        associated_data: Optional[bytes] = None,
        expected_content_id: Optional[str] = None,
    ) -> bytes:
        """
        Rebuild the original content from its chunks.

        Chunks are ordered by ``chunk_index``. This method does not check hashes
        or signatures, so callers should run ``verify_chunk_integrity`` first.

        Args:
            chunks: Chunks of a single content, in any order.
            content_key: Symmetric key used at split time.
            associated_data: AEAD associated data used at split time.
            expected_content_id: If given, the chunks must belong to this content.

        Returns:
            Concatenated plaintext, or ``b""`` for an empty input.

        Raises:
            ValueError: on content id mismatch, mixed content ids, duplicate or
                missing chunk indices, or a chunk too short to hold tag and nonce.
        """
        chunks_list = sorted(chunks, key=lambda c: c.chunk_index)
        if not chunks_list:
            logger.warning("Reassemble called with empty chunks list")
            return b""

        first_content_id = chunks_list[0].content_id
        if expected_content_id and expected_content_id != first_content_id:
            raise ValueError("content_id mismatch for reassembly")
        # Reject gaps/duplicates up front; otherwise the output would be silently corrupt.
        self._check_chunk_sequence(chunks_list, first_content_id)

        logger.info("Reassembling content_id=%s from %d chunks", first_content_id, len(chunks_list))

        nonce_size = ContentCipher.NONCE_SIZE
        trailer_size = self.TAG_SIZE + nonce_size  # bytes of tag||nonce after the ciphertext
        assembled: List[bytes] = []
        for c in chunks_list:
            raw = c.encrypted_bytes()
            if len(raw) < trailer_size:
                raise ValueError("invalid encrypted chunk length")
            # Undo the ciphertext||tag||nonce layout written by split_content.
            ciphertext = raw[:-trailer_size]
            tag = raw[-trailer_size:-nonce_size]
            nonce = raw[-nonce_size:]

            plaintext = self.cipher.decrypt_content(
                ciphertext_b64=base64.b64encode(ciphertext).decode("ascii"),
                nonce_b64=base64.b64encode(nonce).decode("ascii"),
                tag_b64=base64.b64encode(tag).decode("ascii"),
                key=content_key,
                associated_data=associated_data,
            )
            assembled.append(plaintext)

        data = b"".join(assembled)
        logger.info("Reassembly completed: content_id=%s, total_size=%d", first_content_id, len(data))
        return data

    def verify_chunk_integrity(
        self,
        chunk: ContentChunk,
        verify_signature: bool = True
    ) -> Tuple[bool, Optional[str]]:
        """
        Check that a chunk is intact.

        Checks performed:
        - ``chunk_hash`` matches the SHA-256 of ``chunk.encrypted_bytes()``.
        - Optionally, the Ed25519 signature verifies over the same payload
          that ``split_content`` signed.

        Returns:
            ``(True, None)`` if the chunk is valid, otherwise ``(False, reason)``.
            Unexpected errors are logged and returned as ``(False, str(error))``.
        """
        try:
            computed_hash = self.calculate_chunk_hash(chunk.encrypted_bytes())
            if computed_hash != chunk.chunk_hash:
                return False, "chunk_hash mismatch"

            if not verify_signature:
                return True, None
            if not chunk.signature:
                return False, "missing chunk signature"

            # Same helper as the signer, so None-valued fields (e.g. created_at)
            # are dropped identically and valid signatures verify.
            payload = self._build_signing_payload(
                chunk_id=chunk.chunk_id,
                content_id=chunk.content_id,
                chunk_index=chunk.chunk_index,
                chunk_hash=chunk.chunk_hash,
                encrypted_data=chunk.encrypted_data,
                created_at=chunk.created_at,
            )
            crypto_mgr = get_ed25519_manager()
            # NOTE(review): verifies against this node's own public key only;
            # chunks signed by another node would be rejected — confirm intended.
            if not crypto_mgr.verify_signature(payload, chunk.signature, crypto_mgr.public_key_hex):
                return False, "invalid chunk signature"
            return True, None
        except Exception as e:
            logger.error("verify_chunk_integrity error: %s", e)
            return False, str(e)