330 lines
12 KiB
Python
330 lines
12 KiB
Python
from base64 import b32decode
|
|
from typing import Optional, Tuple
|
|
|
|
from base58 import b58encode, b58decode
|
|
|
|
from tonsdk.boc import begin_cell
|
|
from app.core._config import ALLOWED_CONTENT_TYPES
|
|
from app.core._utils.string_binary import string_to_bytes_fixed_size, bytes_to_string
|
|
|
|
|
|
# cid_v1#_ cid_version:uint8 accept_type:uint120 content_sha256:uint256 onchain_index:uint128 = CIDv1;
|
|
# onchain_index#b0 bytes_len:uint8 index:uint_var = OnchainIndex;
|
|
# accept_type#b1 bytes_len:uint8 value:bytes = Param;
|
|
# encryption_key_sha256#b2 digest:uint256 = EncryptionKey;
|
|
# cid_v2#_ cid_version:uint8 content_sha256:uint256 *[Param]s = CIDv2;
|
|
|
|
class ContentId:
    """Unified abstraction for legacy ContentID and ENCF/IPFS CID strings.

    Three textual formats are handled, distinguished by ``cid_format``:

    * ``'content_id_v1'`` -- base58 of ``version(1) | accept_type(15) |
      sha256(32) | [onchain_index(16)]``.
    * ``'content_id_v2'`` -- base58 of ``version(1) | sha256(32)`` followed by
      tagged parameters (``0xb0`` on-chain index, ``0xb1`` accept type,
      ``0xb2`` encryption key digest).
    * ``'ipfs'`` -- a multibase/multihash CID as parsed by :meth:`from_ipfs`.
    """

    def __init__(
        self,
        version: Optional[int] = None,
        content_hash: Optional[bytes] = None,  # only SHA256
        onchain_index: Optional[int] = None,
        accept_type: Optional[str] = None,
        encryption_key_sha256: Optional[bytes] = None,
        *,
        raw_value: Optional[str] = None,
        cid_format: Optional[str] = None,
        multibase_prefix: Optional[str] = None,
        multicodec: Optional[int] = None,
        multihash_code: Optional[int] = 0x12,
        multihash_length: Optional[int] = 32,
    ):
        """Store the components of a content identifier.

        Args:
            version: Identifier version (1 or 2 for legacy ids, the CID
                version for IPFS ids).
            content_hash: Raw SHA-256 digest of the content.
            onchain_index: On-chain index; ``None`` is stored as ``-1``.
            accept_type: Accept/content type string.
            encryption_key_sha256: Optional 32-byte digest of the encryption
                key.
            raw_value: Original textual form, returned verbatim by
                :meth:`serialize_v2` for IPFS ids.
            cid_format: Explicit format name; when falsy it is derived from
                ``version`` (``'content_id_v1'`` for 1, else
                ``'content_id_v2'``).
            multibase_prefix: Multibase prefix character of an IPFS CID.
            multicodec: Multicodec code of an IPFS CID.
            multihash_code: Multihash function code (default ``0x12``).
            multihash_length: Multihash digest length (default ``32``).

        Raises:
            AssertionError: If a non-empty ``encryption_key_sha256`` is not
                exactly 32 bytes long.
        """
        self.version = version
        self.content_hash = content_hash

        # -1 is the in-memory marker for "no on-chain index".
        self.onchain_index = onchain_index if onchain_index is not None else -1
        self.accept_type = accept_type
        self.encryption_key_sha256 = encryption_key_sha256
        if self.encryption_key_sha256:
            assert len(self.encryption_key_sha256) == 32, "Invalid encryption key length"

        self._raw_value = raw_value
        if cid_format:
            self.cid_format = cid_format
        else:
            # Anything that is not explicitly version 1 is treated as v2.
            self.cid_format = 'content_id_v1' if self.version == 1 else 'content_id_v2'
        self.multibase_prefix = multibase_prefix
        self.multicodec = multicodec
        self.multihash_code = multihash_code
        self.multihash_length = multihash_length

    @property
    def content_hash_b58(self) -> str:
        """Return the content hash encoded as a base58 string.

        Raises:
            AssertionError: If no content hash is set.
        """
        assert self.content_hash, "Content hash is not set"
        return b58encode(self.content_hash).decode()

    @property
    def safe_onchain_index(self):
        """Return the on-chain index, or ``None`` when it is unset or negative."""
        if self.onchain_index is not None and self.onchain_index >= 0:
            return self.onchain_index
        return None

    def serialize_v2(self, include_accept_type=False) -> str:
        """Serialize to the v2 textual form.

        For IPFS-format ids the stored raw string is returned when present,
        otherwise the CID is rebuilt from its components.

        Args:
            include_accept_type: When true and an accept type is set, append
                it as a ``0xb1`` parameter.

        Returns:
            The base58 v2 content id, or the IPFS CID string.
        """
        if self.cid_format == 'ipfs':
            if self._raw_value:
                return self._raw_value
            return self._serialize_ipfs()

        cid_bin = (2).to_bytes(1, 'big') + self.content_hash  # version byte + digest

        onchain_index = self.safe_onchain_index
        if onchain_index is not None:
            # Minimal big-endian encoding; zero still occupies one byte.
            oi_len = max(1, (onchain_index.bit_length() + 7) // 8)
            cid_bin += b'\xb0' + oi_len.to_bytes(1, 'big') + onchain_index.to_bytes(oi_len, 'big')

        if self.accept_type and include_accept_type:
            at_bin_len = len(self.accept_type.encode())
            at_bin = string_to_bytes_fixed_size(self.accept_type, at_bin_len)
            cid_bin += b'\xb1' + at_bin_len.to_bytes(1, 'big') + at_bin

        if self.encryption_key_sha256:
            # Fixed-size digest: no length byte follows the tag.
            cid_bin += b'\xb2' + self.encryption_key_sha256

        return b58encode(cid_bin).decode()

    def serialize_v1(self) -> str:
        """Serialize to the legacy v1 base58 form.

        Raises:
            ValueError: If this id is in IPFS format.
            AssertionError: If the content hash is not 32 bytes long.
        """
        if self.cid_format == 'ipfs':
            raise ValueError("Cannot serialize IPFS CID as ContentId v1")

        at_bin = string_to_bytes_fixed_size(self.accept_type, 15)
        assert len(self.content_hash) == 32, "Invalid hash length"

        # A negative index means "not set" and is omitted from the output.
        if self.onchain_index < 0:
            oi_bin = b''
        else:
            oi_bin = self.onchain_index.to_bytes(16, 'big', signed=False)

        return b58encode(
            (1).to_bytes(1, 'big')  # cid version
            + at_bin
            + self.content_hash
            + oi_bin
        ).decode()

    @staticmethod
    def _parse_v2_params(data: bytes) -> dict:
        """Parse the tagged parameter section that follows a v2 header.

        Args:
            data: Bytes after the version byte and the 32-byte content hash.

        Returns:
            Keyword arguments for the constructor; may contain
            ``onchain_index``, ``accept_type`` and ``encryption_key_sha256``.
        """
        params = {}
        while data:
            param_op, data = data[0:1], data[1:]
            if param_op == b'\xb0':  # onchain_index: length byte + big-endian int
                oi_len = int.from_bytes(data[0:1], 'big')
                params['onchain_index'] = int.from_bytes(data[1:1 + oi_len], 'big')
                data = data[1 + oi_len:]
            elif param_op == b'\xb1':  # accept_type: length byte + string bytes
                at_len = int.from_bytes(data[0:1], 'big')
                params['accept_type'] = bytes_to_string(data[1:1 + at_len])
                data = data[1 + at_len:]
            elif param_op == b'\xb2':  # encryption_key_sha256: bare 32-byte digest
                # The digest starts right after the tag (serialize_v2 writes
                # no length byte), so it occupies the next 32 bytes.
                params['encryption_key_sha256'] = data[0:32]
                data = data[32:]
            # Any other tag byte is skipped and parsing resumes at the next byte.
        return params

    @classmethod
    def from_v2(cls, cid: str):
        """Parse a base58 v2 content id.

        Raises:
            AssertionError: If the version byte is not 2, the content hash is
                shorter than 32 bytes, or the encryption key digest has the
                wrong length.
        """
        cid_bin = b58decode(cid)
        cid_version = int.from_bytes(cid_bin[0:1], 'big')
        content_sha256 = cid_bin[1:33]
        assert cid_version == 2, "Invalid version"
        assert len(content_sha256) == 32, "Invalid hash length"

        params = cls._parse_v2_params(cid_bin[33:])
        return cls(version=2, content_hash=content_sha256, **params)

    @classmethod
    def from_v1(cls, cid: str):
        """Parse a base58 legacy v1 content id.

        The on-chain index is read from any bytes after offset 48; when there
        are none it is set to ``-1``. The accept type is not validated.

        Raises:
            AssertionError: If the version byte is not 1 or the content hash
                is not 32 bytes long.
        """
        cid_bin = b58decode(cid)
        cid_version = int.from_bytes(cid_bin[0:1], 'big')
        accept_type = bytes_to_string(cid_bin[1:16])
        content_sha256 = cid_bin[16:48]
        onchain_index = int.from_bytes(cid_bin[48:], 'big') if len(cid_bin) > 48 else -1

        assert cid_version == 1, "Invalid version"
        assert len(content_sha256) == 32, "Invalid hash length"
        return cls(
            version=1,
            content_hash=content_sha256,
            onchain_index=onchain_index,
            accept_type=accept_type,
        )

    @classmethod
    def deserialize(cls, cid: str):
        """Parse any supported content id string.

        Strings starting with ``b``/``B``/``z``/``Z`` are parsed as IPFS CIDs.
        Otherwise the first base58-decoded byte selects v1 or v2, and anything
        else falls back to IPFS parsing.

        Raises:
            ValueError: If ``cid`` is empty, or the fallback IPFS parse fails.
        """
        if not cid:
            raise ValueError("Empty content id provided")

        if cid[0] in ('b', 'B', 'z', 'Z'):
            return cls.from_ipfs(cid)

        try:
            cid_version = int.from_bytes(b58decode(cid)[0:1], 'big')
        except Exception:
            # Not decodable as base58: let the IPFS fallback report the error.
            cid_version = None

        if cid_version == 1:
            obj = cls.from_v1(cid)
            obj._raw_value = cid
            return obj
        if cid_version == 2:
            obj = cls.from_v2(cid)
            obj._raw_value = cid
            return obj

        try:
            return cls.from_ipfs(cid)
        except Exception as exc:
            raise ValueError(f"Invalid cid format: {exc}") from exc

    def json_format(self):
        """Return a JSON-serializable dict describing this content id.

        ``raw`` is populated only for IPFS-format ids.
        """
        if self.encryption_key_sha256:
            encryption_key_b58 = b58encode(self.encryption_key_sha256).decode()
        else:
            encryption_key_b58 = None
        return {
            "version": self.version,
            "content_hash": self.content_hash_b58,
            "onchain_index": self.safe_onchain_index,
            "accept_type": self.accept_type,
            "encryption_key_sha256": encryption_key_b58,
            "format": self.cid_format,
            "raw": self.serialize_v2() if self.cid_format == 'ipfs' else None,
        }

    # --- helpers for IPFS/ENCF CID handling ---------------------------------

    @staticmethod
    def _decode_multibase(cid_str: str) -> Tuple[bytes, Optional[str]]:
        """Decode a CID string into raw bytes and its lowercased prefix.

        ``b``/``B`` selects base32 and ``z``/``Z`` selects base58; any other
        first character means the whole string is base58 with no prefix.

        Returns:
            ``(payload, prefix)`` where ``prefix`` is ``'b'``, ``'z'`` or
            ``None``.
        """
        prefix = cid_str[0]
        if prefix in ('b', 'B'):
            payload = cid_str[1:]
            # The text form omits '=' padding; restore it for b32decode.
            padding = (-len(payload)) % 8
            decoded = b32decode(payload.upper() + ('=' * padding), casefold=True)
            return decoded, prefix.lower()
        if prefix in ('z', 'Z'):
            return b58decode(cid_str[1:]), prefix.lower()
        # CIDv0 without explicit prefix
        return b58decode(cid_str), None

    @staticmethod
    def _read_varint(data: bytes, offset: int) -> Tuple[int, int]:
        """Read an unsigned LEB128 varint from ``data`` starting at ``offset``.

        Returns:
            ``(value, next_offset)``.

        Raises:
            ValueError: If the data ends mid-varint or the shift exceeds 63.
        """
        result = 0
        shift = 0
        while True:
            if offset >= len(data):
                raise ValueError("truncated varint")
            byte = data[offset]
            offset += 1
            result |= (byte & 0x7F) << shift
            if not (byte & 0x80):  # high bit clear marks the final byte
                break
            shift += 7
            if shift > 63:
                raise ValueError("varint overflow")
        return result, offset

    @classmethod
    def from_ipfs(cls, cid: str):
        """Parse an IPFS-style CID string.

        Only CID versions 0 and 1 with a 32-byte multihash of code ``0x12``
        are accepted.

        Raises:
            ValueError: On an unsupported version, multihash code or length,
                or on truncated input.
        """
        cid = cid.strip()
        payload, multibase_prefix = cls._decode_multibase(cid)

        idx = 0
        version: Optional[int] = None
        codec: Optional[int] = None

        if multibase_prefix is not None:
            version, idx = cls._read_varint(payload, idx)
            if version not in (0, 1):
                raise ValueError(f"unsupported CID version: {version}")
            if version == 1:
                codec, idx = cls._read_varint(payload, idx)
            else:
                codec = 0x70  # dag-pb default for CIDv0
        else:
            # CIDv0 without explicit version/codec
            version = 0
            codec = 0x70

        multihash_code, idx = cls._read_varint(payload, idx)
        multihash_length, idx = cls._read_varint(payload, idx)
        digest = payload[idx:idx + multihash_length]
        if len(digest) != multihash_length:
            raise ValueError("truncated multihash digest")
        if multihash_length != 32:
            raise ValueError("unsupported multihash length (expected 32 bytes)")
        if multihash_code != 0x12:
            raise ValueError(f"unsupported multihash code: {hex(multihash_code)}")

        return cls(
            version=version,
            content_hash=digest,
            onchain_index=None,
            accept_type=None,
            encryption_key_sha256=None,
            raw_value=cid,
            cid_format='ipfs',
            multibase_prefix=multibase_prefix,
            multicodec=codec,
            multihash_code=multihash_code,
            multihash_length=multihash_length,
        )

    def _serialize_ipfs(self) -> str:
        """Rebuild the IPFS CID string from the stored components.

        Without a multibase prefix the bare base58 multihash is produced.
        Otherwise the version (and, for version 1, the codec) is prepended
        and the result is encoded according to the prefix.

        Raises:
            ValueError: If no content hash is set.
        """
        if not self.content_hash:
            raise ValueError("Cannot serialize IPFS CID without content hash")

        if self.multibase_prefix is None:
            # default to CIDv0 (base58btc) dag-pb
            multihash = (
                self._encode_varint(self.multihash_code or 0x12)
                + self._encode_varint(self.multihash_length or len(self.content_hash))
                + self.content_hash
            )
            return b58encode(multihash).decode()

        version = self.version or 1
        version_bytes = self._encode_varint(version)
        codec_bytes = b''
        if version == 1:
            codec_bytes = self._encode_varint(self.multicodec or 0x70)

        cid_bytes = (
            version_bytes
            + codec_bytes
            + self._encode_varint(self.multihash_code or 0x12)
            + self._encode_varint(self.multihash_length or len(self.content_hash))
            + self.content_hash
        )

        if self.multibase_prefix == 'z':
            return 'z' + b58encode(cid_bytes).decode()
        if self.multibase_prefix == 'b':
            from base64 import b32encode
            # Lowercase and unpadded, mirroring what _decode_multibase accepts.
            return 'b' + b32encode(cid_bytes).decode().rstrip('=').lower()
        # Fallback to base58btc without prefix
        return b58encode(cid_bytes).decode()

    @staticmethod
    def _encode_varint(value: int) -> bytes:
        """Encode a non-negative integer as an unsigned LEB128 varint.

        Raises:
            ValueError: If ``value`` is negative.
        """
        if value < 0:
            raise ValueError("varint cannot encode negative values")
        out = bytearray()
        while True:
            to_write = value & 0x7F
            value >>= 7
            if value:
                out.append(to_write | 0x80)  # more bytes follow
            else:
                out.append(to_write)
                break
        return bytes(out)
|