uploader-bot/tests/test_sync.py

179 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
from typing import Dict, Any, List, Optional
import pytest
# Every test in this module carries the "sync" marker.
pytestmark = pytest.mark.sync

# The application package may be missing or fail to import (e.g. in local
# smoke runs). In that case both names fall back to None, which the tests
# below use as the signal to skip.
try:
    from app.core.content.sync_manager import ContentSyncManager
    from app.core.models.content.chunk import ContentChunk
except Exception:
    ContentSyncManager = ContentChunk = None  # type: ignore
class _DummyChunk:
    """
    Minimal stand-in for ContentChunk, used when the real import is
    unavailable (local smoke tests).

    Required keyword arguments: ``chunk_id``, ``content_id``, ``chunk_index``,
    ``chunk_hash`` and ``encrypted_data``; omitting any of them raises
    ``KeyError``. ``signature`` and ``created_at`` are optional and default to
    ``None``. Any other keyword arguments are ignored.
    """

    def __init__(self, **kw):
        self.chunk_id = kw["chunk_id"]
        self.content_id = kw["content_id"]
        self.chunk_index = kw["chunk_index"]
        self.chunk_hash = kw["chunk_hash"]
        self.encrypted_data = kw["encrypted_data"]
        self.signature = kw.get("signature")
        self.created_at = kw.get("created_at")

    def to_dict(self) -> Dict[str, Any]:
        """Return the chunk's fields as a plain dictionary."""
        return {
            "chunk_id": self.chunk_id,
            "content_id": self.content_id,
            "chunk_index": self.chunk_index,
            "chunk_hash": self.chunk_hash,
            "encrypted_data": self.encrypted_data,
            "signature": self.signature,
            "created_at": self.created_at,
        }

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "_DummyChunk":
        """
        Build a chunk from a mapping shaped like the output of ``to_dict``.

        Implemented as a classmethod so that a subclass gets an instance of
        itself rather than of the base class.
        """
        return cls(**d)

    def encrypted_bytes(self) -> bytes:
        """Return ``encrypted_data`` decoded from its base64 text form."""
        import base64

        return base64.b64decode(self.encrypted_data.encode("ascii"))
@pytest.mark.asyncio
@pytest.mark.skipif(ContentSyncManager is None, reason="ContentSyncManager not importable")
async def test_provide_chunks_happy_path(monkeypatch):
    """
    Verify that provide_chunks returns the chunk the storage can supply and
    reports an error for the index it cannot.
    """
    mgr = ContentSyncManager()

    # This test is about provide_chunks, not split_content, so instead of
    # producing a genuinely valid chunk we hand in a minimal fake and mock
    # verify_chunk_integrity so that it lets the prepared data through.
    async def ok_verify(chunk):
        return True, None

    monkeypatch.setattr(mgr, "verify_chunk_integrity", ok_verify)

    # The storage returns a chunk object (either ContentChunk or the dummy).
    sample = {
        "chunk_id": "ch_1",
        "content_id": "cid_123",
        "chunk_index": 0,
        "chunk_hash": "f00d",
        "encrypted_data": "AA==",  # base64 of \x00
        "signature": "sig",
        "created_at": "2025-01-01T00:00:00Z",
    }

    def storage_reader(content_id: str, index: int):
        # Only index 0 of "cid_123" exists; everything else is "not found".
        if content_id == "cid_123" and index == 0:
            if ContentChunk is not None:
                return ContentChunk.from_dict(sample)
            return _DummyChunk.from_dict(sample)
        return None

    res = await mgr.provide_chunks("cid_123", [0, 1], storage_reader=storage_reader, batch_limit=10)

    assert "chunks" in res and "errors" in res
    assert len(res["chunks"]) == 1, f"Expected one provided chunk, got {len(res['chunks'])}"
    # .get() keeps an error entry without an "index" key from raising KeyError
    # and hiding the assertion message below.
    assert any(e.get("index") == 1 for e in res["errors"]), "Missing not_found error for index 1"
