import base64 import os import time from typing import Any, Dict, List import pytest from .test_helpers import make_random_bytes, approx_eq_bytes, measure_throughput pytestmark = pytest.mark.e2e try: from app.core.crypto import ContentCipher from app.core.content.chunk_manager import ChunkManager except Exception: ContentCipher = None # type: ignore ChunkManager = None # type: ignore try: from fastapi.testclient import TestClient from app.fastapi_main import app as fastapi_app # type: ignore except Exception: TestClient = None # type: ignore fastapi_app = None # type: ignore @pytest.mark.skipif(any(x is None for x in [ChunkManager, ContentCipher]), reason="Core components not importable") def test_full_flow_local_crypto_chunking(content_cipher, random_content_key): """ Сквозной тест локального пайплайна: 1) Генерация ключа контента 2) Шифрование полного файла для получения content_id 3) Разбиение на чанки и подпись каждого чанка 4) Сборка обратно и сверка исходных данных """ data = make_random_bytes(2_500_000) # ~2.5MB cm = ChunkManager(cipher=content_cipher) # content_id может быть вычислен из первого encrypt (через ContentCipher), # но split_content формирует metadata с content_id, потому применим детерминированный внешний ID. content_id = "e2e-" + os.urandom(8).hex() start_split = time.perf_counter() chunks = cm.split_content(content_id, data, content_key=random_content_key, metadata={"flow": "e2e"}, associated_data=b"aad") split_elapsed = time.perf_counter() - start_split # Проверка целостности каждого чанка for ch in chunks: ok, err = cm.verify_chunk_integrity(ch, verify_signature=True) assert ok, f"Chunk integrity failed: {err}" start_reasm = time.perf_counter() restored = cm.reassemble_content(chunks, content_key=random_content_key, associated_data=b"aad", expected_content_id=content_id) reasm_elapsed = time.perf_counter() - start_reasm approx_eq_bytes(restored, data, "E2E restored mismatch") thr1, msg1 = measure_throughput("split", len(data), split_elapsed) thr2, msg2 = measure_throughput("reassemble", len(data), reasm_elapsed) print(msg1) print(msg2) assert thr1 > 5_000_000 and thr2 > 5_000_000, "Throughput below baseline for E2E local flow" @pytest.mark.skipif(TestClient is None or fastapi_app is None, reason="FastAPI app not importable") def test_e2e_api_smoke_upload_access_flow(fastapi_client: Any, small_sample_bytes: bytes): """ Сквозной smoke через HTTP API: - Проверка доступности ключевых endpoint'ов - Имитируем upload init/chunk/complete с минимальным контрактом (ожидаем мягкие статусы без реальной логики) - Проверяем доступность системных и контентных роутов """ # Health r = fastapi_client.get("/api/system/health") assert r.status_code in (200, 503), f"health unexpected: {r.status_code}" # Инициируем "загрузку" (проверяем контракт/наличие маршрута) init = fastapi_client.get("/api/storage") assert init.status_code in (200, 404, 405), f"/api/storage unexpected: {init.status_code}" # Заглушка chunk upload r2 = fastapi_client.post("/api/storage/upload/chunk", json={ "upload_id": "UPLOAD_E2E", "index": 0, "data": base64.b64encode(small_sample_bytes).decode("ascii") }) assert r2.status_code in (200, 400, 401, 404, 405), f"upload/chunk unexpected: {r2.status_code}" # complete r3 = fastapi_client.post("/api/storage/upload/complete", json={"upload_id": "UPLOAD_E2E"}) assert r3.status_code in (200, 400, 401, 404, 405), f"upload/complete unexpected: {r3.status_code}" # Доступ к контенту (маршруты из docs) for path in ["/content.view/unknown", "/api/system/info", "/api/node/network/status"]: x = fastapi_client.get(path) assert x.status_code in (200, 401, 404, 405), f"{path} unexpected status {x.status_code}"