"""TON daemon: bootstraps the service/highload wallets and processes BlockchainTask rows.

The daemon first makes sure the service wallet, the platform contract and the
highload (v3) wallet are deployed and funded, then loops forever:

1. marks tasks as ``done`` when their message shows up in the highload
   wallet's transaction history,
2. re-broadcasts already signed messages of tasks in ``processing`` state,
3. signs messages for tasks in ``wait`` state.
"""
import asyncio
import os
import time
import traceback
from base64 import b64decode
from datetime import datetime, timedelta

import httpx
from sqlalchemy import and_, func, select
from tonsdk.boc import begin_cell, Cell
from tonsdk.contract.wallet import Wallets
from tonsdk.utils import HighloadQueryId

from app.core._blockchain.ton.platform import platform
from app.core._blockchain.ton.toncenter import toncenter
from app.core.models.tasks import BlockchainTask
from app.core._config import MY_FUND_ADDRESS
from app.core._secrets import service_wallet
from app.core._utils.send_status import send_status
from app.core.storage import db_session
from app.core.logger import make_log

# Length of one "epoch" used to scope highload seqnos.
_EPOCH_SECONDS = 60 * 60
# Signed messages are back-dated by this many seconds.
_SIGN_BACKDATE_SECONDS = 60
# A signed message is considered finished this long after `sign_created`.
_SIGNED_MESSAGE_TTL_SECONDS = 10 * 60
# Upper bound on tasks per epoch; also the seqno offset used for even epochs.
_EPOCH_SEQNO_SHIFT = 3_000_000
_TONAPI_SEND_MESSAGE_URL = 'https://tonapi.io/v2/blockchain/message'


def _unix_now() -> int:
    """Return the current Unix time in whole seconds.

    ``time.time()`` is used instead of ``datetime.utcnow().timestamp()``:
    the latter interprets the naive UTC datetime as *local* time and is
    therefore skewed by the UTC offset on any host not running in UTC.
    """
    return int(time.time())


async def get_sw_seqno():
    """Return the service wallet seqno, or 0 when the ``seqno`` get-method fails.

    A non-zero (or missing) ``exit_code`` is treated as "wallet not deployed".
    """
    result = await toncenter.run_get_method(service_wallet.address.to_string(1, 1, 1), 'seqno')
    if result.get('exit_code', -1) != 0:
        return 0

    # The first stack entry holds the seqno as a hex string.
    return int(result.get('stack', [['num', '0x0']])[0][1], 16)


async def _send_from_service_wallet(message: dict, seqno: int):
    """Sign a single outgoing message with the service wallet and broadcast it."""
    await toncenter.send_boc(
        service_wallet.create_transfer_message([message], seqno)['message'].to_boc(False)
    )


