from __future__ import annotations import base64 import logging from dataclasses import asdict from hashlib import sha256 from typing import Any, Dict, Optional, Tuple from app.core.crypto import get_ed25519_manager from app.core.crypto.content_cipher import ContentCipher from app.core.models.validation.validation_models import ValidationResult, ContentSignature logger = logging.getLogger(__name__) class ContentValidator: """ Основной валидатор контента: - Проверка подписи источника (Ed25519) - Проверка целостности контента/объектов (checksum/content_id) - Интеграция с ContentCipher для дополнительной верификации """ def __init__(self, cipher: Optional[ContentCipher] = None): self.cipher = cipher or ContentCipher() logger.debug("ContentValidator initialized") def verify_source_signature( self, payload: Dict[str, Any], signature_b64: Optional[str], public_key_hex: Optional[str], ) -> ValidationResult: """ Проверка Ed25519 подписи источника. - payload должен сериализоваться идентично тому, что подписывалось. - signature_b64 - base64 строка подписи. - public_key_hex - hex публичного ключа источника. """ try: if not signature_b64 or not public_key_hex: logger.warning("verify_source_signature: missing signature/public key") return ValidationResult(ok=False, reason="missing_signature_or_public_key") crypto_mgr = get_ed25519_manager() ok = crypto_mgr.verify_signature(payload, signature_b64, public_key_hex) if not ok: logger.warning("verify_source_signature: invalid signature") return ValidationResult(ok=False, reason="invalid_signature") logger.info("verify_source_signature: signature valid") return ValidationResult(ok=True, details={"signer_key": public_key_hex}) except Exception as e: logger.exception("verify_source_signature error") return ValidationResult(ok=False, reason=str(e)) def check_content_integrity( self, encrypted_obj: Dict[str, Any], expected_metadata: Optional[Dict[str, Any]] = None, verify_signature: bool = True, ) -> ValidationResult: """ Делегирует проверку целостности ContentCipher: - сверка content_id = sha256(ciphertext||nonce||tag||metadata_json) - опциональная проверка встроенной подписи encrypted_obj (если есть signature/signер_pubkey) """ ok, err = self.cipher.verify_content_integrity( encrypted_obj=encrypted_obj, expected_metadata=expected_metadata, verify_signature=verify_signature, ) if not ok: return ValidationResult(ok=False, reason=err or "integrity_failed") return ValidationResult(ok=True) def validate_content( self, content_meta: Dict[str, Any], *, checksum: Optional[str] = None, source_signature: Optional[ContentSignature] = None, encrypted_obj: Optional[Dict[str, Any]] = None, verify_ed25519: bool = True, ) -> ValidationResult: """ Комплексная проверка валидности контента: 1) Если указан checksum (:), сверяем. 2) Если указан source_signature, проверяем Ed25519 подпись источника. 3) Если передан encrypted_obj, выполняем углублённую проверку ContentCipher. content_meta — произвольная структура метаданных, которая была объектом подписи источника. """ # 1. Проверка checksum (формат: "sha256:") if checksum: try: algo, hexval = checksum.split(":", 1) algo = algo.lower() if algo != "sha256": logger.warning("validate_content: unsupported checksum algo: %s", algo) return ValidationResult(ok=False, reason="unsupported_checksum_algo", details={"algo": algo}) # Вычислить sha256 по ожидаемым данным невозможно без исходных байт, # поэтому здесь лишь проверка формата. Фактическая сверка должна происходить # на уровне получателя с использованием известного буфера. if not all(c in "0123456789abcdef" for c in hexval.lower()) or len(hexval) != 64: return ValidationResult(ok=False, reason="invalid_checksum_format") logger.debug("validate_content: checksum format looks valid (sha256)") except Exception: return ValidationResult(ok=False, reason="invalid_checksum") # 2. Проверка подписи источника (если указана) if verify_ed25519 and source_signature: sig_check = self.verify_source_signature( payload=content_meta, signature_b64=source_signature.signature, public_key_hex=source_signature.public_key_hex, ) if not sig_check.ok: return ValidationResult(ok=False, reason="source_signature_invalid", details=sig_check.to_dict()) # 3. Проверка целостности зашифрованного объекта (если присутствует) if encrypted_obj: integ = self.check_content_integrity( encrypted_obj=encrypted_obj, expected_metadata=encrypted_obj.get("metadata"), verify_signature=verify_ed25519, ) if not integ.ok: return ValidationResult(ok=False, reason="encrypted_integrity_invalid", details=integ.to_dict()) logger.info("validate_content: content validation passed") return ValidationResult(ok=True)