121 lines
4.5 KiB
Python
121 lines
4.5 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import os
|
|
import sys
|
|
from typing import Optional
|
|
|
|
from .aes_gcm_stream import CHUNK_BYTES, encrypt_file_to_encf
|
|
from .encf_stream import decrypt_encf_auto
|
|
from .keywrap import unwrap_dek, KeyWrapError
|
|
|
|
|
|
def _normalize_base64(value: str) -> str:
|
|
padding = (-len(value)) % 4
|
|
if padding:
|
|
return value + "=" * padding
|
|
return value
|
|
|
|
|
|
def _decode_key(value: str, fmt: str) -> bytes:
|
|
if fmt == "base64":
|
|
return base64.b64decode(_normalize_base64(value))
|
|
if fmt == "hex":
|
|
cleaned = value[2:] if value.lower().startswith("0x") else value
|
|
return bytes.fromhex(cleaned)
|
|
if fmt == "raw":
|
|
return value.encode()
|
|
raise ValueError(f"unsupported key format: {fmt}")
|
|
|
|
|
|
def _decode_salt(value: str, fmt: str) -> bytes:
|
|
if fmt == "base64":
|
|
return base64.b64decode(_normalize_base64(value))
|
|
if fmt == "hex":
|
|
cleaned = value[2:] if value.lower().startswith("0x") else value
|
|
return bytes.fromhex(cleaned)
|
|
raise ValueError(f"unsupported salt format: {fmt}")
|
|
|
|
|
|
async def _decrypt_file(input_path: str, key: bytes, output_path: str) -> None:
    """Feed the file at *input_path* to ``decrypt_encf_auto`` chunk by chunk.

    The input is opened in binary mode and exposed as an async generator
    yielding pieces of at most 65536 bytes. That generator, *key* and
    *output_path* are handed to ``decrypt_encf_auto``, which is awaited.
    """
    read_size = 65536

    async def _chunks():
        # The reads are ordinary blocking file reads; the generator is async
        # only because it is passed to decrypt_encf_auto as an async iterable.
        with open(input_path, "rb") as stream:
            # iter() with a sentinel stops at the first empty read (EOF).
            for block in iter(lambda: stream.read(read_size), b""):
                yield block

    await decrypt_encf_auto(_chunks(), key, output_path)
|
|
|
|
|
|
def cmd_encrypt(args: argparse.Namespace) -> int:
    """Handle the ``encrypt`` sub-command.

    Decodes the key, takes the salt from ``args.salt`` when given (otherwise
    draws ``args.salt_bytes`` random bytes), creates the output directory if
    needed, and writes every chunk produced by ``encrypt_file_to_encf`` to
    ``args.output``. A JSON object with the salt, chunk size and AEAD scheme
    is then printed to stdout.

    Returns:
        ``0`` on success.
    """
    key = _decode_key(args.key, args.key_format)
    if args.salt:
        salt = _decode_salt(args.salt, args.salt_format)
    else:
        salt = os.urandom(args.salt_bytes)

    # dirname() is empty for a bare file name; fall back to the current dir.
    target_dir = os.path.dirname(args.output) or "."
    os.makedirs(target_dir, exist_ok=True)

    with open(args.input, "rb") as plaintext, open(args.output, "wb") as sink:
        sink.writelines(encrypt_file_to_encf(plaintext, key, args.chunk_bytes, salt))

    # Emit the salt and parameters as JSON metadata for convenience.
    summary = {
        "salt_b64": base64.b64encode(salt).decode(),
        "chunk_bytes": args.chunk_bytes,
        "aead_scheme": "AES_GCM",
    }
    print(json.dumps(summary), file=sys.stdout)
    return 0