async def _bootstrap_once():
    """Run one pass of the deployment/funding checks.

    Returns the highload wallet object when everything is ready. Otherwise
    sends the single transaction needed for the next step, sleeps, and
    returns ``None`` so the caller can run the checks again.
    """
    make_log("TON", f"Service started, SW = {service_wallet.address.to_string(1, 1, 1)}", level="info")
    sw_seqno_value = await get_sw_seqno()
    make_log("TON", f"Service wallet run seqno method: {sw_seqno_value}", level="info")

    if sw_seqno_value == 0:
        make_log("TON", "Service wallet is not deployed, deploying...", level="info")
        await _send_from_service_wallet(
            {
                'address': service_wallet.address.to_string(1, 1, 1),
                'amount': 1,
                'send_mode': 1,
                'payload': begin_cell().store_uint(0, 32).store_bytes(b"Init MY Node").end_cell(),
            },
            0,
        )
        await asyncio.sleep(5)
        return None

    if os.getenv("TON_BEGIN_COMMAND_WITHDRAW"):
        # While the variable is set the daemon never leaves the bootstrap
        # loop; it re-sends the withdraw message every 10 seconds.
        await _send_from_service_wallet(
            {
                'address': MY_FUND_ADDRESS,
                'amount': 1,
                'send_mode': 128,
                'payload': begin_cell().end_cell(),
            },
            sw_seqno_value,
        )
        make_log("TON", "Withdraw command sent", level="info")
        await asyncio.sleep(10)
        return None

    # TODO: do not deploy when master_address is specified and we have verified
    # that the account exists. Currently every node ends up with its own platform.
    platform_state = await toncenter.get_account(platform.address.to_string(1, 1, 1))
    if not platform_state.get('code'):
        make_log("TON", "Platform contract is not deployed, send deploy transaction..", level="info")
        await _send_from_service_wallet(
            {
                'address': platform.address.to_string(1, 1, 1),
                'amount': int(0.08 * 10 ** 9),
                'send_mode': 1,
                'payload': begin_cell().store_uint(0, 32).store_uint(0, 64).end_cell(),
                'state_init': platform.create_state_init()['state_init'],
            },
            sw_seqno_value,
        )
        await send_status("ton_daemon", "working: deploying platform")
        await asyncio.sleep(15)
        return None

    highload_wallet = Wallets.ALL['hv3'](
        private_key=service_wallet.options['private_key'],
        public_key=service_wallet.options['public_key'],
        wc=0
    )
    make_log("TON", f"Highload wallet address: {highload_wallet.address.to_string(1, 1, 1)}", level="info")
    highload_state = await toncenter.get_account(highload_wallet.address.to_string(1, 1, 1))

    if int(highload_state.get('balance', '0')) / 1e9 < 0.05:
        make_log("TON", "Highload wallet balance is less than 0.05, send topup transaction..", level="info")
        await _send_from_service_wallet(
            {
                # NOTE(review): third flag is 0 here (1 everywhere else) —
                # presumably a non-bounceable address because the wallet may
                # not be deployed yet; confirm against tonsdk Address.to_string.
                'address': highload_wallet.address.to_string(1, 1, 0),
                'amount': int(0.08 * 10 ** 9),
                'send_mode': 1,
                'payload': begin_cell().store_uint(0, 32).end_cell(),
            },
            sw_seqno_value,
        )
        await send_status("ton_daemon", "working: topup highload wallet")
        await asyncio.sleep(15)
        return None

    if not highload_state.get('code'):
        make_log("TON", "Highload wallet contract is not deployed, send deploy transaction..", level="info")
        created_at_ts = _unix_now() - _SIGN_BACKDATE_SECONDS
        await toncenter.send_boc(
            highload_wallet.create_transfer_message(
                service_wallet.address.to_string(1, 1, 1),
                1, HighloadQueryId.from_seqno(0), created_at_ts,
                send_mode=1, payload="hello world", need_deploy=True
            )['message'].to_boc(False)
        )
        await send_status("ton_daemon", "working: deploying highload wallet")
        await asyncio.sleep(15)
        return None

    return highload_wallet


