"""Background indexer service.

Each pass of :func:`indexer_loop` refreshes the TON-per-Star price, notifies
about newly bought licenses, back-fills missing content ids and tries to index
the next item of the platform contract.  :func:`main_fn` runs that pass forever.
"""
import asyncio
import os
import traceback
import uuid
from base64 import b64decode
from datetime import datetime
from typing import Tuple

from base58 import b58encode
from sqlalchemy import String, and_, cast, desc, select
from sqlalchemy.dialects.postgresql import JSONB
from tonsdk.boc import Cell
from tonsdk.utils import Address

# Project imports keep their original relative order on purpose: reordering
# them could change module initialisation order.
from app.core._config import CLIENT_TELEGRAM_BOT_USERNAME
from app.core._blockchain.ton.platform import platform
from app.core._blockchain.ton.toncenter import toncenter
from app.core._utils.send_status import send_status
from app.core.logger import make_log
from app.core.models import UserContent, KnownTelegramMessage, ServiceConfig
from app.core.models.node_storage import StoredContent
from app.core._utils.resolve_content import resolve_content
from app.core.models.wallet_connection import WalletConnection
from app.core._keyboards import get_inline_keyboard
from app.core.models._telegram import Wrapped_CBotChat
from app.core.storage import db_session

# Contract queried with `get_pool_data` to derive the TON price.
_PRICE_POOL_ADDRESS = 'EQD8TJ8xEWB1SpnRE4d89YO3jl0W0EiBnNS4IBaHaUmdfizE'
# NOTE(review): presumably the USDT price of one Telegram Star -- confirm.
_USDT_PER_STAR = 0.014
# Pause between two indexer passes, in seconds.
_LOOP_INTERVAL_SECONDS = 5


def _require(condition, message) -> None:
    """Raise ``AssertionError(message)`` when *condition* is falsy.

    Used instead of the ``assert`` statement so the on-chain data checks are
    still enforced when Python runs with ``-O``.
    """
    if not condition:
        raise AssertionError(message)


def _stack_int(stack, position, error_message) -> int:
    """Return the hex-encoded ``num`` entry at *position* of a get-method stack."""
    entry = stack[position]
    _require(entry[0] == 'num', error_message)
    return int(entry[1], 16)


def _stack_cell(stack, position) -> Cell:
    """Decode the base64 BOC stored at *position* of a get-method stack."""
    return Cell.one_from_boc(b64decode(stack[position][1]['bytes']))


def _stack_address(stack, position):
    """Read a message address from the cell at *position* of a get-method stack."""
    return _stack_cell(stack, position).begin_parse().read_msg_addr()


def _onchain_content_type(stored_content) -> str:
    """Return the on-chain type label; ``_unknown`` is appended when no key is known."""
    return "onchain/content" + ("_unknown" if (stored_content.key_id is None) else "")


def _set_log_request_id(rid) -> None:
    """Best-effort: store *rid* in the logging context variable."""
    try:
        from app.core.log_context import ctx_rid
        ctx_rid.set(rid)
    except Exception:
        # Deliberately ignored: the original code tolerates any failure here
        # (import or set), and the service must keep running regardless.
        pass


async def _update_ton_per_star_price(session) -> None:
    """Fetch pool data and store the TON-per-Star price; failures are only logged."""
    try:
        result = await toncenter.run_get_method(_PRICE_POOL_ADDRESS, 'get_pool_data')
        _require(result['exit_code'] == 0, f"Error in get-method: {result}")
        stack = result['stack']
        first_reserve = _stack_int(stack, 0, "get first element is not num")
        second_reserve = _stack_int(stack, 1, "get second element is not num")
        # NOTE(review): the 1e3 factor presumably compensates for different
        # token decimals of the two reserves -- confirm against the pool contract.
        usdt_per_ton = (first_reserve * 1e3) / second_reserve
        ton_per_star = _USDT_PER_STAR / usdt_per_ton
        # NOTE(review): utcnow() is naive, so .timestamp() interprets it as
        # local time; kept unchanged because readers of 'live_tonPerStar' may
        # rely on the same convention.
        await ServiceConfig(session).set('live_tonPerStar', [ton_per_star, datetime.utcnow().timestamp()])
        make_log("TON_Daemon", f"TON per STAR price: {ton_per_star}", level="DEBUG")
    except Exception as e:
        make_log("TON_Daemon", f"Error while saving TON per STAR price: {e}" + '\n' + traceback.format_exc(), level="ERROR")


