""" MY Network v3.0 - ContentCipher (AES-256-GCM) for uploader-bot Реализует шифрование контента с помощью AES-256-GCM и интеграцию с Ed25519Manager для подписи зашифрованного контента и проверки целостности. Адаптация идей из DEPRECATED: - См. базовую AES логику ([`DEPRECATED-uploader-bot/app/core/_crypto/cipher.py`](DEPRECATED-uploader-bot/app/core/_crypto/cipher.py:1)) - См. работу с контентом ([`DEPRECATED-uploader-bot/app/core/_crypto/content.py`](DEPRECATED-uploader-bot/app/core/_crypto/content.py:1)) Отличия новой реализации: - Используем AES-256-GCM (аутентифицированное шифрование) вместо CBC+PAD - Формируем content_id как SHA-256 от (ciphertext || nonce || tag || metadata_json) - Подписываем структуру EncryptedContent через Ed25519Manager """ from __future__ import annotations import base64 import json import logging import os from dataclasses import asdict from hashlib import sha256 from typing import Any, Dict, Optional, Tuple from cryptography.hazmat.primitives.ciphers.aead import AESGCM try: # Импорт менеджера подписи из текущего модуля crypto from app.core.crypto import get_ed25519_manager except Exception: # Ленивая инициализация без разрыва импорта (например, при статическом анализе) get_ed25519_manager = None # type: ignore logger = logging.getLogger(__name__) class ContentCipher: """ Класс шифрования контента AES-256-GCM с интеграцией Ed25519 подписи. Ключевая информация: - generate_content_key() -> 32 байта (AES-256) - encrypt_content() -> (ciphertext, nonce, tag, content_id, signature, signer_pubkey) - decrypt_content() -> исходные данные при валидной аутентификации - verify_content_integrity() -> проверка подписи и content_id """ NONCE_SIZE = 12 # Рекомендуемый размер nonce для AES-GCM KEY_SIZE = 32 # 256-bit def __init__(self): # В логах не пишем чувствительные данные logger.debug("ContentCipher initialized (AES-256-GCM)") @staticmethod def generate_content_key(seed: Optional[bytes] = None) -> bytes: """ Генерация ключа шифрования контента (32 байта). Если передан seed (как в DEPRECATED подходе), дополнительно хэшируем SHA-256. """ if seed is not None: assert isinstance(seed, (bytes, bytearray)), "seed must be bytes" key = sha256(seed).digest() logger.debug("Content key generated from seed via SHA-256") return key # Без seed — криптографически стойкая генерация key = os.urandom(ContentCipher.KEY_SIZE) logger.debug("Random content key generated") return key @staticmethod def _compute_content_id(ciphertext: bytes, nonce: bytes, tag: bytes, metadata: Optional[Dict[str, Any]]) -> str: """ content_id = HEX(SHA-256(ciphertext || nonce || tag || json(metadata, sorted))) """ md_json = b"{}" if metadata: md_json = json.dumps(metadata, sort_keys=True, ensure_ascii=False).encode("utf-8") digest = sha256(ciphertext + nonce + tag + md_json).hexdigest() logger.debug("Computed content_id via SHA-256 over ciphertext+nonce+tag+metadata_json") return digest def encrypt_content( self, plaintext: bytes, key: bytes, metadata: Optional[Dict[str, Any]] = None, associated_data: Optional[bytes] = None, sign_with_ed25519: bool = True, ) -> Dict[str, Any]: """ Шифрует данные AES-256-GCM и возвращает структуру с полями: { ciphertext_b64, nonce_b64, tag_b64, content_id, metadata, signature, signer_pubkey } Примечания: - associated_data (AAD) включается в AEAD (не шифруется, но аутентифицируется). - signature покрывает сериализованную структуру без signature поля. """ assert isinstance(plaintext, (bytes, bytearray)), "plaintext must be bytes" assert isinstance(key, (bytes, bytearray)) and len(key) == self.KEY_SIZE, "key must be 32 bytes" aesgcm = AESGCM(key) nonce = os.urandom(self.NONCE_SIZE) # Шифруем: AESGCM возвращает ciphertext||tag в одном буфере ct_with_tag = aesgcm.encrypt(nonce, plaintext, associated_data) # Последние 16 байт — GCM tag tag = ct_with_tag[-16:] ciphertext = ct_with_tag[:-16] # content_id по требованиям content_id = self._compute_content_id(ciphertext, nonce, tag, metadata) # Подготовка объекта для подписи payload = { "ciphertext_b64": base64.b64encode(ciphertext).decode("ascii"), "nonce_b64": base64.b64encode(nonce).decode("ascii"), "tag_b64": base64.b64encode(tag).decode("ascii"), "content_id": content_id, "metadata": metadata or {}, } signature = None signer_pubkey = None if sign_with_ed25519 and get_ed25519_manager is not None: try: crypto_mgr = get_ed25519_manager() signature = crypto_mgr.sign_message(payload) signer_pubkey = crypto_mgr.public_key_hex logger.debug("Encrypted payload signed with Ed25519") except Exception as e: # Не блокируем шифрование при проблемах подписи, но логируем logger.error(f"Failed to sign encrypted payload: {e}") result = { **payload, "signature": signature, "signer_pubkey": signer_pubkey, } logger.info(f"Content encrypted: content_id={content_id}, has_signature={signature is not None}") return result def decrypt_content( self, ciphertext_b64: str, nonce_b64: str, tag_b64: str, key: bytes, associated_data: Optional[bytes] = None, ) -> bytes: """ Расшифровывает данные AES-256-GCM. Бросает исключение при неверной аутентификации (tag/AAD/nonce). """ assert isinstance(key, (bytes, bytearray)) and len(key) == self.KEY_SIZE, "key must be 32 bytes" ciphertext = base64.b64decode(ciphertext_b64) nonce = base64.b64decode(nonce_b64) tag = base64.b64decode(tag_b64) aesgcm = AESGCM(key) pt = aesgcm.decrypt(nonce, ciphertext + tag, associated_data) logger.info("Content decrypted successfully") return pt def verify_content_integrity( self, encrypted_obj: Dict[str, Any], expected_metadata: Optional[Dict[str, Any]] = None, verify_signature: bool = True, ) -> Tuple[bool, Optional[str]]: """ Проверяет: - content_id соответствует данным (ciphertext/nonce/tag/metadata) - при наличии verify_signature и signature/signer_pubkey — валидность подписи Возвращает: (OK, error_message) """ try: # Сначала проверим content_id ciphertext_b64 = encrypted_obj.get("ciphertext_b64") nonce_b64 = encrypted_obj.get("nonce_b64") tag_b64 = encrypted_obj.get("tag_b64") metadata = encrypted_obj.get("metadata") or {} if expected_metadata is not None and expected_metadata != metadata: return False, "Metadata mismatch" if not (ciphertext_b64 and nonce_b64 and tag_b64): return False, "Missing encrypted fields" ciphertext = base64.b64decode(ciphertext_b64) nonce = base64.b64decode(nonce_b64) tag = base64.b64decode(tag_b64) computed_id = self._compute_content_id(ciphertext, nonce, tag, metadata) if computed_id != encrypted_obj.get("content_id"): return False, "content_id mismatch" # Далее проверим подпись при необходимости if verify_signature: signature = encrypted_obj.get("signature") signer_pubkey = encrypted_obj.get("signer_pubkey") if signature and signer_pubkey and get_ed25519_manager is not None: # Важно: подписывалась структура без полей signature/signер_pubkey payload = { "ciphertext_b64": ciphertext_b64, "nonce_b64": nonce_b64, "tag_b64": tag_b64, "content_id": computed_id, "metadata": metadata, } try: crypto_mgr = get_ed25519_manager() if not crypto_mgr.verify_signature(payload, signature, signer_pubkey): return False, "Invalid signature" except Exception as e: logger.error(f"Signature verification error: {e}") return False, "Signature verification error" else: logger.debug("No signature provided for integrity verification") logger.info("Integrity verification passed") return True, None except Exception as e: logger.error(f"Integrity verification failed: {e}") return False, str(e)