import os
from datetime import datetime
from hashlib import sha256

import aiofiles
from Crypto.Random import get_random_bytes
from base58 import b58encode

from app.core._config import UPLOADS_DIR
from app.core._crypto.cipher import AESCipher
from app.core.models.keys import KnownKey
from app.core.models.node_storage import StoredContent


def _b58_sha256(data: bytes) -> str:
    """Return the base58-encoded SHA-256 digest of *data* as text."""
    return b58encode(sha256(data).digest()).decode()


async def create_new_encryption_key(db_session, user_id: int = None) -> KnownKey:
    """Generate, persist and return a new content-encryption key.

    A random 32-byte seed is generated and stored base58-encoded together
    with the base58-encoded SHA-256 hash of the raw seed bytes. A random
    32-byte ``public_key`` is stored the same way.

    Args:
        db_session: ORM session used to add, commit and re-query the key.
        user_id: Optional owner id; when truthy it is recorded in the key's
            ``meta`` under ``"I_user_id"``, otherwise ``meta`` is ``None``.

    Returns:
        The ``KnownKey`` row as re-read from the database by its seed hash.

    Raises:
        AssertionError: If the key cannot be found after the commit.
    """
    seed = get_random_bytes(32)
    seed_hash = _b58_sha256(seed)
    # not used yet, algo is symmetric
    public_key = get_random_bytes(32)

    new_key = KnownKey(
        type="CONTENT_ENCRYPTION_KEY",
        seed=b58encode(seed).decode(),
        seed_hash=seed_hash,
        public_key=b58encode(public_key).decode(),
        public_key_hash=_b58_sha256(public_key),
        algo="AES256",
        meta={"I_user_id": user_id} if user_id else None,
        created=datetime.now(),
    )
    db_session.add(new_key)
    db_session.commit()

    # Re-read the row so the caller gets the persisted state (e.g. its id).
    new_key = db_session.query(KnownKey).filter(
        KnownKey.seed_hash == seed_hash
    ).first()
    assert new_key, "Key not created"
    return new_key


async def create_encrypted_content(
    db_session, decrypted_content: StoredContent,
) -> StoredContent:
    """Encrypt a stored plaintext file and register the encrypted copy.

    The plaintext is read from ``UPLOADS_DIR/<decrypted_content.hash>``,
    encrypted with ``AESCipher`` using the content's key (a new key is
    created and assigned when the content has none), and written to
    ``UPLOADS_DIR/<base58(sha256(ciphertext))>``. A new ``StoredContent`` row
    describing the ciphertext is then committed.

    Args:
        db_session: ORM session used for all queries and commits.
        decrypted_content: The plaintext ``StoredContent`` record.

    Returns:
        The encrypted ``StoredContent``: an already existing one (linked to
        this plaintext, or with the same ciphertext hash) or the newly
        created row.

    Raises:
        AssertionError: If no key is assigned after key creation, or the
            new row cannot be found after the commit.
    """
    # Look for an encrypted record that points back at this plaintext.
    # Matching on ``StoredContent.id == decrypted_content.id`` would return
    # the plaintext record itself and nothing would ever be encrypted.
    existing = db_session.query(StoredContent).filter(
        StoredContent.decrypted_content_id == decrypted_content.id
    ).first()
    if existing:
        return existing

    if decrypted_content.key is None:
        key = await create_new_encryption_key(
            db_session, user_id=decrypted_content.user_id
        )
        decrypted_content.key_id = key.id
        db_session.commit()

    # Reload so the ``key`` relationship reflects the committed key_id.
    decrypted_content = db_session.query(StoredContent).filter(
        StoredContent.id == decrypted_content.id
    ).first()
    assert decrypted_content.key_id, "Key not assigned"

    decrypted_path = os.path.join(UPLOADS_DIR, decrypted_content.hash)
    async with aiofiles.open(decrypted_path, mode='rb') as file:
        decrypted_bin = await file.read()

    key = decrypted_content.key
    # NOTE(review): presumably ``seed_bin`` is the raw (decoded) seed bytes;
    # confirm against the KnownKey model.
    cipher = AESCipher(key.seed_bin)
    encrypted_bin = cipher.encrypt(decrypted_bin)
    encrypted_hash = _b58_sha256(encrypted_bin)

    # Content is addressed by hash: reuse a row with identical ciphertext.
    existing = db_session.query(StoredContent).filter(
        StoredContent.hash == encrypted_hash
    ).first()
    if existing:
        return existing

    # Build a fresh dict so the plaintext record's meta is not mutated, and
    # so a missing (None) meta does not raise.
    encrypted_meta = {**(decrypted_content.meta or {}), "encrypt_algo": "AES256"}

    # Write the ciphertext before committing the row, so a failed write
    # cannot leave a database record that points at a missing file.
    encrypted_path = os.path.join(UPLOADS_DIR, encrypted_hash)
    async with aiofiles.open(encrypted_path, mode='wb') as file:
        await file.write(encrypted_bin)

    encrypted_content = StoredContent(
        type="local/content_bin",
        hash=encrypted_hash,
        onchain_index=None,
        filename=decrypted_content.filename,
        meta=encrypted_meta,
        user_id=decrypted_content.user_id,
        encrypted=True,
        decrypted_content_id=decrypted_content.id,
        key_id=decrypted_content.key_id,
        created=datetime.now(),
    )
    db_session.add(encrypted_content)
    db_session.commit()

    encrypted_content = db_session.query(StoredContent).filter(
        StoredContent.hash == encrypted_hash
    ).first()
    assert encrypted_content, "Content not created"
    return encrypted_content