from __future__ import annotations import hmac import hashlib import os import struct from typing import BinaryIO, Iterator, AsyncIterator import aiofiles from cryptography.hazmat.primitives.ciphers.aead import AESGCM MAGIC = b"ENCF" VERSION = 1 SCHEME_AES_GCM = 0x03 CHUNK_BYTES = int(os.getenv("CRYPTO_CHUNK_BYTES", "1048576")) def _derive_nonce(salt: bytes, idx: int) -> bytes: """Derive a deterministic 12-byte nonce from salt and chunk index.""" if len(salt) < 12: raise ValueError("salt must be at least 12 bytes") idx_bytes = idx.to_bytes(8, "big") return hmac.new(salt, idx_bytes, hashlib.sha256).digest()[:12] def build_header(chunk_bytes: int, salt: bytes) -> bytes: if not (0 < chunk_bytes <= (1 << 31)): raise ValueError("chunk_bytes must be between 1 and 2^31") if not (1 <= len(salt) <= 255): raise ValueError("salt length must be 1..255 bytes") # MAGIC(4) | ver(1) | scheme(1) | chunk_bytes(4,BE) | salt_len(1) | salt | reserved(5 zeros) hdr = bytearray() hdr += MAGIC hdr.append(VERSION) hdr.append(SCHEME_AES_GCM) hdr += struct.pack(">I", int(chunk_bytes)) hdr.append(len(salt)) hdr += salt hdr += b"\x00" * 5 return bytes(hdr) def encrypt_file_to_encf(src: BinaryIO, key: bytes, chunk_bytes: int, salt: bytes) -> Iterator[bytes]: """Yield ENCF v1 frames encrypted with AES-GCM.""" if len(key) not in (16, 24, 32): raise ValueError("AES-GCM key must be 128, 192 or 256 bits long") cipher = AESGCM(key) yield build_header(chunk_bytes, salt) idx = 0 while True: block = src.read(chunk_bytes) if not block: break nonce = _derive_nonce(salt, idx) ct = cipher.encrypt(nonce, block, associated_data=None) tag = ct[-16:] data = ct[:-16] yield struct.pack(">I", len(block)) yield data yield tag idx += 1 async def decrypt_encf_to_file(byte_iter: AsyncIterator[bytes], key: bytes, out_path: str) -> None: """Parse ENCF v1 (AES-GCM) stream and write plaintext to `out_path`.""" if len(key) not in (16, 24, 32): raise ValueError("AES-GCM key must be 128, 192 or 256 bits long") cipher = AESGCM(key) buf = bytearray() async def _fill(n: int) -> None: nonlocal buf while len(buf) < n: try: chunk = await byte_iter.__anext__() except StopAsyncIteration: break if chunk: buf.extend(chunk) # Parse header await _fill(11) if buf[:4] != MAGIC: raise ValueError("bad magic") version = buf[4] scheme = buf[5] if version != VERSION or scheme != SCHEME_AES_GCM: raise ValueError("unsupported ENCF header") chunk_bytes = struct.unpack(">I", bytes(buf[6:10]))[0] salt_len = buf[10] hdr_len = 4 + 1 + 1 + 4 + 1 + salt_len + 5 await _fill(hdr_len) salt = bytes(buf[11:11 + salt_len]) del buf[:hdr_len] async with aiofiles.open(out_path, "wb") as out: idx = 0 TAG_LEN = 16 while True: await _fill(4) if len(buf) == 0: break if len(buf) < 4: raise ValueError("truncated frame length") p_len = struct.unpack(">I", bytes(buf[:4]))[0] del buf[:4] await _fill(p_len + TAG_LEN) if len(buf) < p_len + TAG_LEN: raise ValueError("truncated cipher/tag") ct = bytes(buf[:p_len]) tag = bytes(buf[p_len:p_len + TAG_LEN]) del buf[:p_len + TAG_LEN] nonce = _derive_nonce(salt, idx) pt = cipher.decrypt(nonce, ct + tag, associated_data=None) await out.write(pt) idx += 1