uploader-bot/app/api/routes/_blockchain.py

406 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from base64 import b64encode, b32decode
from datetime import datetime
import traceback
from sanic import response
from sqlalchemy import and_, select, func
from tonsdk.boc import begin_cell, begin_dict
from tonsdk.utils import Address
from base58 import b58encode, b58decode
from app.core._blockchain.ton.connect import TonConnect, wallet_obj_by_name
from app.core._blockchain.ton.platform import platform
from app.core._config import PROJECT_HOST
from app.core.logger import make_log
from app.core._utils.resolve_content import resolve_content
from app.core.content.utils import create_metadata_for_item
from app.core._crypto.content import create_encrypted_content
from app.core.models.content.user_content import UserContent
from app.core.models.node_storage import StoredContent
from app.core.models._telegram import Wrapped_CBotChat
from app.core._keyboards import get_inline_keyboard
from app.core.models.promo import PromoAction
from app.core.models.tasks import BlockchainTask
def valid_royalty_params(royalty_params):
    """Validate a list of royalty entries.

    Each entry must be a dict with:
      * ``address`` - a string;
      * ``value``   - an int in ``0..10000`` (bools are rejected).

    The ``value`` fields of all entries must sum to exactly 10000.

    Per-entry checks run before the sum is computed, so a missing or
    mistyped ``value`` produces the intended validation message instead of a
    ``KeyError``/``TypeError`` from ``sum``.

    :param royalty_params: iterable of royalty entry dicts.
    :return: ``True`` when every check passes.
    :raises AssertionError: with a human-readable message on the first failed
        check. Raised explicitly (not via ``assert``) so validation is not
        stripped under ``python -O``.
    """
    field_checks = (
        ('address', lambda x: isinstance(x, str)),
        # bool is a subclass of int, so exclude it explicitly.
        ('value', lambda x: isinstance(x, int) and not isinstance(x, bool) and 0 <= x <= 10000),
    )

    for royalty_param in royalty_params:
        if not isinstance(royalty_param, dict):
            raise AssertionError("Invalid royalty param provided")
        for field_key, is_valid in field_checks:
            if field_key not in royalty_param:
                raise AssertionError(f"No {field_key} provided")
            if not is_valid(royalty_param[field_key]):
                raise AssertionError(f"Invalid {field_key} provided")

    # Safe now: every entry is known to carry an integer 'value'.
    if sum(entry['value'] for entry in royalty_params) != 10000:
        raise AssertionError("Values of royalties should sum to 10000")

    return True
def _require(condition, message=""):
    """Raise ``AssertionError(message)`` when ``condition`` is falsy.

    Used instead of ``assert`` so request validation keeps working under
    ``python -O`` while callers still see the same exception type.
    """
    if not condition:
        raise AssertionError(message)