|
|
|
|
|
|
def cmd_decrypt(args: argparse.Namespace) -> int:
    """Handle the ``decrypt`` sub-command.

    Exactly one of ``args.key`` and ``args.wrapped_key`` must be non-empty.
    A wrapped key is passed to ``unwrap_dek``; a plain key is decoded
    according to ``args.key_format``. The output directory is created if
    needed and the input file is decrypted to ``args.output``.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: If both or neither key option is supplied, or if
            unwrapping the key raises ``KeyWrapError``.
    """
    has_plain_key = bool(args.key)
    has_wrapped_key = bool(args.wrapped_key)
    if has_plain_key == has_wrapped_key:
        raise SystemExit("Provide exactly one of --key or --wrapped-key")

    if not has_wrapped_key:
        key = _decode_key(args.key, args.key_format)
    else:
        try:
            key = unwrap_dek(args.wrapped_key)
        except KeyWrapError as exc:
            raise SystemExit(f"Failed to unwrap key: {exc}") from exc

    # dirname() is empty for a bare file name; fall back to the current dir.
    target_dir = os.path.dirname(args.output) or "."
    os.makedirs(target_dir, exist_ok=True)

    asyncio.run(_decrypt_file(args.input, key, args.output))
    return 0
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the command-line parser with ``encrypt`` and ``decrypt`` sub-commands.

    Each sub-parser stores its handler under ``func`` via ``set_defaults``,
    so the parsed namespace carries the function to run.
    """
    root = argparse.ArgumentParser(
        prog="python -m app.core.crypto.cli",
        description="ENCF AES-GCM helper",
    )
    commands = root.add_subparsers(dest="command", required=True)

    # Options are declared as (flag, keyword-arguments) pairs and registered
    # in order, so --help lists them in the order written here.
    encrypt_options = (
        ("--input", {"required": True, "help": "Path to plaintext input file"}),
        ("--output", {"required": True, "help": "Destination path for ENCF output"}),
        ("--key", {"required": True, "help": "Encryption key"}),
        ("--key-format", {"choices": ["base64", "hex", "raw"], "default": "base64"}),
        ("--salt", {"help": "Salt in specified format; generates random if omitted"}),
        ("--salt-format", {"choices": ["base64", "hex"], "default": "base64"}),
        (
            "--salt-bytes",
            {"type": int, "default": 16, "help": "Salt length when generated (default: 16)"},
        ),
        (
            "--chunk-bytes",
            {
                "type": int,
                "default": CHUNK_BYTES,
                "help": "Plaintext chunk size (default from env)",
            },
        ),
    )
    encrypt_cmd = commands.add_parser(
        "encrypt", help="Encrypt file into ENCF v1 stream (AES-256-GCM)"
    )
    for flag, options in encrypt_options:
        encrypt_cmd.add_argument(flag, **options)
    encrypt_cmd.set_defaults(func=cmd_encrypt)

    decrypt_options = (
        ("--input", {"required": True, "help": "Path to ENCF input file"}),
        ("--output", {"required": True, "help": "Destination path for decrypted file"}),
        ("--key", {"help": "Plaintext key"}),
        ("--wrapped-key", {"help": "Wrapped key produced by the backend"}),
        ("--key-format", {"choices": ["base64", "hex", "raw"], "default": "base64"}),
    )
    decrypt_cmd = commands.add_parser("decrypt", help="Decrypt ENCF stream to plaintext")
    for flag, options in decrypt_options:
        decrypt_cmd.add_argument(flag, **options)
    decrypt_cmd.set_defaults(func=cmd_decrypt)

    return root
|
|
|
|
|
|
def main(argv: Optional[list[str]] = None) -> int:
    """Parse *argv* and run the selected sub-command.

    Args:
        argv: Argument list passed to ``parse_args``; ``None`` makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Whatever the sub-command handler stored under ``func`` returns.
    """
    namespace = build_parser().parse_args(argv)
    handler = namespace.func
    return handler(namespace)
|
|
|
|
|
|
# Script entry point: exit the process with the status returned by main().
if __name__ == "__main__":  # pragma: no cover
    raise SystemExit(main())
|