uploader-bot/app/api/routes/_blockchain.py

176 lines
7.9 KiB
Python

from base64 import b64encode
from datetime import datetime
import traceback
from sanic import response
from tonsdk.boc import begin_cell, begin_dict
from tonsdk.utils import Address
from app.core._blockchain.ton.connect import TonConnect, wallet_obj_by_name
from app.core._blockchain.ton.platform import platform
from app.core._config import PROJECT_HOST
from app.core.logger import make_log
from app.core._utils.resolve_content import resolve_content
from app.core.content.utils import create_metadata_for_item
from app.core._crypto.content import create_encrypted_content
from app.core.models.node_storage import StoredContent
from app.core.models._telegram import Wrapped_CBotChat
from app.core._keyboards import get_inline_keyboard
def valid_royalty_params(royalty_params):
assert sum([x['value'] for x in royalty_params]) == 10000, "Values of royalties should sum to 10000"
for royalty_param in royalty_params:
for field_key, field_value in {
'address': lambda x: isinstance(x, str),
'value': lambda x: (isinstance(x, int) and 0 <= x <= 10000)
}.items():
assert field_key in royalty_param, f"No {field_key} provided"
assert field_value(royalty_param[field_key]), f"Invalid {field_key} provided"
return True
async def s_api_v1_blockchain_send_new_content_message(request):
try:
assert request.json, "No data provided"
assert request.ctx.user, "No authorized user provided"
if not request.json['hashtags']:
request.json['hashtags'] = []
for field_key, field_value in {
'title': lambda x: isinstance(x, str),
'authors': lambda x: isinstance(x, list),
'content': lambda x: isinstance(x, str),
'image': lambda x: isinstance(x, str),
'description': lambda x: isinstance(x, str),
'price': lambda x: (isinstance(x, str) and x.isdigit()),
'allowResale': lambda x: isinstance(x, bool),
'royaltyParams': lambda x: (isinstance(x, list) and valid_royalty_params(x)),
'hashtags': lambda x: isinstance(x, list) and all([isinstance(y, str) for y in x])
}.items():
assert field_key in request.json, f"No {field_key} provided"
assert field_value(request.json[field_key]), f"Invalid {field_key} provided"
wallet_connection = request.ctx.user.wallet_connection(request.ctx.db_session)
assert wallet_connection, "No wallet connection found"
decrypted_content_cid, err = resolve_content(request.json['content'])
assert not err, f"Invalid content CID"
decrypted_content = request.ctx.db_session.query(StoredContent).filter(
StoredContent.hash == decrypted_content_cid.content_hash_b58
).first()
assert decrypted_content, "No content locally found"
assert decrypted_content.type == "local/content_bin", "Invalid content type"
encrypted_content = await create_encrypted_content(request.ctx.db_session, decrypted_content)
encrypted_content_cid = encrypted_content.cid
if request.json['image']:
image_content_cid, err = resolve_content(request.json['image'])
assert not err, f"Invalid image CID"
image_content = request.ctx.db_session.query(StoredContent).filter(
StoredContent.hash == image_content_cid.content_hash_b58
).first()
assert image_content, "No image locally found"
else:
image_content_cid = None
image_content = None
metadata_content = await create_metadata_for_item(
request.ctx.db_session,
title=f"{', '.join(request.json['authors'])} - {request.json['title']}" if request.json['authors'] else request.json['title'],
cover_url=f"{PROJECT_HOST}/api/v1/storage/{image_content_cid.serialize_v2()}" if image_content_cid else None,
authors=request.json['authors'],
hashtags=request.json['hashtags']
)
royalties_dict = begin_dict(8)
i = 0
for royalty_param in request.json['royaltyParams']:
royalties_dict.store_ref(
i, begin_cell()
.store_address(Address(royalty_param['address']))
.store_uint(royalty_param['value'], 16)
.end_cell()
)
i += 1
return response.json({
'address': platform.address.to_string(1, 1, 1),
'amount': str(int(0.15 * 10 ** 9)),
'payload': b64encode(
begin_cell()
.store_uint(0x5491d08c, 32)
.store_uint(int.from_bytes(encrypted_content_cid.content_hash, "big", signed=False), 256)
.store_ref(
begin_cell()
.store_ref(
begin_cell()
.store_coins(int(30000000))
.store_coins(int(30000000))
.store_coins(int(request.json['price']))
.end_cell()
)
.store_maybe_ref(royalties_dict.end_dict())
.store_uint(0, 1)
.end_cell()
)
.store_ref(
begin_cell()
.store_ref(
begin_cell()
.store_bytes(f"{PROJECT_HOST}/api/v1/storage/{metadata_content.cid.serialize_v2(include_accept_type=True)}".encode())
.end_cell()
)
.store_ref(
begin_cell()
.store_ref(begin_cell().store_bytes(f"{encrypted_content_cid.serialize_v2()}".encode()).end_cell())
.store_ref(begin_cell().store_bytes(f"{image_content_cid.serialize_v2() if image_content_cid else ''}".encode()).end_cell())
.store_ref(begin_cell().store_bytes(f"{metadata_content.cid.serialize_v2()}".encode()).end_cell())
.end_cell()
)
.end_cell()
)
.end_cell().to_boc(False)
).decode()
})
except BaseException as e:
make_log("Blockchain", f"Error while sending new content message: {e}" + '\n' + traceback.format_exc(), level='error')
return response.json({"error": str(e)}, status=400)
async def s_api_v1_blockchain_send_purchase_content_message(request):
assert request.json, "No data provided"
for field_key, field_value in {
'content_address': lambda x: isinstance(x, str),
'license_type': lambda x: x in ['resale']
}.items():
assert field_key in request.json, f"No {field_key} provided"
assert field_value(request.json[field_key]), f"Invalid {field_key} provided"
r_content = StoredContent.from_cid(request.ctx.db_session, request.json['content_address'])
content = r_content.open_content(request.ctx.db_session)
licenses_cost = content['encrypted_content'].json_format()['license']
assert request.json['license_type'] in licenses_cost
return response.json({
'address': content['encrypted_content'].json_format()['item_address'],
'amount': str(int(licenses_cost['resale']['price'])),
'payload': b64encode((
begin_cell()
.store_uint(0x2a319593, 32)
.store_uint(0, 64)
.store_uint(3, 8)
# .store_uint({
# 'listen': 1,
# 'resale': 3
# }[request.json['license_type']], 8)
.store_uint(0, 256)
.store_coins(str(int(licenses_cost['resale']['price'])))
.end_cell()
).to_boc(False)).decode()
})