class _EncryptedIpfsCid:
    """Adapter to provide ContentId-like interface for IPFS CID strings.

    Exposes ``content_hash`` (raw 32-byte sha2-256 digest), ``content_hash_b58``
    and ``serialize_v2()`` (returns the original CID string unchanged).
    """

    def __init__(self, cid_str: str):
        self._cid = cid_str
        self.content_hash = self._extract_content_hash(cid_str)
        self._content_hash_b58 = None  # computed lazily by content_hash_b58

    @staticmethod
    def _decode_multibase(cid_str: str) -> bytes:
        """Decode a CID string to bytes using its multibase prefix."""
        if not cid_str:
            raise ValueError("empty CID")
        prefix = cid_str[0]
        if prefix in ('b', 'B'):
            # base32 without padding: restore '=' padding before decoding.
            payload = cid_str[1:]
            padding = (-len(payload)) % 8
            return b32decode(payload.upper() + ('=' * padding), casefold=True)
        if prefix in ('z', 'Z'):
            return b58decode(cid_str[1:])
        # CIDv0 (base58btc without explicit multibase prefix)
        return b58decode(cid_str)

    @staticmethod
    def _read_varint(data: bytes, offset: int):
        """Read an unsigned LEB128 varint; return ``(value, next_offset)``."""
        result = 0
        shift = 0
        while True:
            if offset >= len(data):
                raise ValueError("truncated varint")
            byte = data[offset]
            offset += 1
            result |= (byte & 0x7F) << shift
            if not (byte & 0x80):
                break
            shift += 7
            if shift > 63:
                raise ValueError("varint overflow")
        return result, offset

    @classmethod
    def _extract_content_hash(cls, cid_str: str) -> bytes:
        """Return the sha2-256 digest embedded in the CID's multihash."""
        data = cls._decode_multibase(cid_str)
        offset = 0
        if data and data[0] == 0x01:
            # CIDv1: <version><codec><multihash>
            version, offset = cls._read_varint(data, offset)
            if version != 1:
                raise ValueError("unsupported CID version")
            _, offset = cls._read_varint(data, offset)  # skip codec
        code, offset = cls._read_varint(data, offset)
        length, offset = cls._read_varint(data, offset)
        digest = data[offset:offset + length]
        if len(digest) != length:
            raise ValueError("truncated multihash digest")
        if code != 0x12 or length != 32:
            raise ValueError("unsupported multihash (expect sha2-256)")
        return digest

    def serialize_v2(self, include_accept_type: bool = False):
        """Return the CID string as provided; ``include_accept_type`` is ignored."""
        return self._cid

    @property
    def content_hash_b58(self) -> str:
        if self._content_hash_b58 is None:
            self._content_hash_b58 = b58encode(self.content_hash).decode()
        return self._content_hash_b58


def _build_new_content_payload(content_hash, owner_address, price, royalties_cell,
                               metadata_url, encrypted_cid, image_cid, metadata_cid):
    """Build the base64-encoded BOC for the "new content" message (op 0x5491d08c).

    :param content_hash: raw bytes of the encrypted content hash (stored as uint256).
    :param owner_address: ``Address`` to store, or ``None`` to store two zero bits.
    :param price: value stored as the third coins field (converted with ``int``).
    :param royalties_cell: result of ``end_dict()`` for the royalties dictionary.
    :param metadata_url: full metadata URL string.
    :param encrypted_cid: serialized encrypted content CID string.
    :param image_cid: serialized image CID string (``''`` when there is no image).
    :param metadata_cid: serialized metadata CID string.
    :return: base64 text of the serialized message cell.
    """
    def text_cell(text):
        return begin_cell().store_bytes(text.encode()).end_cell()

    header = (
        begin_cell()
        .store_uint(0x5491d08c, 32)
        .store_uint(int.from_bytes(content_hash, "big", signed=False), 256)
    )
    if owner_address is None:
        header = header.store_uint(0, 2)
    else:
        header = header.store_address(owner_address)

    sale_cell = (
        begin_cell()
        .store_ref(
            begin_cell()
            .store_coins(int(0))
            .store_coins(int(0))
            .store_coins(int(price))
            .end_cell()
        )
        .store_maybe_ref(royalties_cell)
        .store_uint(0, 1)
        .end_cell()
    )
    content_cell = (
        begin_cell()
        .store_ref(text_cell(metadata_url))
        .store_ref(
            begin_cell()
            .store_ref(text_cell(encrypted_cid))
            .store_ref(text_cell(image_cid))
            .store_ref(text_cell(metadata_cid))
            .end_cell()
        )
        .end_cell()
    )
    message = header.store_ref(sale_cell).store_ref(content_cell).end_cell()
    return b64encode(message.to_boc(False)).decode()