async def _notify_new_licenses(memory, session) -> None:
    """Notify buyer and content owner about each license not yet marked as notified."""
    new_licenses = (await session.execute(select(UserContent).where(
        and_(
            ~(cast(UserContent.meta, JSONB).contains({'notification_sent': True})),
            UserContent.type == 'nft/listen',
        )
    ))).scalars().all()

    for new_license in new_licenses:
        licensed_content = (await session.execute(select(StoredContent).where(
            StoredContent.id == new_license.content_id
        ))).scalars().first()
        if not licensed_content:
            make_log("Indexer", f"Licensed content not found: {new_license.content_id}", level="error")
            # Nothing to notify about; skip instead of dereferencing None.
            continue

        content_metadata = licensed_content.metadata_json(session)
        _require(content_metadata, "No content metadata found")

        # No notifications when the content owner holds the license himself.
        if licensed_content.owner_address != new_license.owner_address:
            try:
                user = new_license.user
                if user.telegram_id:
                    buyer_chat = Wrapped_CBotChat(
                        memory._client_telegram_bot,
                        chat_id=user.telegram_id,
                        user=user,
                        db_session=session,
                    )
                    await buyer_chat.send_content(session, licensed_content)

                wallet_owner_connection = (await session.execute(
                    select(WalletConnection).where(
                        WalletConnection.wallet_address == licensed_content.owner_address,
                        WalletConnection.invalidated == False,  # noqa: E712
                    ).order_by(desc(WalletConnection.id))
                )).scalars().first()
                # The owner may have no active wallet connection at all.
                wallet_owner_user = wallet_owner_connection.user if wallet_owner_connection else None
                if wallet_owner_user and wallet_owner_user.telegram_id:
                    wallet_owner_bot = Wrapped_CBotChat(
                        memory._telegram_bot,
                        chat_id=wallet_owner_user.telegram_id,
                        user=wallet_owner_user,
                        db_session=session,
                    )
                    await wallet_owner_bot.send_message(
                        user.translated('p_licenseWasBought').format(
                            username=user.front_format(),
                            nft_address=f'"https://tonviewer.com/{new_license.onchain_address}"',
                            content_title=content_metadata.get('name', 'Unknown'),
                        ),
                        message_type='notification',
                    )
            except Exception as e:
                make_log("IndexerSendNewLicense", f"Error: {e}" + '\n' + traceback.format_exc(), level="error")

        # Mark as handled even when sending failed, so it is not retried.
        new_license.meta = {**(new_license.meta or {}), 'notification_sent': True}
        await session.commit()


async def _backfill_content_ids(session) -> None:
    """Fill ``content_id`` for stored content rows that do not have one yet."""
    content_without_cid = (await session.execute(
        select(StoredContent).where(StoredContent.content_id == None)  # noqa: E711
    )).scalars().all()
    for target_content in content_without_cid:
        target_cid = target_content.cid.serialize_v2()
        make_log("Indexer", f"Content without CID: {target_content.hash}, setting CID: {target_cid}", level="debug")
        target_content.content_id = target_cid

    await session.commit()


async def _notify_uploader(memory, session, user, stored_content, item_address_str, item_index) -> None:
    """Tell the uploader the content was indexed and delete related hint messages.

    Both steps are best-effort: errors are logged and never propagate.
    """
    uploader_chat = Wrapped_CBotChat(memory._telegram_bot, chat_id=user.telegram_id, user=user, db_session=session)
    try:
        await uploader_chat.send_message(
            user.translated('p_contentWasIndexed').format(
                item_address=item_address_str,
                item_index=item_index,
            ),
            message_type='notification',
            reply_markup=get_inline_keyboard([
                [{
                    'text': user.translated('viewTrackAsClient_button'),
                    'url': f"https://t.me/{CLIENT_TELEGRAM_BOT_USERNAME}?start=C{stored_content.cid.serialize_v2()}"
                }],
            ])
        )
    except Exception as e:
        make_log("Indexer", f"Error while sending indexed notification: {e}" + '\n' + traceback.format_exc(), level="error")

    try:
        result = await session.execute(select(KnownTelegramMessage).where(
            and_(
                KnownTelegramMessage.chat_id == user.telegram_id,
                KnownTelegramMessage.type == 'hint',
                cast(KnownTelegramMessage.meta['encrypted_content_hash'], String) == stored_content.hash,
                KnownTelegramMessage.deleted == False,  # noqa: E712
            )
        ))
        for hint_message in result.scalars().all():
            await uploader_chat.delete_message(hint_message.message_id)
    except Exception as e:
        make_log("Indexer", f"Error while deleting hint messages: {e}" + '\n' + traceback.format_exc(), level="error")


