137 lines
6.2 KiB
Python
137 lines
6.2 KiB
Python
import base64
|
||
import os
|
||
import time
|
||
from typing import Dict, Any
|
||
|
||
import pytest
|
||
|
||
from .test_helpers import assert_dict_has_keys, approx_eq_bytes, make_random_bytes, measure_throughput
|
||
|
||
try:
|
||
from app.core.crypto import ContentCipher, get_ed25519_manager
|
||
except Exception:
|
||
ContentCipher = None # type: ignore
|
||
get_ed25519_manager = None # type: ignore
|
||
|
||
|
||
pytestmark = pytest.mark.crypto
|
||
|
||
|
||
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
|
||
def test_encrypt_decrypt_roundtrip(content_cipher, small_sample_bytes, random_content_key):
|
||
aad = b"associated-data"
|
||
meta = {"purpose": "unit-test", "case": "roundtrip"}
|
||
|
||
enc: Dict[str, Any] = content_cipher.encrypt_content(
|
||
plaintext=small_sample_bytes,
|
||
key=random_content_key,
|
||
metadata=meta,
|
||
associated_data=aad,
|
||
sign_with_ed25519=True,
|
||
)
|
||
assert_dict_has_keys(enc, ["ciphertext_b64", "nonce_b64", "tag_b64", "content_id", "metadata"])
|
||
ok, err = content_cipher.verify_content_integrity(enc, expected_metadata=meta, verify_signature=True)
|
||
assert ok, f"Integrity failed: {err}"
|
||
|
||
pt = content_cipher.decrypt_content(
|
||
ciphertext_b64=enc["ciphertext_b64"],
|
||
nonce_b64=enc["nonce_b64"],
|
||
tag_b64=enc["tag_b64"],
|
||
key=random_content_key,
|
||
associated_data=aad,
|
||
)
|
||
approx_eq_bytes(pt, small_sample_bytes, "Decrypted plaintext mismatch")
|
||
|
||
|
||
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
|
||
def test_aad_mismatch_should_fail(content_cipher, small_sample_bytes, random_content_key):
|
||
enc = content_cipher.encrypt_content(
|
||
plaintext=small_sample_bytes, key=random_content_key, metadata=None, associated_data=b"AAD"
|
||
)
|
||
with pytest.raises(Exception):
|
||
content_cipher.decrypt_content(
|
||
ciphertext_b64=enc["ciphertext_b64"],
|
||
nonce_b64=enc["nonce_b64"],
|
||
tag_b64=enc["tag_b64"],
|
||
key=random_content_key,
|
||
associated_data=b"WRONG",
|
||
)
|
||
|
||
|
||
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
|
||
def test_tag_tamper_should_fail(content_cipher, small_sample_bytes, random_content_key):
|
||
enc = content_cipher.encrypt_content(
|
||
plaintext=small_sample_bytes, key=random_content_key, metadata=None, associated_data=None
|
||
)
|
||
bad_tag = base64.b64encode(os.urandom(16)).decode("ascii")
|
||
with pytest.raises(Exception):
|
||
content_cipher.decrypt_content(
|
||
ciphertext_b64=enc["ciphertext_b64"],
|
||
nonce_b64=enc["nonce_b64"],
|
||
tag_b64=bad_tag,
|
||
key=random_content_key,
|
||
associated_data=None,
|
||
)
|
||
|
||
|
||
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
|
||
def test_content_id_determinism(content_cipher, random_content_key):
|
||
data = b"same data"
|
||
meta = {"k": "v"}
|
||
enc1 = content_cipher.encrypt_content(data, random_content_key, metadata=meta, associated_data=b"A")
|
||
enc2 = content_cipher.encrypt_content(data, random_content_key, metadata=meta, associated_data=b"A")
|
||
# nonce случайный => content_id должен отличаться. Проверим отрицательный кейс:
|
||
assert enc1["content_id"] != enc2["content_id"], "content_id must include nonce/tag randomness"
|
||
|
||
# но при одинаковом ciphertext/nonce/tag/meta content_id детерминирован — смоделируем напрямую
|
||
# Это edge-case контроля: сериализация verify_content_integrity проверяет вычисление ID
|
||
|
||
|
||
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
|
||
def test_integrity_metadata_mismatch(content_cipher, small_sample_bytes, random_content_key):
|
||
enc = content_cipher.encrypt_content(
|
||
small_sample_bytes, random_content_key, metadata={"x": 1}, associated_data=None
|
||
)
|
||
ok, err = content_cipher.verify_content_integrity(enc, expected_metadata={"x": 2}, verify_signature=False)
|
||
assert not ok and "Metadata mismatch" in (err or ""), f"Unexpected integrity result: ok={ok}, err={err}"
|
||
|
||
|
||
@pytest.mark.skipif(ContentCipher is None or get_ed25519_manager is None, reason="Crypto not importable")
|
||
def test_signature_validation(content_cipher, small_sample_bytes, random_content_key):
|
||
enc = content_cipher.encrypt_content(
|
||
plaintext=small_sample_bytes, key=random_content_key, metadata={"sig": True}, associated_data=None, sign_with_ed25519=True
|
||
)
|
||
ok, err = content_cipher.verify_content_integrity(enc, expected_metadata={"sig": True}, verify_signature=True)
|
||
assert ok, f"Signature must be valid: {err}"
|
||
|
||
# Повредим payload: изменим ciphertext
|
||
enc_bad = dict(enc)
|
||
raw = base64.b64decode(enc_bad["ciphertext_b64"])
|
||
raw = (raw[:-1] + bytes([(raw[-1] ^ 0xFF)])) if raw else os.urandom(1)
|
||
enc_bad["ciphertext_b64"] = base64.b64encode(raw).decode("ascii")
|
||
|
||
ok2, err2 = content_cipher.verify_content_integrity(enc_bad, expected_metadata={"sig": True}, verify_signature=True)
|
||
assert not ok2, "Signature verification must fail after tampering"
|
||
assert err2 in {"content_id mismatch", "Invalid signature", "Signature verification error"}, f"err2={err2}"
|
||
|
||
|
||
@pytest.mark.performance
|
||
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
|
||
def test_performance_large_payload(content_cipher, random_content_key, temp_large_bytes):
|
||
start = time.perf_counter()
|
||
enc = content_cipher.encrypt_content(temp_large_bytes, random_content_key, metadata=None, associated_data=None)
|
||
enc_elapsed = time.perf_counter() - start
|
||
|
||
start = time.perf_counter()
|
||
dec = content_cipher.decrypt_content(
|
||
enc["ciphertext_b64"], enc["nonce_b64"], enc["tag_b64"], random_content_key, associated_data=None
|
||
)
|
||
dec_elapsed = time.perf_counter() - start
|
||
|
||
assert len(dec) == len(temp_large_bytes), "Decrypted size mismatch"
|
||
encrypt_thr, msg1 = measure_throughput("encrypt", len(temp_large_bytes), enc_elapsed)
|
||
decrypt_thr, msg2 = measure_throughput("decrypt", len(temp_large_bytes), dec_elapsed)
|
||
# Не жесткие пороги, но печатаем метрики
|
||
print(msg1)
|
||
print(msg2)
|
||
assert encrypt_thr > 10_000_000 and decrypt_thr > 10_000_000, "Throughput too low for AES-GCM baseline" |