uploader-bot/app/core/background/indexer_service.py

228 lines
11 KiB
Python

import asyncio
from base64 import b64decode
from datetime import datetime
from base58 import b58encode
from tonsdk.boc import Cell
from tonsdk.utils import Address
from app.core._blockchain.ton.platform import platform
from app.core._blockchain.ton.toncenter import toncenter
from app.core._utils.send_status import send_status
from app.core.logger import make_log
from app.core.models.node_storage import StoredContent
from app.core._utils.resolve_content import resolve_content
from app.core.models.wallet_connection import WalletConnection
from app.core._keyboards import get_inline_keyboard
from app.core.models._telegram import Wrapped_CBotChat
from app.core.storage import db_session
import os
import traceback
async def indexer_loop(memory, platform_found: bool, seqno: int) -> [bool, int]:
if not platform_found:
platform_state = await toncenter.get_account(platform.address.to_string(1, 1, 1))
if not platform_state.get('code'):
make_log("TON", "Platform contract is not deployed, skipping loop", level="info")
await send_status("indexer", "not working: platform is not deployed")
return False, seqno
else:
platform_found = True
make_log("Indexer", "Service running", level="debug")
with db_session() as session:
content_without_cid = session.query(StoredContent).filter(
StoredContent.content_id == None
)
for target_content in content_without_cid:
target_cid = target_content.cid.serialize_v2()
make_log("Indexer", f"Content without CID: {target_content.hash}, setting CID: {target_cid}", level="debug")
target_content.content_id = target_cid
session.commit()
last_known_index = session.query(StoredContent).filter(
StoredContent.onchain_index != None
).order_by(StoredContent.onchain_index.desc()).first()
last_known_index = last_known_index.onchain_index if last_known_index else 0
last_known_index = max(last_known_index, 0)
make_log("Indexer", f"Last known index: {last_known_index}", level="debug")
next_item_index = last_known_index + 1
resolve_item_result = await toncenter.run_get_method(platform.address.to_string(1, 1, 1), 'get_nft_address_by_index', [['num', hex(next_item_index)]])
if resolve_item_result.get('exit_code', -1) != 0:
make_log("Indexer", f"Resolve item error: {resolve_item_result}", level="error")
return platform_found, seqno
item_address_cell_b64 = resolve_item_result['stack'][0][1]["bytes"]
item_address_slice = Cell.one_from_boc(b64decode(item_address_cell_b64)).begin_parse()
item_address = item_address_slice.read_msg_addr()
item_get_data_result = await toncenter.run_get_method(item_address.to_string(1, 1, 1), 'indexator_data')
if item_get_data_result.get('exit_code', -1) != 0:
make_log("Indexer", f"Get item data error (maybe not deployed): {item_get_data_result}", level="debug")
return platform_found, seqno
assert item_get_data_result['stack'][0][0] == 'num', "Item type is not a number"
assert int(item_get_data_result['stack'][0][1], 16) == 1, "Item is not COP NFT"
item_returned_address = Cell.one_from_boc(b64decode(item_get_data_result['stack'][1][1]['bytes'])).begin_parse().read_msg_addr()
assert (
item_returned_address.to_string(1, 1, 1) == item_address.to_string(1, 1, 1)
), "Item address mismatch"
assert item_get_data_result['stack'][2][0] == 'num', "Item index is not a number"
item_index = int(item_get_data_result['stack'][2][1], 16)
assert item_index == next_item_index, "Item index mismatch"
item_platform_address = Cell.one_from_boc(b64decode(item_get_data_result['stack'][3][1]['bytes'])).begin_parse().read_msg_addr()
assert item_platform_address.to_string(1, 1, 1) == Address(platform.address.to_string(1, 1, 1)).to_string(1, 1, 1), "Item platform address mismatch"
assert item_get_data_result['stack'][4][0] == 'num', "Item license type is not a number"
item_license_type = int(item_get_data_result['stack'][4][1], 16)
assert item_license_type == 0, "Item license type is not 0"
item_owner_address = Cell.one_from_boc(b64decode(item_get_data_result['stack'][5][1]["bytes"])).begin_parse().read_msg_addr()
item_values = Cell.one_from_boc(b64decode(item_get_data_result['stack'][6][1]['bytes']))
item_derivates = Cell.one_from_boc(b64decode(item_get_data_result['stack'][7][1]['bytes']))
item_platform_variables = Cell.one_from_boc(b64decode(item_get_data_result['stack'][8][1]['bytes']))
item_distribution = Cell.one_from_boc(b64decode(item_get_data_result['stack'][9][1]['bytes']))
item_distribution_slice = item_distribution.begin_parse()
item_prices_slice = item_distribution_slice.refs[0].begin_parse()
item_listen_license_price = item_prices_slice.read_coins()
item_use_license_price = item_prices_slice.read_coins()
item_resale_license_price = item_prices_slice.read_coins()
item_values_slice = item_values.begin_parse()
item_content_hash_int = item_values_slice.read_uint(256)
item_content_hash = item_content_hash_int.to_bytes(32, 'big')
# item_content_hash_str = b58encode(item_content_hash).decode()
item_metadata = item_values_slice.refs[0]
item_content = item_values_slice.refs[1]
item_metadata_str = item_metadata.bits.array.decode()
item_content_cid_str = item_content.refs[0].bits.array.decode()
item_content_cover_cid_str = item_content.refs[1].bits.array.decode()
item_content_metadata_cid_str = item_content.refs[2].bits.array.decode()
item_content_cid, err = resolve_content(item_content_cid_str)
item_content_hash = item_content_cid.content_hash
item_content_hash_str = item_content_cid.content_hash_b58
item_metadata_packed = {
'license_type': item_license_type,
'item_address': item_address.to_string(1, 1, 1),
'content_cid': item_content_cid_str,
'cover_cid': item_content_cover_cid_str,
'metadata_cid': item_content_metadata_cid_str,
'derivates': b58encode(item_derivates.to_boc(False)).decode(),
'platform_variables': b58encode(item_platform_variables.to_boc(False)).decode(),
'license': {
'listen': {
'price': str(item_listen_license_price)
},
'resale': {
'price': str(item_listen_license_price)
}
}
}
user_wallet_connection = None
if item_owner_address:
user_wallet_connection = session.query(WalletConnection).filter(
WalletConnection.wallet_address == item_owner_address.to_string(1, 1, 1)
).first()
encrypted_stored_content = session.query(StoredContent).filter(
StoredContent.hash == item_content_hash_str,
# StoredContent.type.like("local%")
).first()
if encrypted_stored_content:
is_duplicate = encrypted_stored_content.type.startswith("onchain") \
and encrypted_stored_content.onchain_index != item_index
if not is_duplicate:
if encrypted_stored_content.type.startswith('local'):
encrypted_stored_content.type = "onchain/content" + ("_unknown" if (encrypted_stored_content.key_id is None) else "")
encrypted_stored_content.onchain_index = item_index
encrypted_stored_content.owner_address = item_owner_address.to_string(1, 1, 1)
user = None
if user_wallet_connection:
encrypted_stored_content.user_id = user_wallet_connection.user_id
user = user_wallet_connection.user
await (Wrapped_CBotChat(memory._telegram_bot, chat_id=user.telegram_id)).send_message(
user.translated('p_contentWasIndexed').format(
item_address=item_address.to_string(1, 1, 1),
item_index=item_index,
),
message_type='notification',
reply_markup=get_inline_keyboard([
[{
'text': user.translated('home_button'),
'callback_data': 'home'
}]
])
)
elif encrypted_stored_content.type.startswith('onchain') and encrypted_stored_content.onchain_index == item_index:
encrypted_stored_content.type = "onchain/content" + ("_unknown" if (encrypted_stored_content.key_id is None) else "")
encrypted_stored_content.owner_address = item_owner_address.to_string(1, 1, 1)
if user_wallet_connection:
encrypted_stored_content.user_id = user_wallet_connection.user_id
else:
make_log("Indexer", f"[CRITICAL] Item already indexed and ERRORED!: {item_content_hash_str}", level="error")
return platform_found, seqno
encrypted_stored_content.updated = datetime.now()
if encrypted_stored_content.meta != item_metadata_packed:
encrypted_stored_content.meta = item_metadata_packed
session.commit()
return platform_found, seqno
else:
item_metadata_packed['copied_from'] = encrypted_stored_content.id
item_metadata_packed['copied_from_cid'] = encrypted_stored_content.cid.serialize_v2()
item_content_hash_str = f"{b58encode(bytes(16) + os.urandom(30)).decode()}" # check this for vulnerability
onchain_stored_content = StoredContent(
type="onchain/content_unknown",
hash=item_content_hash_str,
onchain_index=item_index,
owner_address=item_owner_address.to_string(1, 1, 1) if item_owner_address else None,
meta=item_metadata_packed,
filename="UNKNOWN_ENCRYPTED_CONTENT",
user_id=user_wallet_connection.user_id if user_wallet_connection else None,
created=datetime.now(),
encrypted=True,
decrypted_content_id=None,
key_id=None,
updated=datetime.now()
)
session.add(onchain_stored_content)
session.commit()
make_log("Indexer", f"Item indexed: {item_content_hash_str}", level="info")
last_known_index += 1
return platform_found, seqno
async def main_fn(memory, ):
make_log("Indexer", "Service started", level="info")
platform_found = False
seqno = 0
while True:
try:
platform_found, seqno = await indexer_loop(memory, platform_found, seqno)
except BaseException as e:
make_log("Indexer", f"Error: {e}" + '\n' + traceback.format_exc(), level="error")
if platform_found:
await send_status("indexer", f"working (seqno={seqno})")
await asyncio.sleep(5)
seqno += 1
# if __name__ == '__main__':
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
# loop.close()