126 lines
5.6 KiB
Python
126 lines
5.6 KiB
Python
import base64
|
||
import math
|
||
import os
|
||
from typing import List
|
||
|
||
import pytest
|
||
|
||
from .test_helpers import make_random_bytes, approx_eq_bytes, assert_dict_has_keys
|
||
|
||
# Optional project imports. When any of them cannot be imported, all three
# names are bound to None so the skipif markers below can skip the tests
# instead of failing at collection time.
# NOTE(review): `except Exception` also swallows non-import errors raised while
# importing these modules; confirm that this is intended.
try:
    from app.core.content.chunk_manager import ChunkManager
    from app.core.crypto import ContentCipher
    from app.core.models.content.chunk import ContentChunk
except Exception:
    ChunkManager = ContentCipher = ContentChunk = None  # type: ignore


# Apply the `chunking` marker to every test in this module.
pytestmark = pytest.mark.chunking
|
||
|
||
|
||
@pytest.mark.skipif(
    ChunkManager is None or ContentCipher is None,
    reason="ChunkManager/ContentCipher not importable",
)
def test_split_and_reassemble_roundtrip(content_cipher, random_content_key):
    """Split a multi-chunk payload, verify each chunk, then reassemble it.

    The payload is sized as two full chunks plus a 123-byte tail, so the
    expected chunk count is the payload length divided by ``CHUNK_SIZE``,
    rounded up. Every chunk must carry its positional index, the content id,
    non-empty encrypted data and hash, and must pass the integrity check with
    signature verification enabled.
    """
    manager = ChunkManager(cipher=content_cipher)
    # Two full chunks plus a tail.
    data = make_random_bytes(2 * manager.CHUNK_SIZE + 123)
    content_id = "content-" + os.urandom(8).hex()

    chunks: List[ContentChunk] = manager.split_content(
        content_id,
        data,
        content_key=random_content_key,
        metadata={"t": 1},
        associated_data=b"AAD",
    )

    expected_count = math.ceil(len(data) / manager.CHUNK_SIZE)
    assert len(chunks) == expected_count

    for i, ch in enumerate(chunks):
        assert ch.chunk_index == i, f"Chunk index order broken: expected={i}, got={ch.chunk_index}"
        assert ch.content_id == content_id
        assert ch.encrypted_data and ch.chunk_hash

        ok, err = manager.verify_chunk_integrity(ch, verify_signature=True)
        assert ok, f"Chunk integrity failed: idx={i} err={err}"

    reassembled = manager.reassemble_content(
        chunks,
        content_key=random_content_key,
        associated_data=b"AAD",
        expected_content_id=content_id,
    )
    approx_eq_bytes(reassembled, data, "Reassembled content mismatch")
|
||
|
||
|
||
@pytest.mark.skipif(
    ChunkManager is None or ContentCipher is None,
    reason="ChunkManager/ContentCipher not importable",
)
def test_empty_content_edge_case(content_cipher, random_content_key):
    """Round-trip empty content through split, integrity check and reassembly."""
    manager = ChunkManager(cipher=content_cipher)
    data = b""
    content_id = "empty-" + os.urandom(4).hex()

    chunks = manager.split_content(
        content_id,
        data,
        content_key=random_content_key,
        metadata=None,
        associated_data=None,
    )

    # For empty content a single chunk with empty data is expected.
    assert len(chunks) == 1
    only_chunk = chunks[0]
    ok, err = manager.verify_chunk_integrity(only_chunk, verify_signature=True)
    assert ok, f"Empty chunk integrity failed: {err}"

    restored = manager.reassemble_content(
        chunks,
        content_key=random_content_key,
        associated_data=None,
        expected_content_id=content_id,
    )
    approx_eq_bytes(restored, data, "Empty content roundtrip mismatch")
