from base58 import b58encode, b58decode from tonsdk.boc import begin_cell from app.core._config import ALLOWED_CONTENT_TYPES from app.core._utils.string_binary import string_to_bytes_fixed_size, bytes_to_string # cid_v1#_ cid_version:uint8 accept_type:uint120 content_sha256:uint256 onchain_index:uint128 = CIDv1; # onchain_index#b0 bytes_len:uint8 index:uint_var = OnchainIndex; # accept_type#b1 bytes_len:uint8 value:bytes = Param; # encryption_key_sha256#b2 digest:uint256 = EncryptionKey; # cid_v2#_ cid_version:uint8 content_sha256:uint256 *[Param]s = CIDv2; class ContentId: def __init__( self, version: int = None, content_hash: bytes = None, # only SHA256 onchain_index: int = None, accept_type: str = 'image/jpeg', encryption_key_sha256: bytes = None, ): self.version = version self.content_hash = content_hash self.onchain_index = onchain_index or -1 self.accept_type = accept_type self.encryption_key_sha256 = encryption_key_sha256 if self.encryption_key_sha256: assert len(self.encryption_key_sha256) == 32, "Invalid encryption key length" @property def content_hash_b58(self) -> str: return b58encode(self.content_hash).decode() @property def safe_onchain_index(self): return self.onchain_index if (not (self.onchain_index is None) and self.onchain_index >= 0) else None def serialize_v2(self, include_accept_type=False) -> str: cid_bin = ( (2).to_bytes(1, 'big') # cid version + self.content_hash ) if not (self.safe_onchain_index is None): oi_bin_hex = hex(self.safe_onchain_index)[2:] if len(oi_bin_hex) % 2 == 1: oi_bin_hex = '0' + oi_bin_hex oi_bin_len = len(oi_bin_hex) // 2 cid_bin += b'\xb0' + oi_bin_len.to_bytes(1, 'big') + bytes.fromhex(oi_bin_hex) if self.accept_type and include_accept_type: at_bin_len = len(self.accept_type.encode()) at_bin = string_to_bytes_fixed_size(self.accept_type, at_bin_len) cid_bin += ( b'\xb1' + at_bin_len.to_bytes(1, 'big') + at_bin ) if self.encryption_key_sha256: cid_bin += b'\xb2' + self.encryption_key_sha256 return b58encode(cid_bin).decode() def serialize_v1(self) -> str: at_bin = string_to_bytes_fixed_size(self.accept_type, 15) assert len(self.content_hash) == 32, "Invalid hash length" if self.onchain_index < 0: oi_bin = b'' else: oi_bin = self.onchain_index.to_bytes(16, 'big', signed=False) assert len(oi_bin) == 16, "Invalid onchain_index" return b58encode( (1).to_bytes(1, 'big') # cid version + at_bin + self.content_hash + oi_bin ).decode() @classmethod def from_v2(cls, cid: str): cid_bin = b58decode(cid) ( cid_version, content_sha256, cid_bin ) = ( int.from_bytes(cid_bin[0:1], 'big'), cid_bin[1:33], cid_bin[33:] ) assert cid_version == 2, "Invalid version" params = {} while cid_bin: param_op = cid_bin[0:1] cid_bin = cid_bin[1:] if param_op == b'\xb0': # onchain_index oi_len = int.from_bytes(cid_bin[0:1], 'big') params['onchain_index'] = int.from_bytes(cid_bin[1:1 + oi_len], 'big') cid_bin = cid_bin[1 + oi_len:] elif param_op == b'\xb1': # accept_type at_len = int.from_bytes(cid_bin[0:1], 'big') params['accept_type'] = bytes_to_string(cid_bin[1:1 + at_len]) cid_bin = cid_bin[1 + at_len:] elif param_op == b'\xb2': # encryption_key_sha256 params['encryption_key_sha256'] = cid_bin[1:33] cid_bin = cid_bin[33:] return cls(version=2, content_hash=content_sha256, **params) @classmethod def from_v1(cls, cid: str): cid_bin = b58decode(cid) ( cid_version, accept_type, content_sha256, onchain_index ) = ( int.from_bytes(cid_bin[0:1], 'big'), bytes_to_string(cid_bin[1:16]), cid_bin[16:48], int.from_bytes(cid_bin[48:], 'big') if len(cid_bin) > 48 else -1 ) assert cid_version == 1, "Invalid version" content_type = accept_type.split('/') # assert '/'.join(content_type[0:2]) in ALLOWED_CONTENT_TYPES, "Invalid accept type" assert len(content_sha256) == 32, "Invalid hash length" return cls( version=1, content_hash=content_sha256, onchain_index=onchain_index, accept_type=accept_type ) @classmethod def deserialize(cls, cid: str): cid_version = int.from_bytes(b58decode(cid)[0:1], 'big') if cid_version == 1: return cls.from_v1(cid) elif cid_version == 2: return cls.from_v2(cid) else: raise ValueError("Invalid cid version") def json_format(self): return { "version": self.version, "content_hash": self.content_hash_b58, "onchain_index": self.safe_onchain_index, "accept_type": self.accept_type, "encryption_key_sha256": b58encode(self.encryption_key_sha256).decode() if self.encryption_key_sha256 else None }