async def _index_next_item(memory, session) -> None:
    """Resolve, validate and store the next not-yet-indexed platform item.

    Returns silently when the item cannot be resolved or is not deployed yet.

    Raises:
        AssertionError: when the item's ``indexator_data`` fails validation.
        ValueError: when the item's content CID cannot be parsed.
    """
    newest_indexed = (await session.execute(
        select(StoredContent)
        .where(StoredContent.onchain_index != None)  # noqa: E711
        .order_by(StoredContent.onchain_index.desc())
    )).scalars().first()
    last_known_index = max(newest_indexed.onchain_index, 0) if newest_indexed else 0
    make_log("Indexer", f"Last known index: {last_known_index}", level="debug")
    # With nothing indexed yet the first item (index 0) is looked up.
    next_item_index = (last_known_index + 1) if newest_indexed else 0

    platform_address_str = platform.address.to_string(1, 1, 1)
    resolve_item_result = await toncenter.run_get_method(
        platform_address_str, 'get_nft_address_by_index', [['num', next_item_index]]
    )
    make_log("Indexer", f"Resolve item result: {resolve_item_result}", level="debug")
    if resolve_item_result.get('exit_code', -1) != 0:
        make_log("Indexer", f"Resolve item error: {resolve_item_result}", level="error")
        return

    item_address = _stack_address(resolve_item_result['stack'], 0)
    item_address_str = item_address.to_string(1, 1, 1)
    make_log("Indexer", f"Item address: {item_address_str}", level="debug")

    item_get_data_result = await toncenter.run_get_method(item_address_str, 'indexator_data')
    if item_get_data_result.get('exit_code', -1) != 0:
        make_log("Indexer", f"Get item data error (maybe not deployed): {item_get_data_result}", level="debug")
        return

    stack = item_get_data_result['stack']
    _require(_stack_int(stack, 0, "Item type is not a number") == 1, "Item is not COP NFT")
    _require(_stack_address(stack, 1).to_string(1, 1, 1) == item_address_str, "Item address mismatch")
    item_index = _stack_int(stack, 2, "Item index is not a number")
    _require(item_index == next_item_index, "Item index mismatch")
    _require(
        _stack_address(stack, 3).to_string(1, 1, 1) == Address(platform_address_str).to_string(1, 1, 1),
        "Item platform address mismatch",
    )
    item_license_type = _stack_int(stack, 4, "Item license type is not a number")
    _require(item_license_type == 0, "Item license type is not 0")

    item_owner_address = _stack_address(stack, 5)
    # The parsed owner address may be empty; use one None-safe string everywhere.
    owner_address_str = item_owner_address.to_string(1, 1, 1) if item_owner_address else None
    item_values = _stack_cell(stack, 6)
    item_derivates = _stack_cell(stack, 7)
    item_platform_variables = _stack_cell(stack, 8)
    item_distribution = _stack_cell(stack, 9)

    item_prices_slice = item_distribution.begin_parse().refs[0].begin_parse()
    item_listen_license_price = item_prices_slice.read_coins()
    item_use_license_price = item_prices_slice.read_coins()
    item_resale_license_price = item_prices_slice.read_coins()

    item_values_slice = item_values.begin_parse()
    # The 256-bit value is read as before but not used: the hash that gets
    # stored is taken from the parsed content CID below.
    item_values_slice.read_uint(256)
    item_metadata = item_values_slice.refs[0]
    item_content = item_values_slice.refs[1]
    # Decoded as before although the text is unused: decode() raises on
    # bytes that are not valid UTF-8.
    item_metadata.bits.array.decode()
    item_content_cid_str = item_content.refs[0].bits.array.decode()
    item_content_cover_cid_str = item_content.refs[1].bits.array.decode()
    item_content_metadata_cid_str = item_content.refs[2].bits.array.decode()

    item_content_cid, err = resolve_content(item_content_cid_str)
    if item_content_cid is None:
        raise ValueError(f"Cannot resolve item content CID {item_content_cid_str!r}: {err}")
    item_content_hash_str = item_content_cid.content_hash_b58

    item_metadata_packed = {
        'license_type': item_license_type,
        'item_address': item_address_str,
        'content_cid': item_content_cid_str,
        'cover_cid': item_content_cover_cid_str,
        'metadata_cid': item_content_metadata_cid_str,
        'derivates': b58encode(item_derivates.to_boc(False)).decode(),
        'platform_variables': b58encode(item_platform_variables.to_boc(False)).decode(),
        'license': {
            'listen': {'price': str(item_listen_license_price)},
            'use': {'price': str(item_use_license_price)},
            'resale': {'price': str(item_resale_license_price)},
        },
    }

    user_wallet_connection = None
    if owner_address_str is not None:
        user_wallet_connection = (await session.execute(select(WalletConnection).where(
            WalletConnection.wallet_address == owner_address_str
        ))).scalars().first()

    encrypted_stored_content = (await session.execute(select(StoredContent).where(
        StoredContent.hash == item_content_hash_str
    ))).scalars().first()

    if encrypted_stored_content:
        is_duplicate = (
            encrypted_stored_content.type.startswith("onchain")
            and encrypted_stored_content.onchain_index != item_index
        )
        if not is_duplicate:
            if encrypted_stored_content.type.startswith('local'):
                # Locally uploaded content has now appeared on-chain.
                encrypted_stored_content.type = _onchain_content_type(encrypted_stored_content)
                encrypted_stored_content.onchain_index = item_index
                encrypted_stored_content.owner_address = owner_address_str
                user = None
                if user_wallet_connection:
                    encrypted_stored_content.user_id = user_wallet_connection.user_id
                    user = user_wallet_connection.user

                if user and user.telegram_id:
                    await _notify_uploader(
                        memory, session, user, encrypted_stored_content, item_address_str, item_index
                    )
            elif (
                encrypted_stored_content.type.startswith('onchain')
                and encrypted_stored_content.onchain_index == item_index
            ):
                # Same item seen again: refresh type and ownership.
                encrypted_stored_content.type = _onchain_content_type(encrypted_stored_content)
                encrypted_stored_content.owner_address = owner_address_str
                if user_wallet_connection:
                    encrypted_stored_content.user_id = user_wallet_connection.user_id
            else:
                make_log("Indexer", f"[CRITICAL] Item already indexed and ERRORED!: {item_content_hash_str}", level="error")
                return

            encrypted_stored_content.updated = datetime.now()
            encrypted_stored_content.meta = {
                **(encrypted_stored_content.meta or {}),
                **item_metadata_packed,
            }
            await session.commit()
            return

        # Another on-chain item already uses this content hash: store this one
        # as a copy under a random hash that points back to the original row.
        item_metadata_packed['copied_from'] = encrypted_stored_content.id
        item_metadata_packed['copied_from_cid'] = encrypted_stored_content.cid.serialize_v2()
        item_content_hash_str = f"{b58encode(bytes(16) + os.urandom(30)).decode()}"  # check this for vulnerability

    now = datetime.now()
    onchain_stored_content = StoredContent(
        type="onchain/content_unknown",
        hash=item_content_hash_str,
        onchain_index=item_index,
        owner_address=owner_address_str,
        meta=item_metadata_packed,
        filename="UNKNOWN_ENCRYPTED_CONTENT",
        user_id=user_wallet_connection.user_id if user_wallet_connection else None,
        created=now,
        encrypted=True,
        decrypted_content_id=None,
        key_id=None,
        updated=now,
    )
    session.add(onchain_stored_content)
    await session.commit()
    make_log("Indexer", f"Item indexed: {item_content_hash_str}", level="info")