async def s_api_v1_blockchain_send_new_content_message(request):
    """Validate an upload request and return (or queue) the "new content" transaction.

    Expected JSON fields: ``title``, ``authors``, ``content``, ``image``,
    ``description``, ``price`` (digit string), ``allowResale``,
    ``royaltyParams`` and ``hashtags`` (defaults to ``[]`` when missing or
    empty); ``downloadable`` is optional and defaults to ``False``.

    ``content`` is first resolved as a legacy local content id; if that fails
    for any reason it is treated as an encrypted IPFS CID.

    When the user still has free promo uploads and no unfinished
    ``BlockchainTask``, a ``PromoAction`` and a ``BlockchainTask`` are stored
    and the response is ``{'address': 'free', 'amount': ..., 'payload': ''}``.
    Otherwise the response carries the platform address, the amount and the
    base64 payload for the client to send.

    :return: JSON response; ``{"error": ...}`` with status 400 on any failure.
    """
    try:
        _require(request.json, "No data provided")
        _require(request.ctx.user, "No authorized user provided")

        # .get() so an absent key is defaulted instead of raising KeyError.
        if not request.json.get('hashtags'):
            request.json['hashtags'] = []

        validators = {
            'title': lambda x: isinstance(x, str),
            'authors': lambda x: isinstance(x, list),
            'content': lambda x: isinstance(x, str),  # may be plaintext CID (legacy) or encrypted IPFS CID (bafy...)
            'image': lambda x: isinstance(x, str),
            'description': lambda x: isinstance(x, str),
            'price': lambda x: (isinstance(x, str) and x.isdigit()),
            'allowResale': lambda x: isinstance(x, bool),
            'royaltyParams': lambda x: (isinstance(x, list) and valid_royalty_params(x)),
            'hashtags': lambda x: isinstance(x, list) and all(isinstance(y, str) for y in x),
        }
        for field_key, is_valid in validators.items():
            _require(field_key in request.json, f"No {field_key} provided")
            _require(is_valid(request.json[field_key]), f"Invalid {field_key} provided")

        # Support legacy: 'content' as decrypted ContentId; and new: 'content' as encrypted IPFS CID
        encrypted_content_cid = None
        try:
            # Legacy path
            decrypted_content_cid, err = resolve_content(request.json['content'])
            _require(not err)
            decrypted_content = (await request.ctx.db_session.execute(
                select(StoredContent).where(StoredContent.hash == decrypted_content_cid.content_hash_b58)
            )).scalars().first()
            _require(decrypted_content and decrypted_content.type == "local/content_bin")
            encrypted_content = await create_encrypted_content(request.ctx.db_session, decrypted_content)
            encrypted_content_cid = encrypted_content.cid
        except Exception:
            # `Exception`, not `BaseException`: task cancellation and
            # interpreter shutdown must not be mistaken for a legacy-path miss.
            # New path: treat provided string as encrypted IPFS CID (ENCF v1)
            try:
                encrypted_content_cid = _EncryptedIpfsCid(request.json['content'])
            except Exception as exc:
                make_log("Blockchain", f"Provided encrypted IPFS CID is invalid: {exc}", level='error')
                raise AssertionError("Invalid encrypted content CID provided") from exc

        if request.json['image']:
            image_content_cid, err = resolve_content(request.json['image'])
            _require(not err, "Invalid image CID")
            image_content = (await request.ctx.db_session.execute(
                select(StoredContent).where(StoredContent.hash == image_content_cid.content_hash_b58)
            )).scalars().first()
            _require(image_content, "No image locally found")
        else:
            image_content_cid = None
            image_content = None

        authors = request.json['authors']
        if authors:
            content_title = f"{', '.join(authors)} {request.json['title']}"
        else:
            content_title = request.json['title']

        cover_url = (
            f"{PROJECT_HOST}/api/v1.5/storage/{image_content_cid.serialize_v2()}"
            if image_content_cid else None
        )

        metadata_content = await create_metadata_for_item(
            request.ctx.db_session,
            title=content_title,
            cover_url=cover_url,
            authors=authors,
            hashtags=request.json['hashtags'],
            downloadable=request.json.get('downloadable', False),
        )

        # Try to update ContentIndexItem with cover_url for this encrypted content
        # (best effort: failures are logged and do not abort the request).
        try:
            from app.core.models.content_v3 import ContentIndexItem
            ecid_str = encrypted_content_cid.serialize_v2()
            row = (await request.ctx.db_session.execute(
                select(ContentIndexItem).where(ContentIndexItem.encrypted_cid == ecid_str)
            )).scalars().first()
            if row:
                # Assign a fresh dict: re-assigning the same mutated object may
                # not register as a change for a plain JSON column.
                # NOTE(review): confirm the column type of ContentIndexItem.payload.
                payload = dict(row.payload or {})
                payload['cover_url'] = cover_url
                row.payload = payload
                await request.ctx.db_session.commit()
        except Exception as _e:
            make_log("Blockchain", f"index cover update failed: {_e}", level='warning')

        royalties_dict = begin_dict(8)
        for index, royalty_param in enumerate(request.json['royaltyParams']):
            royalties_dict.store_ref(
                index, begin_cell()
                .store_address(Address(royalty_param['address']))
                .store_uint(royalty_param['value'], 16)
                .end_cell()
            )

        _cnt = (await request.ctx.db_session.execute(
            select(func.count()).select_from(PromoAction).where(
                and_(
                    PromoAction.user_internal_id == request.ctx.user.id,
                    PromoAction.action_type == 'freeUpload'
                )
            )
        )).scalar()
        promo_free_upload_available = 3 - int(_cnt or 0)

        has_pending_task = (await request.ctx.db_session.execute(
            select(BlockchainTask).where(
                and_(BlockchainTask.user_id == request.ctx.user.id, BlockchainTask.status != 'done')
            )
        )).scalars().first()
        if has_pending_task:
            make_log("Blockchain", f"User {request.ctx.user.id} already has a pending task", level='warning')
            promo_free_upload_available = 0

        make_log("Blockchain", f"User {request.ctx.user.id} has {promo_free_upload_available} free uploads available", level='info')

        fee_amount = str(int(0.03 * 10 ** 9))
        platform_address = platform.address.to_string(1, 1, 1)
        hint_meta = {
            'encrypted_content_hash': b58encode(encrypted_content_cid.content_hash).decode(),
            'hint_type': 'uploadContentTxRequested'
        }
        payload_kwargs = dict(
            content_hash=encrypted_content_cid.content_hash,
            price=request.json['price'],
            metadata_url=f"{PROJECT_HOST}/api/v1.5/storage/{metadata_content.cid.serialize_v2(include_accept_type=True)}",
            encrypted_cid=f"{encrypted_content_cid.serialize_v2()}",
            image_cid=f"{image_content_cid.serialize_v2() if image_content_cid else ''}",
            metadata_cid=f"{metadata_content.cid.serialize_v2()}",
        )

        if promo_free_upload_available > 0:
            # Build the payload before touching the session so a failure here
            # (e.g. an unusable wallet address) leaves no pending PromoAction behind.
            owner_address = Address(await request.ctx.user.wallet_address_async(request.ctx.db_session))
            task_payload = _build_new_content_payload(
                owner_address=owner_address,
                royalties_cell=royalties_dict.end_dict(),
                **payload_kwargs,
            )

            promo_action = PromoAction(
                user_id=str(request.ctx.user.id),
                user_internal_id=request.ctx.user.id,
                action_type='freeUpload',
                # NOTE(review): str() of bytes yields the "b'...'" repr; kept
                # as-is because stored rows may already use this format.
                action_ref=str(encrypted_content_cid.content_hash),
                created=datetime.now()
            )
            request.ctx.db_session.add(promo_action)

            blockchain_task = BlockchainTask(
                destination=platform_address,
                amount=fee_amount,
                payload=task_payload,
                epoch=None, seqno=None,
                created=datetime.now(),
                status='wait',
                user_id=request.ctx.user.id
            )
            request.ctx.db_session.add(blockchain_task)
            await request.ctx.db_session.commit()

            await request.ctx.user_uploader_wrapper.send_message(
                request.ctx.user.translated('p_uploadContentTxPromo').format(
                    title=content_title,
                    free_count=(promo_free_upload_available - 1)
                ), message_type='hint', message_meta=hint_meta
            )
            return response.json({
                'address': "free",
                'amount': fee_amount,
                'payload': ""
            })

        await request.ctx.user_uploader_wrapper.send_message(
            request.ctx.user.translated('p_uploadContentTxRequested').format(
                title=content_title,
            ), message_type='hint', message_meta=hint_meta
        )
        return response.json({
            'address': platform_address,
            'amount': fee_amount,
            # No owner address in the client-sent message: two zero bits are stored instead.
            'payload': _build_new_content_payload(
                owner_address=None,
                royalties_cell=royalties_dict.end_dict(),
                **payload_kwargs,
            )
        })
    except Exception as e:
        # `Exception`, not `BaseException`: let cancellation / KeyboardInterrupt /
        # SystemExit propagate instead of turning them into a 400 response.
        make_log("Blockchain", f"Error while sending new content message: {e}" + '\n' + traceback.format_exc(), level='error')
        return response.json({"error": str(e)}, status=400)
