uploader-bot/tests/test_crypto.py

137 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import os
import time
from typing import Dict, Any
import pytest
from .test_helpers import assert_dict_has_keys, approx_eq_bytes, make_random_bytes, measure_throughput
# Import the crypto primitives under test. If the import fails for any reason,
# both names fall back to None so the skipif markers on the tests below skip
# them instead of breaking collection of this module.
try:
    from app.core.crypto import ContentCipher, get_ed25519_manager
except Exception:
    ContentCipher = None # type: ignore
    get_ed25519_manager = None # type: ignore
# Apply the `crypto` marker to every test defined in this module.
pytestmark = pytest.mark.crypto
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
def test_encrypt_decrypt_roundtrip(content_cipher, small_sample_bytes, random_content_key):
    """Encrypt a small payload, verify its integrity, then decrypt it back.

    The payload is encrypted with metadata, associated data and an Ed25519
    signature requested. The resulting envelope must expose the expected keys,
    pass ``verify_content_integrity`` (signature included) and decrypt to the
    original bytes when the same key and associated data are supplied.
    """
    associated = b"associated-data"
    expected_meta = {"purpose": "unit-test", "case": "roundtrip"}
    required_keys = ["ciphertext_b64", "nonce_b64", "tag_b64", "content_id", "metadata"]

    envelope: Dict[str, Any] = content_cipher.encrypt_content(
        plaintext=small_sample_bytes,
        key=random_content_key,
        metadata=expected_meta,
        associated_data=associated,
        sign_with_ed25519=True,
    )
    assert_dict_has_keys(envelope, required_keys)

    verified, reason = content_cipher.verify_content_integrity(
        envelope, expected_metadata=expected_meta, verify_signature=True
    )
    assert verified, f"Integrity failed: {reason}"

    # Forward the three base64 fields of the envelope unchanged to decrypt_content.
    cipher_fields = {name: envelope[name] for name in ("ciphertext_b64", "nonce_b64", "tag_b64")}
    recovered = content_cipher.decrypt_content(
        **cipher_fields,
        key=random_content_key,
        associated_data=associated,
    )
    approx_eq_bytes(recovered, small_sample_bytes, "Decrypted plaintext mismatch")
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
def test_aad_mismatch_should_fail(content_cipher, small_sample_bytes, random_content_key):
    """Decrypting with associated data other than what was used to encrypt must raise.

    The payload is encrypted with ``b"AAD"`` and decrypted with ``b"WRONG"``;
    ``decrypt_content`` is expected to raise.
    """
    enc = content_cipher.encrypt_content(
        plaintext=small_sample_bytes, key=random_content_key, metadata=None, associated_data=b"AAD"
    )
    # Read the envelope fields before entering pytest.raises: a KeyError from a
    # malformed envelope must fail the test, not satisfy the expectation.
    ciphertext_b64 = enc["ciphertext_b64"]
    nonce_b64 = enc["nonce_b64"]
    tag_b64 = enc["tag_b64"]
    with pytest.raises(Exception):
        content_cipher.decrypt_content(
            ciphertext_b64=ciphertext_b64,
            nonce_b64=nonce_b64,
            tag_b64=tag_b64,
            key=random_content_key,
            associated_data=b"WRONG",
        )
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
def test_tag_tamper_should_fail(content_cipher, small_sample_bytes, random_content_key):
    """Decryption must raise when the authentication tag has been altered.

    The forged tag is the real tag with its last byte inverted, so it is
    guaranteed to differ from the genuine one and keeps the genuine length.
    """
    enc = content_cipher.encrypt_content(
        plaintext=small_sample_bytes, key=random_content_key, metadata=None, associated_data=None
    )
    # Read the envelope fields before entering pytest.raises: a KeyError from a
    # malformed envelope must fail the test, not satisfy the expectation.
    ciphertext_b64 = enc["ciphertext_b64"]
    nonce_b64 = enc["nonce_b64"]
    real_tag = base64.b64decode(enc["tag_b64"])
    # Deterministic tamper; random bytes are only a fallback for an empty tag.
    forged_tag = (real_tag[:-1] + bytes([real_tag[-1] ^ 0xFF])) if real_tag else os.urandom(16)
    bad_tag = base64.b64encode(forged_tag).decode("ascii")
    with pytest.raises(Exception):
        content_cipher.decrypt_content(
            ciphertext_b64=ciphertext_b64,
            nonce_b64=nonce_b64,
            tag_b64=bad_tag,
            key=random_content_key,
            associated_data=None,
        )
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
def test_content_id_determinism(content_cipher, random_content_key):
    """Two encryptions of identical input must produce different content IDs.

    Only this negative case is asserted; the deterministic case mentioned in
    the closing comment is not exercised by this test.
    """
    payload = b"same data"
    metadata = {"k": "v"}
    # Encrypt the same input twice, in order, with identical arguments.
    first, second = (
        content_cipher.encrypt_content(payload, random_content_key, metadata=metadata, associated_data=b"A")
        for _ in range(2)
    )
    # The nonce is random, so the content_id is expected to differ. This checks the negative case.
    assert first["content_id"] != second["content_id"], "content_id must include nonce/tag randomness"
    # With identical ciphertext/nonce/tag/metadata the content_id is deterministic; per the
    # original note that edge case is covered where verify_content_integrity recomputes the ID.
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
def test_integrity_metadata_mismatch(content_cipher, small_sample_bytes, random_content_key):
    """Integrity verification must report a metadata mismatch.

    The payload is encrypted with ``{"x": 1}`` and verified against
    ``{"x": 2}`` with signature checking disabled; the result must be a
    failure whose error text contains ``"Metadata mismatch"``.
    """
    stored_meta = {"x": 1}
    claimed_meta = {"x": 2}
    envelope = content_cipher.encrypt_content(
        small_sample_bytes,
        random_content_key,
        metadata=stored_meta,
        associated_data=None,
    )
    verified, reason = content_cipher.verify_content_integrity(
        envelope,
        expected_metadata=claimed_meta,
        verify_signature=False,
    )
    # De Morgan form of "failed AND mentions the mismatch"; evaluation order is unchanged.
    assert not (verified or "Metadata mismatch" not in (reason or "")), (
        f"Unexpected integrity result: ok={verified}, err={reason}"
    )