async def indexer_loop(memory, platform_found: bool, seqno: int) -> Tuple[bool, int]:
    """Run one indexer pass.

    Args:
        memory: Object exposing ``_telegram_bot`` and ``_client_telegram_bot``.
        platform_found: Whether the platform contract was already seen deployed.
        seqno: Pass counter; returned unchanged.

    Returns:
        ``(platform_found, seqno)``; ``platform_found`` is ``False`` when the
        platform account has no code yet, in which case the pass is skipped.

    Raises:
        AssertionError: when on-chain data fails validation.
        ValueError: when the next item's content CID cannot be parsed.
    """
    if not platform_found:
        platform_state = await toncenter.get_account(platform.address.to_string(1, 1, 1))
        if not platform_state.get('code'):
            make_log("TON", "Platform contract is not deployed, skipping loop", level="info")
            await send_status("indexer", "not working: platform is not deployed")
            return False, seqno
        platform_found = True

    make_log("Indexer", "Service running", level="debug")
    async with db_session() as session:
        await _update_ton_per_star_price(session)
        await _notify_new_licenses(memory, session)
        await _backfill_content_ids(session)
        await _index_next_item(memory, session)

    return platform_found, seqno


async def main_fn(memory):
    """Run :func:`indexer_loop` forever, pausing between passes.

    Ordinary errors of a pass are logged and the loop continues.  Task
    cancellation and interpreter shutdown are not swallowed, so the service
    can be stopped.
    """
    make_log("Indexer", "Service started", level="info")
    platform_found = False
    seqno = 0
    while True:
        rid = uuid.uuid4().hex[:8]
        _set_log_request_id(rid)
        try:
            make_log("Indexer", "Loop start", level="debug", rid=rid)
            platform_found, seqno = await indexer_loop(memory, platform_found, seqno)
        except Exception as e:
            make_log("Indexer", f"Error: {e}" + '\n' + traceback.format_exc(), level="error", rid=rid)

        if platform_found:
            try:
                await send_status("indexer", f"working (seqno={seqno})")
            except Exception as e:
                # A failed status report must not stop the service.
                make_log("Indexer", f"Error while sending status: {e}" + '\n' + traceback.format_exc(), level="error", rid=rid)

        await asyncio.sleep(_LOOP_INTERVAL_SECONDS)
        seqno += 1
        _set_log_request_id(None)