uploader-bot/app/core/validation/integrity_checker.py

119 lines
5.2 KiB
Python

from __future__ import annotations
import base64
import logging
from typing import Any, Dict, Iterable, List, Optional, Tuple
from app.core.content.chunk_manager import ChunkManager
from app.core.crypto.content_cipher import ContentCipher
from app.core.models.content.chunk import ContentChunk
from app.core.models.validation.validation_models import ValidationResult
logger = logging.getLogger(__name__)
class IntegrityChecker:
    """
    Extended content/chunk integrity checks layered on top of ChunkManager:
    - per-chunk verification of every record (hash/signature)
    - detection of corruption and duplicates
    - validation of the content "chain" (consistent content_id/indexes)
    """

    def __init__(self, chunk_manager: Optional[ChunkManager] = None, cipher: Optional[ContentCipher] = None):
        """
        Store the collaborators used by the checks.

        Args:
            chunk_manager: manager used to verify individual chunks; a new
                ``ChunkManager()`` is created when omitted.
            cipher: cipher to keep on the checker; defaults to the
                ``cipher`` attribute of the chunk manager when omitted.
        """
        # Compare against None explicitly so that a supplied-but-falsy
        # collaborator is not silently swapped for the default.
        self.chunk_manager = chunk_manager if chunk_manager is not None else ChunkManager()
        self.cipher = cipher if cipher is not None else self.chunk_manager.cipher
        logger.debug("IntegrityChecker initialized")

    def check_chunk_integrity(self, chunk: ContentChunk, verify_signature: bool = True) -> ValidationResult:
        """
        Verify a single chunk via ``ChunkManager.verify_chunk_integrity``.

        Args:
            chunk: the chunk to verify.
            verify_signature: forwarded unchanged to the chunk manager.

        Returns:
            A ``ValidationResult`` whose ``details`` carry the ``chunk_id``.
            On failure ``reason`` is the error reported by the chunk manager,
            or ``"chunk_invalid"`` when the manager gave no error text.
        """
        ok, err = self.chunk_manager.verify_chunk_integrity(chunk, verify_signature=verify_signature)
        if not ok:
            logger.warning("check_chunk_integrity: chunk invalid: %s -> %s", chunk.chunk_id, err)
            return ValidationResult(ok=False, reason=err or "chunk_invalid", details={"chunk_id": chunk.chunk_id})
        return ValidationResult(ok=True, details={"chunk_id": chunk.chunk_id})

    def detect_corruption(self, chunks: Iterable[ContentChunk]) -> ValidationResult:
        """
        Detect structural damage and anomalies in a set of chunks:
        - duplicated chunk_id / chunk_index values
        - differing content_id values between chunks
        - inconsistent indexes (gaps/repeats)

        Gaps are only looked for between the smallest and the largest
        ``chunk_index`` present, so missing leading or trailing chunks
        cannot be noticed here.

        Args:
            chunks: the chunks to inspect; consumed once.

        Returns:
            ``ok=True`` for an empty input or a consistent set; otherwise
            ``ok=False`` with reason ``"mixed_content_ids"`` or
            ``"structure_anomaly"``. Any exception raised while inspecting
            is logged and reported as ``ok=False`` with the exception text
            as the reason.
        """
        try:
            chunks_list: List[ContentChunk] = sorted(chunks, key=lambda c: c.chunk_index)
            if not chunks_list:
                return ValidationResult(ok=True, details={"message": "no chunks"})

            content_ids = {c.content_id for c in chunks_list}
            if len(content_ids) != 1:
                return ValidationResult(ok=False, reason="mixed_content_ids", details={"content_ids": list(content_ids)})

            seen_ids = set()
            seen_indexes = set()
            duplicates: List[str] = []
            for c in chunks_list:
                if c.chunk_id in seen_ids:
                    duplicates.append(c.chunk_id)
                else:
                    seen_ids.add(c.chunk_id)
                if c.chunk_index in seen_indexes:
                    # Index repeats share the list with id repeats; the
                    # "index:" prefix tells the two kinds apart.
                    duplicates.append(f"index:{c.chunk_index}")
                else:
                    seen_indexes.add(c.chunk_index)

            # The list is sorted by chunk_index and known to be non-empty,
            # so its ends hold the smallest and largest index.
            min_idx = chunks_list[0].chunk_index
            max_idx = chunks_list[-1].chunk_index
            expected = set(range(min_idx, max_idx + 1))
            gaps: List[int] = sorted(expected - seen_indexes)

            if duplicates or gaps:
                return ValidationResult(
                    ok=False,
                    reason="structure_anomaly",
                    details={"duplicates": duplicates, "missing_indexes": gaps},
                )
            return ValidationResult(ok=True, details={"content_id": chunks_list[0].content_id})
        except Exception as e:
            logger.exception("detect_corruption error")
            return ValidationResult(ok=False, reason=str(e))

    def verify_content_chain(
        self,
        chunks: Iterable[ContentChunk],
        verify_signatures: bool = True,
    ) -> ValidationResult:
        """
        Run the full verification of a set of chunks:
        1) detect_corruption for structure/sequence
        2) check_chunk_integrity for every chunk (hash/signature)

        Args:
            chunks: the chunks to verify; materialized into a list once so
                both passes see the same items.
            verify_signatures: forwarded to ``check_chunk_integrity`` as
                ``verify_signature`` for each chunk.

        Returns:
            The failing structural result unchanged when step 1 fails.
            Otherwise ``ok=True`` with the number of verified chunks, or
            ``ok=False`` with reason ``"chain_integrity_failed"`` and the
            per-chunk errors. Any exception is logged and reported as
            ``ok=False`` with the exception text as the reason.
        """
        try:
            chunks_list = list(chunks)

            structure = self.detect_corruption(chunks_list)
            if not structure.ok:
                return structure

            errors: List[Dict[str, Any]] = []
            ok_count = 0
            # Chunks are checked in the order they were supplied; every
            # chunk is checked even after a failure so all errors are reported.
            for c in chunks_list:
                res = self.check_chunk_integrity(c, verify_signature=verify_signatures)
                if res.ok:
                    ok_count += 1
                else:
                    errors.append({"chunk_id": c.chunk_id, "error": res.reason})

            if errors:
                return ValidationResult(ok=False, reason="chain_integrity_failed", details={"verified_ok": ok_count, "errors": errors})
            return ValidationResult(ok=True, details={"verified_ok": ok_count})
        except Exception as e:
            logger.exception("verify_content_chain error")
            return ValidationResult(ok=False, reason=str(e))