async def s_api_v1_blockchain_send_purchase_content_message(request):
    """Return the transaction a client must send to purchase a content license.

    Expected JSON fields:
      * ``content_address`` - string; either the on-chain address of an
        existing ``UserContent`` row or a serialized content id;
      * ``license_type``    - currently only ``'resale'`` is accepted.

    :return: JSON with ``address``, ``amount`` and base64 ``payload``; or
        ``{"error": "No wallet address provided"}`` with status 400 when the
        user has no wallet address.
    :raises AssertionError: on invalid input, a missing user, unknown content
        or an unavailable license type. Raised explicitly (not via ``assert``)
        so the checks are not stripped under ``python -O``.
    """
    if not request.json:
        raise AssertionError("No data provided")
    # Same guard as the upload handler; avoids AttributeError on a missing user.
    if not request.ctx.user:
        raise AssertionError("No authorized user provided")

    validators = {
        'content_address': lambda x: isinstance(x, str),
        'license_type': lambda x: x in ['resale']
    }
    for field_key, is_valid in validators.items():
        if field_key not in request.json:
            raise AssertionError(f"No {field_key} provided")
        if not is_valid(request.json[field_key]):
            raise AssertionError(f"Invalid {field_key} provided")

    if not (await request.ctx.user.wallet_address_async(request.ctx.db_session)):
        return response.json({"error": "No wallet address provided"}, status=400)

    # Imported at call time, as in the original code.
    from app.core.content.content_id import ContentId

    license_exist = (await request.ctx.db_session.execute(select(UserContent).where(
        UserContent.onchain_address == request.json['content_address']
    ))).scalars().first()

    if license_exist:
        serialized_cid = license_exist.content.cid.serialize_v2()
    else:
        serialized_cid = request.json['content_address']
    _cid = ContentId.deserialize(serialized_cid)
    r_content = (await request.ctx.db_session.execute(
        select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58)
    )).scalars().first()
    # Fail with a clear message instead of AttributeError on None below.
    if r_content is None:
        raise AssertionError("Content not found")

    async def open_content_async(session, sc: StoredContent):
        """Resolve the encrypted/decrypted pair for ``sc`` and its top-level content type."""
        if not sc.encrypted:
            decrypted = sc
            encrypted = (await session.execute(
                select(StoredContent).where(StoredContent.decrypted_content_id == sc.id)
            )).scalars().first()
        else:
            encrypted = sc
            decrypted = (await session.execute(
                select(StoredContent).where(StoredContent.id == sc.decrypted_content_id)
            )).scalars().first()

        if not (decrypted and encrypted):
            raise AssertionError("Can't open content")

        ctype = decrypted.json_format().get('content_type', 'application/x-binary')
        try:
            content_type = ctype.split('/')[0]
        except Exception:
            # e.g. a non-string content_type value
            content_type = 'application'

        return {'encrypted_content': encrypted, 'decrypted_content': decrypted, 'content_type': content_type}

    content = await open_content_async(request.ctx.db_session, r_content)

    encrypted_meta = content['encrypted_content'].json_format()
    licenses_cost = encrypted_meta['license']
    if request.json['license_type'] not in licenses_cost:
        raise AssertionError

    if license_exist:
        destination = license_exist.onchain_address
    else:
        destination = encrypted_meta['item_address']

    purchase_cell = (
        begin_cell()
        .store_uint(0x2a319593, 32)
        .store_uint(0, 64)
        # License type code: 3 = 'resale' (an earlier mapping also had 1 = 'listen').
        .store_uint(3, 8)
        .store_uint(0, 256)
        .store_uint(0, 2)
        .end_cell()
    )

    return response.json({
        'address': destination,
        'amount': str(int(licenses_cost['resale']['price'])),
        'payload': b64encode(purchase_cell.to_boc(False)).decode()
    })