@pytest.mark.skipif(ContentCipher is None or get_ed25519_manager is None, reason="Crypto not importable")
def test_signature_validation(content_cipher, small_sample_bytes, random_content_key):
    """A signed envelope verifies, and stops verifying once its ciphertext is altered."""
    signed = content_cipher.encrypt_content(
        plaintext=small_sample_bytes,
        key=random_content_key,
        metadata={"sig": True},
        associated_data=None,
        sign_with_ed25519=True,
    )
    intact_ok, intact_err = content_cipher.verify_content_integrity(
        signed, expected_metadata={"sig": True}, verify_signature=True
    )
    assert intact_ok, f"Signature must be valid: {intact_err}"

    # Corrupt the payload: invert the last ciphertext byte in a shallow copy of the envelope.
    forged = {**signed}
    ciphertext = base64.b64decode(forged["ciphertext_b64"])
    if ciphertext:
        scratch = bytearray(ciphertext)
        scratch[-1] ^= 0xFF
        corrupted = bytes(scratch)
    else:
        # Nothing to flip in an empty ciphertext; any single byte makes it differ.
        corrupted = os.urandom(1)
    forged["ciphertext_b64"] = base64.b64encode(corrupted).decode("ascii")

    forged_ok, forged_err = content_cipher.verify_content_integrity(
        forged, expected_metadata={"sig": True}, verify_signature=True
    )
    assert not forged_ok, "Signature verification must fail after tampering"
    accepted_errors = {"content_id mismatch", "Invalid signature", "Signature verification error"}
    assert forged_err in accepted_errors, f"err2={forged_err}"
@pytest.mark.performance
@pytest.mark.skipif(ContentCipher is None, reason="ContentCipher is not importable")
def test_performance_large_payload(content_cipher, random_content_key, temp_large_bytes):
    """Time an encrypt/decrypt round trip of a large payload and enforce a throughput floor.

    Each direction is timed with ``time.perf_counter``. The decrypted output
    must match the input in size and in content, and both throughput figures
    returned by ``measure_throughput`` must exceed 10_000_000.
    """
    payload_size = len(temp_large_bytes)

    start = time.perf_counter()
    enc = content_cipher.encrypt_content(temp_large_bytes, random_content_key, metadata=None, associated_data=None)
    enc_elapsed = time.perf_counter() - start

    start = time.perf_counter()
    dec = content_cipher.decrypt_content(
        enc["ciphertext_b64"], enc["nonce_b64"], enc["tag_b64"], random_content_key, associated_data=None
    )
    dec_elapsed = time.perf_counter() - start

    assert len(dec) == payload_size, "Decrypted size mismatch"
    # A length check alone would accept wrong bytes of the right size, so also
    # compare the content, using the same helper as the round-trip test.
    approx_eq_bytes(dec, temp_large_bytes, "Decrypted plaintext mismatch")

    encrypt_thr, msg1 = measure_throughput("encrypt", payload_size, enc_elapsed)
    decrypt_thr, msg2 = measure_throughput("decrypt", payload_size, dec_elapsed)
    # Print the metrics so they are visible in the test output.
    # NOTE(review): the original comment called the thresholds "not hard", yet the
    # assert below is a hard floor — confirm which is intended. Units are presumably
    # bytes per second; verify against measure_throughput.
    print(msg1)
    print(msg2)
    assert encrypt_thr > 10_000_000 and decrypt_thr > 10_000_000, "Throughput too low for AES-GCM baseline"