uploader-bot/app/core/crypto/cli.py

121 lines
4.5 KiB
Python

from __future__ import annotations
import argparse
import asyncio
import base64
import json
import os
import sys
from typing import Optional
from .aes_gcm_stream import CHUNK_BYTES, encrypt_file_to_encf
from .encf_stream import decrypt_encf_auto
from .keywrap import unwrap_dek, KeyWrapError
def _normalize_base64(value: str) -> str:
padding = (-len(value)) % 4
if padding:
return value + "=" * padding
return value
def _decode_key(value: str, fmt: str) -> bytes:
if fmt == "base64":
return base64.b64decode(_normalize_base64(value))
if fmt == "hex":
cleaned = value[2:] if value.lower().startswith("0x") else value
return bytes.fromhex(cleaned)
if fmt == "raw":
return value.encode()
raise ValueError(f"unsupported key format: {fmt}")
def _decode_salt(value: str, fmt: str) -> bytes:
if fmt == "base64":
return base64.b64decode(_normalize_base64(value))
if fmt == "hex":
cleaned = value[2:] if value.lower().startswith("0x") else value
return bytes.fromhex(cleaned)
raise ValueError(f"unsupported salt format: {fmt}")
async def _decrypt_file(input_path: str, key: bytes, output_path: str) -> None:
async def _aiter():
with open(input_path, "rb") as src:
while True:
chunk = src.read(65536)
if not chunk:
break
yield chunk
await decrypt_encf_auto(_aiter(), key, output_path)
def cmd_encrypt(args: argparse.Namespace) -> int:
key = _decode_key(args.key, args.key_format)
salt = _decode_salt(args.salt, args.salt_format) if args.salt else os.urandom(args.salt_bytes)
os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
with open(args.input, "rb") as src, open(args.output, "wb") as dst:
for chunk in encrypt_file_to_encf(src, key, args.chunk_bytes, salt):
dst.write(chunk)
# Emit JSON metadata with salt for convenience
meta = {
"salt_b64": base64.b64encode(salt).decode(),
"chunk_bytes": args.chunk_bytes,
"aead_scheme": "AES_GCM",
}
print(json.dumps(meta), file=sys.stdout)
return 0
def cmd_decrypt(args: argparse.Namespace) -> int:
if bool(args.key) == bool(args.wrapped_key):
raise SystemExit("Provide exactly one of --key or --wrapped-key")
if args.wrapped_key:
try:
key = unwrap_dek(args.wrapped_key)
except KeyWrapError as exc:
raise SystemExit(f"Failed to unwrap key: {exc}") from exc
else:
key = _decode_key(args.key, args.key_format)
os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
asyncio.run(_decrypt_file(args.input, key, args.output))
return 0
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="python -m app.core.crypto.cli", description="ENCF AES-GCM helper")
sub = parser.add_subparsers(dest="command", required=True)
enc = sub.add_parser("encrypt", help="Encrypt file into ENCF v1 stream (AES-256-GCM)")
enc.add_argument("--input", required=True, help="Path to plaintext input file")
enc.add_argument("--output", required=True, help="Destination path for ENCF output")
enc.add_argument("--key", required=True, help="Encryption key")
enc.add_argument("--key-format", choices=["base64", "hex", "raw"], default="base64")
enc.add_argument("--salt", help="Salt in specified format; generates random if omitted")
enc.add_argument("--salt-format", choices=["base64", "hex"], default="base64")
enc.add_argument("--salt-bytes", type=int, default=16, help="Salt length when generated (default: 16)")
enc.add_argument("--chunk-bytes", type=int, default=CHUNK_BYTES, help="Plaintext chunk size (default from env)")
enc.set_defaults(func=cmd_encrypt)
dec = sub.add_parser("decrypt", help="Decrypt ENCF stream to plaintext")
dec.add_argument("--input", required=True, help="Path to ENCF input file")
dec.add_argument("--output", required=True, help="Destination path for decrypted file")
dec.add_argument("--key", help="Plaintext key")
dec.add_argument("--wrapped-key", help="Wrapped key produced by the backend")
dec.add_argument("--key-format", choices=["base64", "hex", "raw"], default="base64")
dec.set_defaults(func=cmd_decrypt)
return parser
def main(argv: Optional[list[str]] = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
return args.func(args)
if __name__ == "__main__": # pragma: no cover
sys.exit(main())