uploader-bot/app/core/crypto/content_cipher.py

231 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
MY Network v3.0 - ContentCipher (AES-256-GCM) for uploader-bot
Реализует шифрование контента с помощью AES-256-GCM и интеграцию с Ed25519Manager
для подписи зашифрованного контента и проверки целостности.
Адаптация идей из DEPRECATED:
- См. базовую AES логику ([`DEPRECATED-uploader-bot/app/core/_crypto/cipher.py`](DEPRECATED-uploader-bot/app/core/_crypto/cipher.py:1))
- См. работу с контентом ([`DEPRECATED-uploader-bot/app/core/_crypto/content.py`](DEPRECATED-uploader-bot/app/core/_crypto/content.py:1))
Отличия новой реализации:
- Используем AES-256-GCM (аутентифицированное шифрование) вместо CBC+PAD
- Формируем content_id как SHA-256 от (ciphertext || nonce || tag || metadata_json)
- Подписываем структуру EncryptedContent через Ed25519Manager
"""
from __future__ import annotations
import base64
import json
import logging
import os
from dataclasses import asdict
from hashlib import sha256
from typing import Any, Dict, Optional, Tuple
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
try:
# Импорт менеджера подписи из текущего модуля crypto
from app.core.crypto import get_ed25519_manager
except Exception:
# Ленивая инициализация без разрыва импорта (например, при статическом анализе)
get_ed25519_manager = None # type: ignore
logger = logging.getLogger(__name__)
class ContentCipher:
"""
Класс шифрования контента AES-256-GCM с интеграцией Ed25519 подписи.
Ключевая информация:
- generate_content_key() -> 32 байта (AES-256)
- encrypt_content() -> (ciphertext, nonce, tag, content_id, signature, signer_pubkey)
- decrypt_content() -> исходные данные при валидной аутентификации
- verify_content_integrity() -> проверка подписи и content_id
"""
NONCE_SIZE = 12 # Рекомендуемый размер nonce для AES-GCM
KEY_SIZE = 32 # 256-bit
def __init__(self):
# В логах не пишем чувствительные данные
logger.debug("ContentCipher initialized (AES-256-GCM)")
@staticmethod
def generate_content_key(seed: Optional[bytes] = None) -> bytes:
"""
Генерация ключа шифрования контента (32 байта).
Если передан seed (как в DEPRECATED подходе), дополнительно хэшируем SHA-256.
"""
if seed is not None:
assert isinstance(seed, (bytes, bytearray)), "seed must be bytes"
key = sha256(seed).digest()
logger.debug("Content key generated from seed via SHA-256")
return key
# Без seed — криптографически стойкая генерация
key = os.urandom(ContentCipher.KEY_SIZE)
logger.debug("Random content key generated")
return key
@staticmethod
def _compute_content_id(ciphertext: bytes, nonce: bytes, tag: bytes, metadata: Optional[Dict[str, Any]]) -> str:
"""
content_id = HEX(SHA-256(ciphertext || nonce || tag || json(metadata, sorted)))
"""
md_json = b"{}"
if metadata:
md_json = json.dumps(metadata, sort_keys=True, ensure_ascii=False).encode("utf-8")
digest = sha256(ciphertext + nonce + tag + md_json).hexdigest()
logger.debug("Computed content_id via SHA-256 over ciphertext+nonce+tag+metadata_json")
return digest
def encrypt_content(
self,
plaintext: bytes,
key: bytes,
metadata: Optional[Dict[str, Any]] = None,
associated_data: Optional[bytes] = None,
sign_with_ed25519: bool = True,
) -> Dict[str, Any]:
"""
Шифрует данные AES-256-GCM и возвращает структуру с полями:
{
ciphertext_b64, nonce_b64, tag_b64, content_id, metadata, signature, signer_pubkey
}
Примечания:
- associated_data (AAD) включается в AEAD (не шифруется, но аутентифицируется).
- signature покрывает сериализованную структуру без signature поля.
"""
assert isinstance(plaintext, (bytes, bytearray)), "plaintext must be bytes"
assert isinstance(key, (bytes, bytearray)) and len(key) == self.KEY_SIZE, "key must be 32 bytes"
aesgcm = AESGCM(key)
nonce = os.urandom(self.NONCE_SIZE)
# Шифруем: AESGCM возвращает ciphertext||tag в одном буфере
ct_with_tag = aesgcm.encrypt(nonce, plaintext, associated_data)
# Последние 16 байт — GCM tag
tag = ct_with_tag[-16:]
ciphertext = ct_with_tag[:-16]
# content_id по требованиям
content_id = self._compute_content_id(ciphertext, nonce, tag, metadata)
# Подготовка объекта для подписи
payload = {
"ciphertext_b64": base64.b64encode(ciphertext).decode("ascii"),
"nonce_b64": base64.b64encode(nonce).decode("ascii"),
"tag_b64": base64.b64encode(tag).decode("ascii"),
"content_id": content_id,
"metadata": metadata or {},
}
signature = None
signer_pubkey = None
if sign_with_ed25519 and get_ed25519_manager is not None:
try:
crypto_mgr = get_ed25519_manager()
signature = crypto_mgr.sign_message(payload)
signer_pubkey = crypto_mgr.public_key_hex
logger.debug("Encrypted payload signed with Ed25519")
except Exception as e:
# Не блокируем шифрование при проблемах подписи, но логируем
logger.error(f"Failed to sign encrypted payload: {e}")
result = {
**payload,
"signature": signature,
"signer_pubkey": signer_pubkey,
}
logger.info(f"Content encrypted: content_id={content_id}, has_signature={signature is not None}")
return result
def decrypt_content(
self,
ciphertext_b64: str,
nonce_b64: str,
tag_b64: str,
key: bytes,
associated_data: Optional[bytes] = None,
) -> bytes:
"""
Расшифровывает данные AES-256-GCM.
Бросает исключение при неверной аутентификации (tag/AAD/nonce).
"""
assert isinstance(key, (bytes, bytearray)) and len(key) == self.KEY_SIZE, "key must be 32 bytes"
ciphertext = base64.b64decode(ciphertext_b64)
nonce = base64.b64decode(nonce_b64)
tag = base64.b64decode(tag_b64)
aesgcm = AESGCM(key)
pt = aesgcm.decrypt(nonce, ciphertext + tag, associated_data)
logger.info("Content decrypted successfully")
return pt
def verify_content_integrity(
self,
encrypted_obj: Dict[str, Any],
expected_metadata: Optional[Dict[str, Any]] = None,
verify_signature: bool = True,
) -> Tuple[bool, Optional[str]]:
"""
Проверяет:
- content_id соответствует данным (ciphertext/nonce/tag/metadata)
- при наличии verify_signature и signature/signer_pubkey — валидность подписи
Возвращает: (OK, error_message)
"""
try:
# Сначала проверим content_id
ciphertext_b64 = encrypted_obj.get("ciphertext_b64")
nonce_b64 = encrypted_obj.get("nonce_b64")
tag_b64 = encrypted_obj.get("tag_b64")
metadata = encrypted_obj.get("metadata") or {}
if expected_metadata is not None and expected_metadata != metadata:
return False, "Metadata mismatch"
if not (ciphertext_b64 and nonce_b64 and tag_b64):
return False, "Missing encrypted fields"
ciphertext = base64.b64decode(ciphertext_b64)
nonce = base64.b64decode(nonce_b64)
tag = base64.b64decode(tag_b64)
computed_id = self._compute_content_id(ciphertext, nonce, tag, metadata)
if computed_id != encrypted_obj.get("content_id"):
return False, "content_id mismatch"
# Далее проверим подпись при необходимости
if verify_signature:
signature = encrypted_obj.get("signature")
signer_pubkey = encrypted_obj.get("signer_pubkey")
if signature and signer_pubkey and get_ed25519_manager is not None:
# Важно: подписывалась структура без полей signature/signер_pubkey
payload = {
"ciphertext_b64": ciphertext_b64,
"nonce_b64": nonce_b64,
"tag_b64": tag_b64,
"content_id": computed_id,
"metadata": metadata,
}
try:
crypto_mgr = get_ed25519_manager()
if not crypto_mgr.verify_signature(payload, signature, signer_pubkey):
return False, "Invalid signature"
except Exception as e:
logger.error(f"Signature verification error: {e}")
return False, "Signature verification error"
else:
logger.debug("No signature provided for integrity verification")
logger.info("Integrity verification passed")
return True, None
except Exception as e:
logger.error(f"Integrity verification failed: {e}")
return False, str(e)