|
||
|
||
|
||
@pytest.mark.skipif(
    ChunkManager is None or ContentCipher is None,
    reason="ChunkManager/ContentCipher not importable",
)
def test_reassemble_mixed_content_id_should_fail(content_cipher, random_content_key):
    """Expect ValueError when reassembling chunks taken from two content ids."""
    manager = ChunkManager(cipher=content_cipher)

    # Two independent payloads, each one byte longer than a single chunk.
    first_payload = make_random_bytes(manager.CHUNK_SIZE + 1)
    second_payload = make_random_bytes(manager.CHUNK_SIZE + 1)
    first_id = "cid1-" + os.urandom(4).hex()
    second_id = "cid2-" + os.urandom(4).hex()

    first_chunks = manager.split_content(first_id, first_payload, content_key=random_content_key)
    second_chunks = manager.split_content(second_id, second_payload, content_key=random_content_key)

    # Mix the leading chunk of each content into one list.
    mixed = [first_chunks[0], second_chunks[0]]
    with pytest.raises(ValueError):
        manager.reassemble_content(mixed, content_key=random_content_key)
|
||
|
||
|
||
@pytest.mark.skipif(
    ChunkManager is None or ContentCipher is None,
    reason="ChunkManager/ContentCipher not importable",
)
def test_integrity_signature_missing(content_cipher, random_content_key, monkeypatch):
    """Check that verify_chunk_integrity fails on a missing signature.

    With ``verify_signature=True`` the check must report
    ``"missing chunk signature"``. The missing signature is simulated by
    rebuilding the chunk with ``signature=None``.
    """
    # NOTE: the monkeypatch fixture is requested but not used in this test.
    manager = ChunkManager(cipher=content_cipher)
    data = make_random_bytes(manager.CHUNK_SIZE // 2)
    content_id = "cid-" + os.urandom(4).hex()

    source_chunk = manager.split_content(content_id, data, content_key=random_content_key)[0]

    # Copy every field of the original chunk, but erase the signature.
    unsigned_fields = {
        "chunk_id": source_chunk.chunk_id,
        "content_id": source_chunk.content_id,
        "chunk_index": source_chunk.chunk_index,
        "chunk_hash": source_chunk.chunk_hash,
        "encrypted_data": source_chunk.encrypted_data,
        "signature": None,
        "created_at": source_chunk.created_at,
    }
    ch_no_sig = ContentChunk(**unsigned_fields)

    ok, err = manager.verify_chunk_integrity(ch_no_sig, verify_signature=True)
    rejected_as_expected = not ok and err == "missing chunk signature"
    assert rejected_as_expected, f"Unexpected integrity result: ok={ok}, err={err}"
|
||
|
||
|
||
@pytest.mark.skipif(
    ChunkManager is None or ContentCipher is None,
    reason="ChunkManager/ContentCipher not importable",
)
def test_integrity_hash_mismatch(content_cipher, random_content_key):
    """Check that tampered encrypted data is reported as a chunk_hash mismatch.

    The last byte of the chunk's encrypted bytes is flipped while the original
    hash is kept; verification runs with ``verify_signature=False``.
    """
    manager = ChunkManager(cipher=content_cipher)
    data = make_random_bytes(manager.CHUNK_SIZE // 2)
    content_id = "cid-" + os.urandom(4).hex()

    source_chunk = manager.split_content(content_id, data, content_key=random_content_key)[0]

    # Flip the lowest bit of the last encrypted byte, then re-serialize to base64.
    ciphertext = source_chunk.encrypted_bytes()
    if ciphertext:
        flipped_tail = bytes([ciphertext[-1] ^ 0x01])
        ciphertext = ciphertext[:-1] + flipped_tail
    tampered_b64 = base64.b64encode(ciphertext).decode("ascii")

    ch_bad = ContentChunk(
        chunk_id=source_chunk.chunk_id,
        content_id=source_chunk.content_id,
        chunk_index=source_chunk.chunk_index,
        # The old hash is kept and should no longer match the tampered data.
        chunk_hash=source_chunk.chunk_hash,
        encrypted_data=tampered_b64,
        signature=source_chunk.signature,
        created_at=source_chunk.created_at,
    )

    ok, err = manager.verify_chunk_integrity(ch_bad, verify_signature=False)
    mismatch_reported = not ok and err == "chunk_hash mismatch"
    assert mismatch_reported, f"Expected hash mismatch, got ok={ok}, err={err}"