from __future__ import annotations import argparse import asyncio import base64 import json import os import sys from typing import Optional from .aes_gcm_stream import CHUNK_BYTES, encrypt_file_to_encf from .encf_stream import decrypt_encf_auto from .keywrap import unwrap_dek, KeyWrapError def _normalize_base64(value: str) -> str: padding = (-len(value)) % 4 if padding: return value + "=" * padding return value def _decode_key(value: str, fmt: str) -> bytes: if fmt == "base64": return base64.b64decode(_normalize_base64(value)) if fmt == "hex": cleaned = value[2:] if value.lower().startswith("0x") else value return bytes.fromhex(cleaned) if fmt == "raw": return value.encode() raise ValueError(f"unsupported key format: {fmt}") def _decode_salt(value: str, fmt: str) -> bytes: if fmt == "base64": return base64.b64decode(_normalize_base64(value)) if fmt == "hex": cleaned = value[2:] if value.lower().startswith("0x") else value return bytes.fromhex(cleaned) raise ValueError(f"unsupported salt format: {fmt}") async def _decrypt_file(input_path: str, key: bytes, output_path: str) -> None: async def _aiter(): with open(input_path, "rb") as src: while True: chunk = src.read(65536) if not chunk: break yield chunk await decrypt_encf_auto(_aiter(), key, output_path) def cmd_encrypt(args: argparse.Namespace) -> int: key = _decode_key(args.key, args.key_format) salt = _decode_salt(args.salt, args.salt_format) if args.salt else os.urandom(args.salt_bytes) os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True) with open(args.input, "rb") as src, open(args.output, "wb") as dst: for chunk in encrypt_file_to_encf(src, key, args.chunk_bytes, salt): dst.write(chunk) # Emit JSON metadata with salt for convenience meta = { "salt_b64": base64.b64encode(salt).decode(), "chunk_bytes": args.chunk_bytes, "aead_scheme": "AES_GCM", } print(json.dumps(meta), file=sys.stdout) return 0 def cmd_decrypt(args: argparse.Namespace) -> int: if bool(args.key) == bool(args.wrapped_key): raise SystemExit("Provide exactly one of --key or --wrapped-key") if args.wrapped_key: try: key = unwrap_dek(args.wrapped_key) except KeyWrapError as exc: raise SystemExit(f"Failed to unwrap key: {exc}") from exc else: key = _decode_key(args.key, args.key_format) os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True) asyncio.run(_decrypt_file(args.input, key, args.output)) return 0 def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="python -m app.core.crypto.cli", description="ENCF AES-GCM helper") sub = parser.add_subparsers(dest="command", required=True) enc = sub.add_parser("encrypt", help="Encrypt file into ENCF v1 stream (AES-256-GCM)") enc.add_argument("--input", required=True, help="Path to plaintext input file") enc.add_argument("--output", required=True, help="Destination path for ENCF output") enc.add_argument("--key", required=True, help="Encryption key") enc.add_argument("--key-format", choices=["base64", "hex", "raw"], default="base64") enc.add_argument("--salt", help="Salt in specified format; generates random if omitted") enc.add_argument("--salt-format", choices=["base64", "hex"], default="base64") enc.add_argument("--salt-bytes", type=int, default=16, help="Salt length when generated (default: 16)") enc.add_argument("--chunk-bytes", type=int, default=CHUNK_BYTES, help="Plaintext chunk size (default from env)") enc.set_defaults(func=cmd_encrypt) dec = sub.add_parser("decrypt", help="Decrypt ENCF stream to plaintext") dec.add_argument("--input", required=True, help="Path to ENCF input file") dec.add_argument("--output", required=True, help="Destination path for decrypted file") dec.add_argument("--key", help="Plaintext key") dec.add_argument("--wrapped-key", help="Wrapped key produced by the backend") dec.add_argument("--key-format", choices=["base64", "hex", "raw"], default="base64") dec.set_defaults(func=cmd_decrypt) return parser def main(argv: Optional[list[str]] = None) -> int: parser = build_parser() args = parser.parse_args(argv) return args.func(args) if __name__ == "__main__": # pragma: no cover sys.exit(main())