uploader-bot/app/core/content/content_id.py

157 lines
5.5 KiB
Python

from base58 import b58encode, b58decode
from tonsdk.boc import begin_cell
from app.core._config import ALLOWED_CONTENT_TYPES
from app.core._utils.string_binary import string_to_bytes_fixed_size, bytes_to_string
# cid_v1#_ cid_version:uint8 accept_type:uint120 content_sha256:uint256 onchain_index:uint128 = CIDv1;
# onchain_index#b0 bytes_len:uint8 index:uint_var = OnchainIndex;
# accept_type#b1 bytes_len:uint8 value:bytes = Param;
# encryption_key_sha256#b2 digest:uint256 = EncryptionKey;
# cid_v2#_ cid_version:uint8 content_sha256:uint256 *[Param]s = CIDv2;
class ContentId:
    """Content identifier (CID) with base58 text serialization in two formats.

    Binary layouts (before base58 encoding), as produced/consumed below:

    * v1: ``version(1) | accept_type(15) | content_sha256(32) | [onchain_index(16)]``
    * v2: ``version(1) | content_sha256(32) | param*`` where every param is
      ``0xb0 len(1) onchain_index(len)``, ``0xb1 len(1) accept_type(len)`` or
      ``0xb2 encryption_key_sha256(32)``.

    ``onchain_index`` uses ``-1`` as the "no on-chain index" sentinel; use
    ``safe_onchain_index`` to get ``None`` instead of a negative number.

    NOTE: several validations are ``assert`` statements (kept so the raised
    exception type stays ``AssertionError``); they are skipped when Python runs
    with ``-O``.
    """

    # One-byte tags introducing the optional v2 parameters.
    _TAG_ONCHAIN_INDEX = b'\xb0'
    _TAG_ACCEPT_TYPE = b'\xb1'
    _TAG_ENCRYPTION_KEY = b'\xb2'

    def __init__(
        self,
        version: int = None,
        content_hash: bytes = None,  # only SHA256
        onchain_index: int = None,
        accept_type: str = None,
        encryption_key_sha256: bytes = None,
    ):
        """Store the CID fields.

        Args:
            version: CID format version (1 or 2) this object was built from.
            content_hash: SHA256 digest of the content (32 bytes).
            onchain_index: Non-negative index, or ``None``/negative for "none".
            accept_type: Content type string, e.g. ``"image/png"``.
            encryption_key_sha256: Optional 32-byte digest of the encryption key.

        Raises:
            AssertionError: if a non-empty ``encryption_key_sha256`` is not
                exactly 32 bytes long.
        """
        self.version = version
        self.content_hash = content_hash
        # Only ``None`` maps to the sentinel: ``onchain_index or -1`` would
        # also discard the perfectly valid index 0.
        self.onchain_index = -1 if onchain_index is None else onchain_index
        self.accept_type = accept_type
        self.encryption_key_sha256 = encryption_key_sha256
        if self.encryption_key_sha256:
            assert len(self.encryption_key_sha256) == 32, "Invalid encryption key length"

    @property
    def content_hash_b58(self) -> str:
        """Return the content hash encoded as a base58 string."""
        return b58encode(self.content_hash).decode()

    @property
    def safe_onchain_index(self):
        """Return the on-chain index, or ``None`` when it is unset or negative."""
        index = self.onchain_index
        if index is None or index < 0:
            return None
        return index

    def serialize_v2(self, include_accept_type=False) -> str:
        """Serialize to the v2 base58 form.

        Args:
            include_accept_type: When true and ``accept_type`` is set, append
                the accept-type parameter; it is omitted otherwise.

        Returns:
            Base58 string of ``version | content_hash | params``.
        """
        cid_bin = (2).to_bytes(1, 'big') + self.content_hash

        index = self.safe_onchain_index
        if index is not None:
            # Minimal big-endian encoding; index 0 still occupies one byte.
            oi_bin_len = max(1, (index.bit_length() + 7) // 8)
            cid_bin += (
                self._TAG_ONCHAIN_INDEX
                + oi_bin_len.to_bytes(1, 'big')
                + index.to_bytes(oi_bin_len, 'big')
            )

        if self.accept_type and include_accept_type:
            at_bin_len = len(self.accept_type.encode())
            at_bin = string_to_bytes_fixed_size(self.accept_type, at_bin_len)
            cid_bin += (
                self._TAG_ACCEPT_TYPE
                + at_bin_len.to_bytes(1, 'big')
                + at_bin
            )

        if self.encryption_key_sha256:
            # Fixed 32-byte payload, no length byte.
            cid_bin += self._TAG_ENCRYPTION_KEY + self.encryption_key_sha256

        return b58encode(cid_bin).decode()

    def serialize_v1(self) -> str:
        """Serialize to the v1 base58 form.

        The accept type is written into a fixed 15-byte field and the on-chain
        index, when present, into a fixed 16-byte big-endian field.

        Raises:
            AssertionError: if ``content_hash`` is not 32 bytes long.
            OverflowError: if the on-chain index does not fit in 16 bytes.
        """
        at_bin = string_to_bytes_fixed_size(self.accept_type, 15)
        assert len(self.content_hash) == 32, "Invalid hash length"

        index = self.safe_onchain_index
        oi_bin = b'' if index is None else index.to_bytes(16, 'big', signed=False)

        return b58encode(
            (1).to_bytes(1, 'big')  # cid version
            + at_bin
            + self.content_hash
            + oi_bin
        ).decode()

    @staticmethod
    def _read_length_prefixed(data: bytes):
        """Split ``len(1) | payload(len) | rest`` into ``(payload, rest)``.

        Raises:
            ValueError: if ``data`` is shorter than its length byte announces.
        """
        if not data:
            raise ValueError("Truncated cid param")
        size = data[0]
        payload = data[1:1 + size]
        if len(payload) != size:
            raise ValueError("Truncated cid param")
        return payload, data[1 + size:]

    @classmethod
    def from_v2(cls, cid: str):
        """Parse a v2 base58 CID.

        Raises:
            AssertionError: on a wrong version byte, a content hash shorter
                than 32 bytes, or an encryption key that is not 32 bytes.
            ValueError: on an unknown parameter tag or a truncated
                length-prefixed parameter.
        """
        cid_bin = b58decode(cid)
        cid_version = int.from_bytes(cid_bin[0:1], 'big')
        content_sha256 = cid_bin[1:33]
        remainder = cid_bin[33:]
        assert cid_version == 2, "Invalid version"
        assert len(content_sha256) == 32, "Invalid hash length"

        params = {}
        while remainder:
            param_op, remainder = remainder[0:1], remainder[1:]
            if param_op == cls._TAG_ONCHAIN_INDEX:
                payload, remainder = cls._read_length_prefixed(remainder)
                params['onchain_index'] = int.from_bytes(payload, 'big')
            elif param_op == cls._TAG_ACCEPT_TYPE:
                payload, remainder = cls._read_length_prefixed(remainder)
                params['accept_type'] = bytes_to_string(payload)
            elif param_op == cls._TAG_ENCRYPTION_KEY:
                # The tag byte is already consumed, so the digest starts at
                # offset 0 (serialize_v2 writes no length byte for it).
                key = remainder[0:32]
                assert len(key) == 32, "Invalid encryption key length"
                params['encryption_key_sha256'] = key
                remainder = remainder[32:]
            else:
                # Params carry no generic length, so an unknown tag cannot be
                # skipped safely.
                raise ValueError("Invalid cid param")

        return cls(version=2, content_hash=content_sha256, **params)

    @classmethod
    def from_v1(cls, cid: str):
        """Parse a v1 base58 CID.

        The accept type is not checked against any allow-list here.

        Raises:
            AssertionError: on a wrong version byte or a content hash shorter
                than 32 bytes.
        """
        cid_bin = b58decode(cid)
        cid_version = int.from_bytes(cid_bin[0:1], 'big')
        accept_type = bytes_to_string(cid_bin[1:16])
        content_sha256 = cid_bin[16:48]
        # Everything after the hash is the optional big-endian index.
        onchain_index = int.from_bytes(cid_bin[48:], 'big') if len(cid_bin) > 48 else -1

        assert cid_version == 1, "Invalid version"
        assert len(content_sha256) == 32, "Invalid hash length"
        return cls(
            version=1,
            content_hash=content_sha256,
            onchain_index=onchain_index,
            accept_type=accept_type
        )

    @classmethod
    def deserialize(cls, cid: str):
        """Parse a base58 CID of either version, dispatching on its first byte.

        Raises:
            ValueError: if the version byte is neither 1 nor 2.
        """
        cid_version = int.from_bytes(b58decode(cid)[0:1], 'big')
        if cid_version == 1:
            return cls.from_v1(cid)
        elif cid_version == 2:
            return cls.from_v2(cid)
        else:
            raise ValueError("Invalid cid version")

    def json_format(self):
        """Return a JSON-serializable dict; binary fields are base58 strings."""
        return {
            "version": self.version,
            "content_hash": self.content_hash_b58,
            "onchain_index": self.safe_onchain_index,
            "accept_type": self.accept_type,
            "encryption_key_sha256": b58encode(self.encryption_key_sha256).decode() if self.encryption_key_sha256 else None
        }