uploader-bot/app/core/crypto/encf_stream.py

46 lines
1.4 KiB
Python

from __future__ import annotations
from typing import AsyncIterator
from .aes_gcm_siv_stream import MAGIC as _MAGIC, VERSION as _VER, SCHEME_AES_GCM_SIV
from .aes_gcm_siv_stream import decrypt_encf_to_file as _dec_gcmsiv
from .aes_gcm_stream import SCHEME_AES_GCM, decrypt_encf_to_file as _dec_gcm
from .aes_siv_stream import decrypt_encf_to_file as _dec_siv
async def decrypt_encf_auto(byte_iter: AsyncIterator[bytes], key: bytes, out_path: str) -> None:
"""
Detect scheme by peeking header, then delegate to proper decrypter.
Re-feeds the peeked bytes back to the chosen decoder.
"""
buf = bytearray()
async def _fill(n: int):
nonlocal buf
while len(buf) < n:
try:
ch = await byte_iter.__anext__()
except StopAsyncIteration:
break
if ch:
buf.extend(ch)
await _fill(11)
if buf[:4] != _MAGIC:
raise ValueError("bad magic")
scheme = buf[5]
async def _prepend_iter():
nonlocal buf
if buf:
yield bytes(buf)
async for ch in byte_iter:
yield ch
if scheme == SCHEME_AES_GCM_SIV:
await _dec_gcmsiv(_prepend_iter(), key, out_path)
elif scheme == SCHEME_AES_GCM:
await _dec_gcm(_prepend_iter(), key, out_path)
else:
await _dec_siv(_prepend_iter(), key, out_path)