diff --git a/app/api/routes/_blockchain.py b/app/api/routes/_blockchain.py index 5c41eaf..cffccb6 100644 --- a/app/api/routes/_blockchain.py +++ b/app/api/routes/_blockchain.py @@ -1,4 +1,4 @@ -from base64 import b64encode +from base64 import b64encode, b32decode from datetime import datetime import traceback @@ -7,7 +7,7 @@ from sqlalchemy import and_, select, func from tonsdk.boc import begin_cell, begin_dict from tonsdk.utils import Address -from base58 import b58encode +from base58 import b58encode, b58decode from app.core._blockchain.ton.connect import TonConnect, wallet_obj_by_name from app.core._blockchain.ton.platform import platform from app.core._config import PROJECT_HOST @@ -72,12 +72,78 @@ async def s_api_v1_blockchain_send_new_content_message(request): except BaseException: # New path: treat provided string as encrypted IPFS CID (ENCF v1) encrypted_ipfs_cid = request.json['content'] - class _EC: # tiny adapter to mimic .serialize_v2() - def __init__(self, s: str): - self._s = s + + class _EC: + """Adapter to provide ContentId-like interface for IPFS CID strings.""" + + def __init__(self, cid_str: str): + self._cid = cid_str + self.content_hash = self._extract_content_hash(cid_str) + self._content_hash_b58 = None + + @staticmethod + def _decode_multibase(cid_str: str) -> bytes: + if not cid_str: + raise ValueError("empty CID") + prefix = cid_str[0] + if prefix in ('b', 'B'): + payload = cid_str[1:] + padding = (-len(payload)) % 8 + return b32decode(payload.upper() + ('=' * padding), casefold=True) + if prefix in ('z', 'Z'): + return b58decode(cid_str[1:]) + # CIDv0 (base58btc without explicit multibase prefix) + return b58decode(cid_str) + + @staticmethod + def _read_varint(data: bytes, offset: int): + result = 0 + shift = 0 + while True: + if offset >= len(data): + raise ValueError("truncated varint") + byte = data[offset] + offset += 1 + result |= (byte & 0x7F) << shift + if not (byte & 0x80): + break + shift += 7 + if shift > 63: + raise ValueError("varint overflow") + return result, offset + + @classmethod + def _extract_content_hash(cls, cid_str: str) -> bytes: + data = cls._decode_multibase(cid_str) + offset = 0 + if data and data[0] == 0x01: + version, offset = cls._read_varint(data, offset) + if version != 1: + raise ValueError("unsupported CID version") + _, offset = cls._read_varint(data, offset) # skip codec + code, offset = cls._read_varint(data, offset) + length, offset = cls._read_varint(data, offset) + digest = data[offset:offset + length] + if len(digest) != length: + raise ValueError("truncated multihash digest") + if code != 0x12 or length != 32: + raise ValueError("unsupported multihash (expect sha2-256)") + return digest + def serialize_v2(self, include_accept_type: bool = False): - return self._s - encrypted_content_cid = _EC(encrypted_ipfs_cid) + return self._cid + + @property + def content_hash_b58(self) -> str: + if self._content_hash_b58 is None: + self._content_hash_b58 = b58encode(self.content_hash).decode() + return self._content_hash_b58 + + try: + encrypted_content_cid = _EC(encrypted_ipfs_cid) + except Exception as exc: + make_log("Blockchain", f"Provided encrypted IPFS CID is invalid: {exc}", level='error') + raise AssertionError("Invalid encrypted content CID provided") from exc if request.json['image']: image_content_cid, err = resolve_content(request.json['image'])