async def _mark_task_delivered(session, transaction: dict):
    """Mark the task matching the transaction's incoming message as ``done``.

    The task is looked up by the (seqno, epoch) pair decoded from the message
    body. Failures while decoding or matching the message are ignored; a
    malformed ``transaction`` dict (missing keys) raises to the caller.
    """
    transaction_hash = transaction['transaction_id']['hash']
    transaction_lt = str(transaction['transaction_id']['lt'])
    blockchain_message = transaction['in_msg']

    try:
        in_msg_cell = Cell.one_from_boc(b64decode(blockchain_message['msg_data']['body']))
        in_msg_slice = in_msg_cell.refs[0].begin_parse()
        # Skip a 32-bit and an 8-bit field that precede the query id
        # (presumably subwallet id and send mode of the highload v3
        # message layout — confirm against the contract).
        in_msg_slice.read_uint(32)
        in_msg_slice.read_uint(8)
        in_msg_query_id = in_msg_slice.read_uint(23)
        in_msg_created_at = in_msg_slice.read_uint(64)
        in_msg_epoch = int(in_msg_created_at // _EPOCH_SECONDS)
        in_msg_seqno = HighloadQueryId.from_query_id(in_msg_query_id).to_seqno()

        task = (
            await session.execute(
                select(BlockchainTask).where(
                    and_(
                        BlockchainTask.seqno == in_msg_seqno,
                        BlockchainTask.epoch == in_msg_epoch,
                    )
                )
            )
        ).scalars().first()
        if not task:
            return

        if task.status != 'done' or task.transaction_hash != transaction_hash:
            task.status = 'done'
            task.transaction_hash = transaction_hash
            task.transaction_lt = transaction_lt
            await session.commit()
    except Exception:
        # Deliberately silent: the original code had the log call for this
        # case disabled, so unparsable messages are skipped without logging.
        pass


async def _confirm_delivered_tasks(session, highload_wallet):
    """Scan the latest highload wallet transactions and close delivered tasks."""
    try:
        transactions = await toncenter.get_transactions(
            highload_wallet.address.to_string(1, 1, 1), limit=100
        )
        for transaction in transactions:
            try:
                await _mark_task_delivered(session, transaction)
            except Exception as e:
                make_log("TON_Daemon", f"Error while processing incoming transaction: {e}", level="debug")
    except Exception as e:
        make_log("TON_Daemon", f"Error while getting service wallet transactions: {e}", level="ERROR")


async def _rebroadcast_processing_tasks(session):
    """Re-send the signed message of every ``processing`` task.

    Each message is pushed through toncenter and additionally through the
    tonapi.io HTTP endpoint. Tasks whose signature is older than
    ``_SIGNED_MESSAGE_TTL_SECONDS`` are marked ``done``.
    """
    processing = (await session.execute(
        select(BlockchainTask)
        .where(BlockchainTask.status == 'processing')
        .order_by(BlockchainTask.updated.asc())
    )).scalars().all()
    if not processing:
        return

    # AsyncClient keeps the event loop free while waiting for tonapi;
    # the synchronous httpx.post would block every other coroutine.
    async with httpx.AsyncClient() as http_client:
        for blockchain_task in processing:
            make_log("TON_Daemon", f"Processing task (processing) {blockchain_task.id}")
            query_boc = bytes.fromhex(blockchain_task.meta['signed_message'])

            try:
                await toncenter.send_boc(query_boc)
            except Exception as e:
                # Best effort: the message is re-sent on every pass anyway.
                make_log("TON_Daemon", f"Error while sending task {blockchain_task.id} via toncenter: {e}", level="debug")

            try:
                response = await http_client.post(
                    _TONAPI_SEND_MESSAGE_URL,
                    json={'boc': query_boc.hex()},
                )
                make_log("TON_Daemon", str(response.text))
            except Exception as e:
                make_log("TON_Daemon", f"Error while pushing task to tonkeeper ({blockchain_task.id}): {e}", level="ERROR")

            # NOTE(review): this change is only persisted if a later commit on
            # the same session happens — confirm that is intended.
            blockchain_task.updated = datetime.utcnow()

            if blockchain_task.meta['sign_created'] + _SIGNED_MESSAGE_TTL_SECONDS < time.time():
                make_log("TON_Daemon", f"Task {blockchain_task.id} done", level="DEBUG")
                blockchain_task.status = 'done'
                await session.commit()
                continue

            await asyncio.sleep(0.5)


async def _sign_waiting_tasks(session, highload_wallet):
    """Assign (epoch, seqno) to every ``wait`` task and store its signed message."""
    waiting = (await session.execute(
        select(BlockchainTask).where(BlockchainTask.status == 'wait')
    )).scalars().all()

    for blockchain_task in waiting:
        try:
            # NOTE(review): this counts tasks in the *task's* epoch, which for
            # a task that has not been signed yet may not be the current
            # epoch — confirm the intended scope of this limit.
            tasks_in_epoch = (await session.execute(
                select(func.count()).select_from(BlockchainTask).where(
                    BlockchainTask.epoch == blockchain_task.epoch
                )
            )).scalar() or 0
            if tasks_in_epoch > _EPOCH_SEQNO_SHIFT:
                make_log("TON", f"Too many processing tasks in epoch {blockchain_task.epoch}", level="error")
                await send_status("ton_daemon", f"working: too many tasks in epoch {blockchain_task.epoch}")
                await asyncio.sleep(5)
                continue

            sign_created = _unix_now() - _SIGN_BACKDATE_SECONDS
            try:
                # The epoch must come from the same timestamp that is signed
                # into the message: the confirmation step recomputes it from
                # the message's created_at, so deriving it from "now" would
                # mismatch during the first minute of every hour.
                current_epoch = sign_created // _EPOCH_SECONDS
                max_epoch_seqno = (
                    (await session.execute(
                        select(func.max(BlockchainTask.seqno)).where(
                            BlockchainTask.epoch == current_epoch
                        )
                    )).scalar() or 0
                )
                # Even epochs start their numbering at the shifted offset.
                current_epoch_shift = _EPOCH_SEQNO_SHIFT if current_epoch % 2 == 0 else 0
                current_seqno = max_epoch_seqno + 1 + (current_epoch_shift if max_epoch_seqno == 0 else 0)
            except Exception as e:
                make_log("CRITICAL", f"Error calculating epoch,seqno: {e}", level="error")
                current_epoch = 0
                current_seqno = 0

            blockchain_task.seqno = current_seqno
            blockchain_task.epoch = current_epoch
            blockchain_task.status = 'processing'
            try:
                query = highload_wallet.create_transfer_message(
                    blockchain_task.destination,
                    int(blockchain_task.amount),
                    HighloadQueryId.from_seqno(current_seqno),
                    sign_created,
                    send_mode=1,
                    payload=Cell.one_from_boc(b64decode(blockchain_task.payload))
                )
                query_boc = query['message'].to_boc(False)
            except Exception as e:
                make_log("TON", f"Error creating transfer message: {e}", level="error")
                # Fall back to an empty cell so the task still moves forward
                # and eventually expires in the re-broadcast step.
                query_boc = begin_cell().end_cell().to_boc(False)

            blockchain_task.meta = {
                **blockchain_task.meta,
                'sign_created': sign_created,
                'signed_message': query_boc.hex(),
            }
            await session.commit()
            make_log("TON", f"Created signed message for task {blockchain_task.id}", level="info")
        except Exception as e:
            make_log("TON", f"Error processing task {blockchain_task.id}: {e}" + '\n' + traceback.format_exc(), level="error")
            continue

        await asyncio.sleep(1)


async def main_fn(memory):
    """Entry point of the TON daemon; runs until cancelled.

    Args:
        memory: accepted for the caller's interface; not used by this function.

    Errors inside one iteration of the main loop are logged and the loop
    continues after a short pause. Only ``Exception`` subclasses are caught,
    so task cancellation and interpreter shutdown propagate to the caller.
    Errors during the bootstrap phase are not caught here.
    """
    # Repeat the bootstrap checks in a loop rather than by recursion, so a
    # long wait for deployment does not grow the call stack.
    highload_wallet = None
    while highload_wallet is None:
        highload_wallet = await _bootstrap_once()

    while True:
        try:
            sw_seqno_value = await get_sw_seqno()
            make_log("TON", f"Service running ({sw_seqno_value})", level="debug")

            async with db_session() as session:
                # Check already sent messages.
                await send_status("ton_daemon", f"working: processing in-txs (seqno={sw_seqno_value})")
                await _confirm_delivered_tasks(session, highload_wallet)

                # Send signed messages.
                await send_status("ton_daemon", f"working: processing out-txs (seqno={sw_seqno_value})")
                await _rebroadcast_processing_tasks(session)

                # Create new signatures.
                await send_status("ton_daemon", f"working: creating new messages (seqno={sw_seqno_value})")
                await _sign_waiting_tasks(session, highload_wallet)

            await asyncio.sleep(1)
            await send_status("ton_daemon", f"working (seqno={sw_seqno_value})")
        except Exception as e:
            make_log("TON", f"Error: {e}", level="error")
            await asyncio.sleep(3)