from base64 import b32decode, b32encode
from typing import Optional, Tuple

from base58 import b58encode, b58decode
from tonsdk.boc import begin_cell

from app.core._config import ALLOWED_CONTENT_TYPES
from app.core._utils.string_binary import string_to_bytes_fixed_size, bytes_to_string


# Binary layouts of the legacy content ids (the bytes are base58-encoded):
#
# cid_v1#_ cid_version:uint8 accept_type:uint120 content_sha256:uint256 onchain_index:uint128 = CIDv1;
#
# onchain_index#b0 bytes_len:uint8 index:uint_var = OnchainIndex;
# accept_type#b1 bytes_len:uint8 value:bytes = Param;
# encryption_key_sha256#b2 digest:uint256 = EncryptionKey;
# cid_v2#_ cid_version:uint8 content_sha256:uint256 *[Param]s = CIDv2;


class ContentId:
    """Unified abstraction for legacy ContentID and ENCF/IPFS CID strings."""

    def __init__(
        self,
        version: Optional[int] = None,
        content_hash: Optional[bytes] = None,  # only SHA256
        onchain_index: Optional[int] = None,
        accept_type: Optional[str] = None,
        encryption_key_sha256: Optional[bytes] = None,
        *,
        raw_value: Optional[str] = None,
        cid_format: Optional[str] = None,
        multibase_prefix: Optional[str] = None,
        multicodec: Optional[int] = None,
        multihash_code: Optional[int] = 0x12,
        multihash_length: Optional[int] = 32,
    ):
        """Store the components of a content id.

        Args:
            version: Content id version (1 or 2 for the legacy formats, 0 or 1
                for IPFS CIDs as produced by ``from_ipfs``).
            content_hash: Raw SHA-256 digest of the content.
            onchain_index: On-chain index; ``None`` is stored as ``-1``.
            accept_type: Accept (content) type string.
            encryption_key_sha256: Optional 32-byte digest of the encryption key.
            raw_value: Original textual form, when the object was parsed.
            cid_format: ``'content_id_v1'``, ``'content_id_v2'`` or ``'ipfs'``.
                When omitted it is ``'content_id_v1'`` for ``version == 1`` and
                ``'content_id_v2'`` otherwise.
            multibase_prefix: Multibase prefix character of an IPFS CID.
            multicodec: Multicodec code of an IPFS CID.
            multihash_code: Multihash function code (``0x12`` by default).
            multihash_length: Multihash digest length (32 by default).

        Raises:
            AssertionError: If a non-empty ``encryption_key_sha256`` is not
                exactly 32 bytes long.
        """
        self.version = version
        self.content_hash = content_hash

        # -1 is the internal "no on-chain index" marker; see safe_onchain_index.
        self.onchain_index = onchain_index if onchain_index is not None else -1
        self.accept_type = accept_type
        self.encryption_key_sha256 = encryption_key_sha256
        if self.encryption_key_sha256:
            assert len(self.encryption_key_sha256) == 32, "Invalid encryption key length"

        self._raw_value = raw_value
        if cid_format:
            self.cid_format = cid_format
        else:
            # Every version other than 1 (including None) defaults to the v2 format.
            self.cid_format = 'content_id_v1' if self.version == 1 else 'content_id_v2'
        self.multibase_prefix = multibase_prefix
        self.multicodec = multicodec
        self.multihash_code = multihash_code
        self.multihash_length = multihash_length

    @property
    def content_hash_b58(self) -> str:
        """Return the content hash encoded as a base58 string."""
        assert self.content_hash, "Content hash is not set"
        return b58encode(self.content_hash).decode()

    @property
    def safe_onchain_index(self) -> Optional[int]:
        """Return the on-chain index, or ``None`` when it is unset or negative."""
        if self.onchain_index is not None and self.onchain_index >= 0:
            return self.onchain_index
        return None

    def serialize_v2(self, include_accept_type=False) -> str:
        """Serialize as a v2 content id, or as an IPFS CID for ``'ipfs'`` objects.

        For ``cid_format == 'ipfs'`` the original string is returned when it is
        known, otherwise the CID is rebuilt from its components.

        Args:
            include_accept_type: Also emit the ``accept_type`` parameter (0xb1).

        Returns:
            The base58 v2 content id, or the IPFS CID string.
        """
        if self.cid_format == 'ipfs':
            if self._raw_value:
                return self._raw_value
            return self._serialize_ipfs()

        cid_bin = (
            (2).to_bytes(1, 'big')  # cid version
            + self.content_hash
        )

        index = self.safe_onchain_index
        if index is not None:
            # Minimal big-endian encoding; zero still occupies one byte.
            index_bin = index.to_bytes(max(1, (index.bit_length() + 7) // 8), 'big')
            cid_bin += b'\xb0' + len(index_bin).to_bytes(1, 'big') + index_bin

        if self.accept_type and include_accept_type:
            at_bin_len = len(self.accept_type.encode())
            at_bin = string_to_bytes_fixed_size(self.accept_type, at_bin_len)
            cid_bin += b'\xb1' + at_bin_len.to_bytes(1, 'big') + at_bin

        if self.encryption_key_sha256:
            # Fixed 32-byte digest, written without a length byte.
            cid_bin += b'\xb2' + self.encryption_key_sha256

        return b58encode(cid_bin).decode()

    def serialize_v1(self) -> str:
        """Serialize as a legacy v1 content id (base58).

        Raises:
            ValueError: If this object represents an IPFS CID.
            AssertionError: If the content hash is not 32 bytes long.
        """
        if self.cid_format == 'ipfs':
            raise ValueError("Cannot serialize IPFS CID as ContentId v1")

        # The v1 layout reserves 15 bytes (uint120) for the accept type.
        at_bin = string_to_bytes_fixed_size(self.accept_type, 15)
        assert len(self.content_hash) == 32, "Invalid hash length"
        if self.onchain_index < 0:
            oi_bin = b''
        else:
            oi_bin = self.onchain_index.to_bytes(16, 'big', signed=False)
            assert len(oi_bin) == 16, "Invalid onchain_index"

        return b58encode(
            (1).to_bytes(1, 'big')  # cid version
            + at_bin
            + self.content_hash
            + oi_bin
        ).decode()

    @classmethod
    def from_v2(cls, cid: str) -> "ContentId":
        """Parse a base58 v2 content id.

        Raises:
            AssertionError: If the version byte is not 2, or a decoded
                encryption key digest is not 32 bytes long.
        """
        raw = b58decode(cid)
        cid_version = int.from_bytes(raw[0:1], 'big')
        content_sha256 = raw[1:33]
        rest = raw[33:]
        assert cid_version == 2, "Invalid version"

        params = {}
        while rest:
            param_op, rest = rest[0:1], rest[1:]
            if param_op == b'\xb0':  # onchain_index
                oi_len = int.from_bytes(rest[0:1], 'big')
                params['onchain_index'] = int.from_bytes(rest[1:1 + oi_len], 'big')
                rest = rest[1 + oi_len:]
            elif param_op == b'\xb1':  # accept_type
                at_len = int.from_bytes(rest[0:1], 'big')
                params['accept_type'] = bytes_to_string(rest[1:1 + at_len])
                rest = rest[1 + at_len:]
            elif param_op == b'\xb2':  # encryption_key_sha256
                # The opcode is already consumed and serialize_v2 writes no
                # length byte, so the digest is exactly the next 32 bytes.
                params['encryption_key_sha256'] = rest[:32]
                rest = rest[32:]
            # Any other byte is skipped, as before.

        return cls(version=2, content_hash=content_sha256, **params)

    @classmethod
    def from_v1(cls, cid: str) -> "ContentId":
        """Parse a base58 legacy v1 content id.

        Raises:
            AssertionError: If the version byte is not 1 or the hash is not
                32 bytes long.
        """
        raw = b58decode(cid)
        cid_version = int.from_bytes(raw[0:1], 'big')
        accept_type = bytes_to_string(raw[1:16])
        content_sha256 = raw[16:48]
        # The on-chain index is optional; -1 marks its absence.
        onchain_index = int.from_bytes(raw[48:], 'big') if len(raw) > 48 else -1

        assert cid_version == 1, "Invalid version"
        # NOTE: accept_type is currently not checked against ALLOWED_CONTENT_TYPES.
        assert len(content_sha256) == 32, "Invalid hash length"
        return cls(
            version=1,
            content_hash=content_sha256,
            onchain_index=onchain_index,
            accept_type=accept_type
        )

    @classmethod
    def deserialize(cls, cid: str) -> "ContentId":
        """Parse any supported content id string.

        Multibase-prefixed strings are tried as IPFS CIDs first. Because the
        prefix characters are also part of the base58 alphabet, a string that
        fails IPFS parsing is still tried as a legacy v1/v2 content id.

        Raises:
            ValueError: If ``cid`` is empty or matches no supported format.
        """
        if not cid:
            raise ValueError("Empty content id provided")

        ipfs_error = None
        if cid[0] in ('b', 'B', 'z', 'Z'):
            try:
                return cls.from_ipfs(cid)
            except ValueError as exc:
                # May be a legacy base58 id that merely starts with this character.
                ipfs_error = exc

        try:
            cid_version = int.from_bytes(b58decode(cid)[0:1], 'big')
        except Exception:
            # Not decodable as base58: no legacy version can be determined.
            cid_version = None

        if cid_version == 1:
            obj = cls.from_v1(cid)
            obj._raw_value = cid
            return obj
        if cid_version == 2:
            obj = cls.from_v2(cid)
            obj._raw_value = cid
            return obj

        if ipfs_error is not None:
            # IPFS parsing was already attempted; report that failure.
            raise ValueError(f"Invalid cid format: {ipfs_error}") from ipfs_error

        try:
            return cls.from_ipfs(cid)
        except Exception as exc:
            raise ValueError(f"Invalid cid format: {exc}") from exc

    def json_format(self):
        """Return a JSON-serializable dict describing this content id."""
        return {
            "version": self.version,
            "content_hash": self.content_hash_b58,
            "onchain_index": self.safe_onchain_index,
            "accept_type": self.accept_type,
            "encryption_key_sha256": b58encode(self.encryption_key_sha256).decode() if self.encryption_key_sha256 else None,
            "format": self.cid_format,
            "raw": self.serialize_v2() if self.cid_format == 'ipfs' else None,
        }

    # --- helpers for IPFS/ENCF CID handling ---------------------------------

    @staticmethod
    def _decode_multibase(cid_str: str) -> Tuple[bytes, Optional[str]]:
        """Decode a CID string into raw bytes and its lower-cased prefix.

        ``b``/``B`` selects base32 and ``z``/``Z`` base58; anything else is
        decoded as a prefix-less base58 string and reported with prefix ``None``.

        Raises:
            ValueError: If ``cid_str`` is empty.
        """
        if not cid_str:
            raise ValueError("Empty CID string")

        prefix = cid_str[0]
        if prefix in ('b', 'B'):
            payload = cid_str[1:]
            # CIDs omit base32 padding; restore it for the decoder.
            padding = (-len(payload)) % 8
            decoded = b32decode(payload.upper() + ('=' * padding), casefold=True)
            return decoded, prefix.lower()
        if prefix in ('z', 'Z'):
            return b58decode(cid_str[1:]), prefix.lower()
        # CIDv0 without explicit prefix
        return b58decode(cid_str), None

    @staticmethod
    def _read_varint(data: bytes, offset: int) -> Tuple[int, int]:
        """Read an unsigned varint (7 bits per byte, low group first).

        Returns:
            ``(value, next_offset)``.

        Raises:
            ValueError: If the data ends mid-varint or the value is too long.
        """
        result = 0
        shift = 0
        while True:
            if offset >= len(data):
                raise ValueError("truncated varint")
            byte = data[offset]
            offset += 1
            result |= (byte & 0x7F) << shift
            if not (byte & 0x80):
                break
            shift += 7
            if shift > 63:
                raise ValueError("varint overflow")
        return result, offset

    @classmethod
    def from_ipfs(cls, cid: str) -> "ContentId":
        """Parse an IPFS CID string (multibase-prefixed, or prefix-less CIDv0).

        Only 32-byte digests with multihash code ``0x12`` are accepted.

        Raises:
            ValueError: If the CID is empty, truncated, or uses an unsupported
                version, multihash code or digest length.
        """
        cid = cid.strip()
        payload, multibase_prefix = cls._decode_multibase(cid)

        idx = 0
        version: Optional[int] = None
        codec: Optional[int] = None

        if multibase_prefix is not None:
            version, idx = cls._read_varint(payload, idx)
            if version not in (0, 1):
                raise ValueError(f"unsupported CID version: {version}")
            if version == 1:
                codec, idx = cls._read_varint(payload, idx)
            else:
                codec = 0x70  # dag-pb default for CIDv0
        else:
            # CIDv0 without explicit version/codec
            version = 0
            codec = 0x70

        multihash_code, idx = cls._read_varint(payload, idx)
        multihash_length, idx = cls._read_varint(payload, idx)
        digest = payload[idx:idx + multihash_length]
        if len(digest) != multihash_length:
            raise ValueError("truncated multihash digest")
        if multihash_length != 32:
            raise ValueError("unsupported multihash length (expected 32 bytes)")
        if multihash_code != 0x12:
            raise ValueError(f"unsupported multihash code: {hex(multihash_code)}")

        return cls(
            version=version,
            content_hash=digest,
            onchain_index=None,
            accept_type=None,
            encryption_key_sha256=None,
            raw_value=cid,
            cid_format='ipfs',
            multibase_prefix=multibase_prefix,
            multicodec=codec,
            multihash_code=multihash_code,
            multihash_length=multihash_length,
        )

    def _serialize_ipfs(self) -> str:
        """Rebuild the IPFS CID string from the stored components.

        Raises:
            ValueError: If no content hash is set.
        """
        if not self.content_hash:
            raise ValueError("Cannot serialize IPFS CID without content hash")

        multihash = (
            self._encode_varint(self.multihash_code or 0x12)
            + self._encode_varint(self.multihash_length or len(self.content_hash))
            + self.content_hash
        )
        if self.multibase_prefix is None:
            # default to CIDv0 (base58btc) dag-pb
            return b58encode(multihash).decode()

        # 0 is a real version / codec value, so only None selects the default.
        version = self.version if self.version is not None else 1
        codec_bytes = b''
        if version == 1:
            codec = self.multicodec if self.multicodec is not None else 0x70
            codec_bytes = self._encode_varint(codec)
        cid_bytes = self._encode_varint(version) + codec_bytes + multihash

        if self.multibase_prefix == 'z':
            return 'z' + b58encode(cid_bytes).decode()
        if self.multibase_prefix == 'b':
            encoded = b32encode(cid_bytes).decode().rstrip('=').lower()
            return 'b' + encoded
        # Fallback to base58btc without prefix
        return b58encode(cid_bytes).decode()

    @staticmethod
    def _encode_varint(value: int) -> bytes:
        """Encode a non-negative integer as an unsigned varint.

        Raises:
            ValueError: If ``value`` is negative.
        """
        if value < 0:
            raise ValueError("varint cannot encode negative values")
        out = bytearray()
        while True:
            to_write = value & 0x7F
            value >>= 7
            if value:
                out.append(to_write | 0x80)  # continuation bit: more groups follow
            else:
                out.append(to_write)
                break
        return bytes(out)