@pytest.mark.asyncio
@pytest.mark.skipif(ContentSyncManager is None, reason="ContentSyncManager not importable")
async def test_request_chunks_aggregates_and_validates(monkeypatch):
    """
    Check the aggregation logic: request_chunks is given three indexes with
    batch_size=2, the fake node answers with two canned payloads, and the
    result must count only the chunks that pass validation while reporting
    the rejected one in the errors.
    """
    mgr = ContentSyncManager()

    # Fakes that replace NodeClient inside ContentSyncManager.request_chunks.
    class _FakeResp:
        """Canned HTTP response usable as an async context manager."""

        def __init__(self, status: int, data: Dict[str, Any]):
            self.status = status
            self._data = data

        async def json(self):
            return self._data

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return False

    class _FakeSession:
        """Session whose post() hands out the canned responses in call order."""

        def __init__(self, payloads: List[Dict[str, Any]], statuses: List[int]):
            self._payloads = payloads
            self._statuses = statuses
            self._i = 0

        def post(self, endpoint: str, **req):
            i = self._i
            self._i += 1
            return _FakeResp(self._statuses[i], self._payloads[i])

    class _FakeClient:
        """Stand-in for NodeClient exposing only what this test provides."""

        def __init__(self, session):
            self.session = session

        async def __aenter__(self):
            return self

        async def __aexit__(self, exc_type, exc, tb):
            return False

        async def _create_signed_request(self, action: str, data: Dict[str, Any], target_url: str):
            return {"json": {"action": action, "data": data}}

    # Mock verify_chunk_integrity: accept every chunk except chunk_id == "bad".
    async def verify_chunk(chunk):
        if getattr(chunk, "chunk_id", None) == "bad":
            return False, "invalid"
        return True, None

    monkeypatch.setattr(mgr, "verify_chunk_integrity", verify_chunk)

    # Predefined responses: first batch has one good and one bad chunk,
    # second batch has one good chunk.
    payloads = [
        {"data": {"chunks": [
            {"chunk_id": "good1", "content_id": "cid", "chunk_index": 0, "chunk_hash": "h1", "encrypted_data": "AA==", "signature": "s", "created_at": None},
            {"chunk_id": "bad", "content_id": "cid", "chunk_index": 1, "chunk_hash": "h2", "encrypted_data": "AA==", "signature": "s", "created_at": None},
        ]}},
        {"data": {"chunks": [
            {"chunk_id": "good2", "content_id": "cid", "chunk_index": 2, "chunk_hash": "h3", "encrypted_data": "AA==", "signature": "s", "created_at": None},
        ]}},
    ]
    statuses = [200, 200]

    # Build the session once and share it: if request_chunks happens to
    # instantiate NodeClient more than once (e.g. per batch), a fresh session
    # per client would restart the response cursor and replay the first payload.
    session = _FakeSession(payloads, statuses)

    # Patch the NodeClient class in the sync_manager module.
    import app.core.content.sync_manager as sm  # type: ignore

    monkeypatch.setattr(sm, "NodeClient", lambda: _FakeClient(session))

    res = await mgr.request_chunks("http://node-A", "cid", [0, 1, 2], batch_size=2)

    assert res["requested"] == 3
    assert res["received"] == 2, f"Expected 2 validated chunks, got {res['received']}"
    assert any(e.get("chunk_id") == "bad" for e in res["errors"]), "Expected invalid chunk error present"
@pytest.mark.asyncio
@pytest.mark.skipif(ContentSyncManager is None, reason="ContentSyncManager not importable")
async def test_sync_content_parallel_aggregation(monkeypatch):
    """
    Check that sync_content aggregates the results obtained from several nodes.
    """
    mgr = ContentSyncManager()

    async def fake_request(node_url: str, content_id: str, missing: List[int]):
        # Start from an "empty" summary and fill in the per-node outcome:
        # node A delivers two chunks, node B delivers one plus an HTTP error,
        # any other node delivers nothing.
        summary = {"requested": len(missing), "received": 0, "chunks": [], "errors": []}
        if "A" in node_url:
            summary["received"] = 2
        elif "B" in node_url:
            summary["received"] = 1
            summary["errors"] = [{"batch": [0], "error": "HTTP 500"}]
        return summary

    monkeypatch.setattr(mgr, "request_chunks", fake_request)

    nodes = ["http://node-A", "http://node-B", "http://node-C"]
    res = await mgr.sync_content(nodes, "cid", have_indexes=[0], total_chunks=4)

    # 2 (node A) + 1 (node B) + 0 (node C)
    assert res["downloaded"] == 3, f"downloaded mismatch: {res}"
    assert "details" in res and len(res["details"]) == 3