uploader-bot/app/core/models/content/indexation_mixins.py

86 lines
4.0 KiB
Python

import traceback
import base58
from sqlalchemy import and_
from app.core.logger import make_log
from app.core.models import StoredContent
from app.core.models.wallet_connection import WalletConnection
from app.core._blockchain.ton.toncenter import toncenter
from tonsdk.boc import Cell
from base64 import b64decode
def _stack_num(stack, position, error_message):
    """Return the integer held by the ``'num'`` entry at ``stack[position]``.

    The entry value is parsed as a base-16 string.

    Raises:
        AssertionError: if the entry is not tagged ``'num'``. Raised
            explicitly instead of via ``assert`` so the validation is not
            stripped when Python runs with ``-O``.
    """
    entry = stack[position]
    if entry[0] != 'num':
        raise AssertionError(error_message)
    return int(entry[1], 16)


def _stack_cell(stack, position):
    """Deserialize the base64-encoded BoC at ``stack[position]`` into a ``Cell``."""
    return Cell.one_from_boc(b64decode(stack[position][1]['bytes']))


def _stack_address(stack, position):
    """Read a message address from the cell at ``stack[position]`` as a string.

    NOTE(review): ``to_string(1, 1, 1)`` presumably selects the user-friendly,
    URL-safe, bounceable form of the address -- confirm against tonsdk.
    """
    return _stack_cell(stack, position).begin_parse().read_msg_addr().to_string(1, 1, 1)


def unpack_item_indexator_data(item_get_data_result):
    """Decode the stack returned by an item's ``indexator_data`` get-method.

    Args:
        item_get_data_result: get-method response whose ``'stack'`` list holds
            ten entries in this order: type (num), address (cell), index
            (num), platform address (cell), license type (num), owner address
            (cell), then the values, derivates, platform variables and
            distribution cells.

    Returns:
        dict with keys ``type``, ``index`` and ``license_type`` (ints),
        ``address``, ``platform_address`` and ``owner_address`` (address
        strings), and ``values``, ``derivates``, ``platform_variables`` and
        ``distribution`` (``Cell`` objects).

    Raises:
        AssertionError: if a numeric stack entry is not tagged ``'num'``.
    """
    stack = item_get_data_result['stack']
    # Dict displays evaluate in source order, so entries are validated and
    # parsed strictly by ascending stack position.
    return {
        'type': _stack_num(stack, 0, "Type is not a number"),
        'address': _stack_address(stack, 1),
        'index': _stack_num(stack, 2, "Index is not a number"),
        'platform_address': _stack_address(stack, 3),
        'license_type': _stack_num(stack, 4, "License type is not a number"),
        'owner_address': _stack_address(stack, 5),
        'values': _stack_cell(stack, 6),
        'derivates': _stack_cell(stack, 7),
        'platform_variables': _stack_cell(stack, 8),
        'distribution': _stack_cell(stack, 9),
    }
class NodeStorageIndexationMixin:
    """Placeholder mixin for node-storage indexation; it defines no members yet."""

    # TODO: async def fetch_onchain_metadata(self) -- not implemented.
class UserContentIndexationMixin:
    """Mixin that reconciles a user-content record with its on-chain item contract."""

    async def sync_with_chain(self, db_session):
        """Refresh this record from the contract at ``self.onchain_address``.

        Runs the ``indexator_data`` get-method on the contract, resolves the
        matching ``StoredContent`` row by the content hash read from the
        ``values`` cell, and asks the contract stored in that row's
        ``meta['item_address']`` for the item address at the reported index.
        The record is accepted only if that address equals the contract's own
        address.

        On success ``owner_address`` is updated, ``type`` becomes
        ``'nft/listen'`` and ``content_id`` points at the stored content.
        On a failed get-method or any validation/lookup error ``type``
        becomes ``'nft/unknown'`` and ``content_id`` is cleared. The session
        is committed in both cases.

        Args:
            db_session: SQLAlchemy session used for the lookup and commits.
        """
        cc_indexator_result = await toncenter.run_get_method(self.onchain_address, 'indexator_data')
        errored = cc_indexator_result.get('exit_code', -1) != 0

        if not errored:
            try:
                cc_indexator_data = unpack_item_indexator_data(cc_indexator_result)
                # Explicit raises rather than ``assert``: these are trust
                # checks and must not vanish when Python runs with ``-O``.
                if cc_indexator_data['type'] != 1:
                    raise ValueError("Type is not a content")
                if cc_indexator_data['address'] != self.onchain_address:
                    raise ValueError("Address is not equal")

                values_slice = cc_indexator_data['values'].begin_parse()
                # Fixed-width conversion keeps leading zero bytes; going
                # through hex() dropped them and failed on odd digit counts.
                content_hash = values_slice.read_uint(256).to_bytes(32, 'big')
                content_hash_b58 = base58.b58encode(content_hash).decode()

                stored_content = db_session.query(StoredContent).filter(
                    and_(
                        StoredContent.type == 'onchain/content',
                        StoredContent.hash == content_hash_b58,
                    )
                ).first()
                if stored_content is None:
                    raise ValueError(f"Stored content not found for hash {content_hash_b58}")

                trusted_cop_address_result = await toncenter.run_get_method(
                    stored_content.meta['item_address'],
                    'get_nft_address_by_index',
                    [['num', cc_indexator_data['index']]],
                )
                if trusted_cop_address_result.get('exit_code', -1) != 0:
                    raise ValueError("Trusted cop address error")

                trusted_cop_address = Cell.one_from_boc(
                    b64decode(trusted_cop_address_result['stack'][0][1]['bytes'])
                ).begin_parse().read_msg_addr().to_string(1, 1, 1)
                make_log(
                    "UserContent",
                    f"Trusted cop address: {trusted_cop_address} / Contract address: {cc_indexator_data['address']}",
                    level="info",
                )
                if trusted_cop_address != cc_indexator_data['address']:
                    raise ValueError("Trusted cop address is not equal")

                self.owner_address = cc_indexator_data['owner_address']
                self.type = 'nft/listen'
                self.content_id = stored_content.id
                db_session.commit()
            except Exception as e:
                # ``Exception`` (not ``BaseException``) so task cancellation
                # and interpreter shutdown signals still propagate.
                errored = True
                make_log("UserContent", f"Error: {e}" + '\n' + traceback.format_exc(), level="error")

        if errored:
            self.type = 'nft/unknown'
            self.content_id = None
            db_session.commit()