291 lines
14 KiB
Python
291 lines
14 KiB
Python
import asyncio
import inspect
import os
import time
import traceback
from base64 import b64decode
from datetime import datetime, timedelta

import httpx
from sqlalchemy import and_, func
from tonsdk.boc import begin_cell, Cell
from tonsdk.contract.wallet import Wallets
from tonsdk.utils import HighloadQueryId

from app.core._blockchain.ton.platform import platform
from app.core._blockchain.ton.toncenter import toncenter
from app.core.models.tasks import BlockchainTask
from app.core._config import MY_FUND_ADDRESS
from app.core._secrets import service_wallet
from app.core._utils.send_status import send_status
from app.core.storage import db_session
from app.core.logger import make_log
|
||
|
||
|
||
async def get_sw_seqno():
    """Read the service wallet's current seqno through its `seqno` get-method.

    Returns:
        The seqno as an int. 0 is returned when the get-method reply does not
        carry ``exit_code == 0`` (main_fn treats 0 as "wallet not deployed").
    """
    wallet_address = service_wallet.address.to_string(1, 1, 1)
    response = await toncenter.run_get_method(wallet_address, 'seqno')

    # A reply without an exit_code is handled like a failed call (default -1).
    if response.get('exit_code', -1) != 0:
        return 0

    # The first stack entry is a (type, hex-string) pair; a missing stack
    # falls back to a zero entry.
    stack = response.get('stack', [['num', '0x0']])
    return int(stack[0][1], 16)
|
||
|
||
|
||
async def _commit(session) -> None:
    """Commit `session`, awaiting the result only when it is awaitable.

    The session used by this daemon comes from a synchronous
    ``with db_session()`` block and is queried through ``session.query``.
    With such a session ``commit()`` returns ``None``, so a bare
    ``await session.commit()`` raises ``TypeError`` right after the commit.
    NOTE(review): db_session is defined outside this file - confirm which
    flavour of session it yields.
    """
    outcome = session.commit()
    if inspect.isawaitable(outcome):
        await outcome


async def _send_from_service_wallet(message: dict, seqno: int) -> None:
    """Sign a single transfer with the service wallet and broadcast it via toncenter."""
    signed = service_wallet.create_transfer_message([message], seqno)
    await toncenter.send_boc(signed['message'].to_boc(False))


async def _prepare_wallets():
    """Bring the on-chain prerequisites up and return the highload wallet object.

    Every pass re-reads the chain state, performs at most one action (deploy
    the service wallet, send the withdraw command, deploy the platform, top up
    or deploy the highload wallet), sleeps and starts over. A loop is used
    instead of re-entering main_fn so a long-lasting retry sequence cannot
    exhaust the recursion limit.

    Note: while TON_BEGIN_COMMAND_WITHDRAW is set, the withdraw command is
    re-sent every 10 seconds and this function never returns.
    """
    while True:
        sw_address = service_wallet.address.to_string(1, 1, 1)
        make_log("TON", f"Service started, SW = {sw_address}", level="info")
        sw_seqno_value = await get_sw_seqno()
        make_log("TON", f"Service wallet run seqno method: {sw_seqno_value}", level="info")

        if sw_seqno_value == 0:
            # get_sw_seqno() returns 0 when the get-method fails, which is
            # treated as "not deployed": send a self-transfer with seqno 0.
            make_log("TON", "Service wallet is not deployed, deploying...", level="info")
            await _send_from_service_wallet({
                'address': sw_address,
                'amount': 1,
                'send_mode': 1,
                'payload': begin_cell().store_uint(0, 32).store_bytes(b"Init MY Node").end_cell(),
            }, 0)
            await asyncio.sleep(5)
            continue

        if os.getenv("TON_BEGIN_COMMAND_WITHDRAW"):
            # NOTE(review): send_mode 128 presumably carries the whole
            # remaining balance to MY_FUND_ADDRESS - confirm against TON docs.
            await _send_from_service_wallet({
                'address': MY_FUND_ADDRESS,
                'amount': 1,
                'send_mode': 128,
                'payload': begin_cell().end_cell(),
            }, sw_seqno_value)
            make_log("TON", "Withdraw command sent", level="info")
            await asyncio.sleep(10)
            continue

        # TODO: do not deploy if master_address is specified and we have
        # verified that the account exists. Currently every node ends up with
        # a different platform.
        platform_address = platform.address.to_string(1, 1, 1)
        platform_state = await toncenter.get_account(platform_address)
        if not platform_state.get('code'):
            make_log("TON", "Platform contract is not deployed, send deploy transaction..", level="info")
            await _send_from_service_wallet({
                'address': platform_address,
                'amount': int(0.08 * 10 ** 9),
                'send_mode': 1,
                'payload': begin_cell().store_uint(0, 32).store_uint(0, 64).end_cell(),
                'state_init': platform.create_state_init()['state_init'],
            }, sw_seqno_value)
            await send_status("ton_daemon", "working: deploying platform")
            await asyncio.sleep(15)
            continue

        highload_wallet = Wallets.ALL['hv3'](
            private_key=service_wallet.options['private_key'],
            public_key=service_wallet.options['public_key'],
            wc=0,
        )
        highload_address = highload_wallet.address.to_string(1, 1, 1)
        make_log("TON", f"Highload wallet address: {highload_address}", level="info")
        highload_state = await toncenter.get_account(highload_address)

        if int(highload_state.get('balance', '0')) / 1e9 < 0.05:
            make_log("TON", "Highload wallet balance is less than 0.05, send topup transaction..", level="info")
            await _send_from_service_wallet({
                # Third flag is 0 here, unlike every other address in this
                # file. NOTE(review): presumably the non-bounceable form so an
                # undeployed account keeps the funds - confirm.
                'address': highload_wallet.address.to_string(1, 1, 0),
                'amount': int(0.08 * 10 ** 9),
                'send_mode': 1,
                'payload': begin_cell().store_uint(0, 32).end_cell(),
            }, sw_seqno_value)
            await send_status("ton_daemon", "working: topup highload wallet")
            await asyncio.sleep(15)
            continue

        if not highload_state.get('code'):
            make_log("TON", "Highload wallet contract is not deployed, send deploy transaction..", level="info")
            # time.time() is used instead of datetime.utcnow().timestamp():
            # the latter interprets the naive value as local time and is off
            # by the host's UTC offset on any non-UTC machine.
            created_at_ts = int(time.time()) - 60
            deploy_message = highload_wallet.create_transfer_message(
                service_wallet.address.to_string(1, 1, 1),
                1, HighloadQueryId.from_seqno(0), created_at_ts,
                send_mode=1, payload="hello world", need_deploy=True,
            )
            await toncenter.send_boc(deploy_message['message'].to_boc(False))
            await send_status("ton_daemon", "working: deploying highload wallet")
            await asyncio.sleep(15)
            continue

        return highload_wallet


async def _process_incoming_message(session, blockchain_message: dict,
                                    transaction_hash: str, transaction_lt: str) -> None:
    """Match one inbound message to a BlockchainTask and mark that task done.

    The task is looked up by (seqno, epoch), both recovered from the message
    body: seqno from the query id, epoch as ``created_at // 3600``.
    """
    in_msg_cell = Cell.one_from_boc(b64decode(blockchain_message['msg_data']['body']))
    in_msg_slice = in_msg_cell.refs[0].begin_parse()
    # Two leading fields are skipped. NOTE(review): presumably subwallet id
    # and send mode of the highload-wallet-v3 layout - confirm.
    in_msg_slice.read_uint(32)
    in_msg_slice.read_uint(8)
    in_msg_query_id = in_msg_slice.read_uint(23)
    in_msg_created_at = in_msg_slice.read_uint(64)
    in_msg_epoch = int(in_msg_created_at // (60 * 60))
    in_msg_seqno = HighloadQueryId.from_query_id(in_msg_query_id).to_seqno()

    task = session.query(BlockchainTask).filter(
        and_(
            BlockchainTask.seqno == in_msg_seqno,
            BlockchainTask.epoch == in_msg_epoch,
        )
    ).first()
    if not task:
        return

    if task.status != 'done' or task.transaction_hash != transaction_hash:
        task.status = 'done'
        task.transaction_hash = transaction_hash
        task.transaction_lt = transaction_lt
        await _commit(session)


async def _process_incoming_transaction(session, transaction: dict) -> None:
    """Extract hash/lt from a transaction and reconcile its inbound message.

    A malformed transaction dict (missing ids or `in_msg`) raises to the
    caller; failures while handling the message itself are swallowed.
    """
    transaction_hash = transaction['transaction_id']['hash']
    transaction_lt = str(transaction['transaction_id']['lt'])
    blockchain_message = transaction['in_msg']
    try:
        await _process_incoming_message(session, blockchain_message, transaction_hash, transaction_lt)
    except Exception:
        # Deliberately silent, as in the original (its debug log was
        # commented out): messages that cannot be parsed are skipped.
        pass


async def _reconcile_incoming(session, highload_wallet) -> None:
    """Check sent messages: scan the latest 100 highload-wallet transactions."""
    try:
        transactions = await toncenter.get_transactions(
            highload_wallet.address.to_string(1, 1, 1), limit=100
        )
        for transaction in transactions:
            try:
                await _process_incoming_transaction(session, transaction)
            except Exception as e:
                make_log("TON_Daemon", f"Error while processing incoming transaction: {e}", level="debug")
    except Exception as e:
        make_log("TON_Daemon", f"Error while getting service wallet transactions: {e}", level="ERROR")


async def _resend_processing_tasks(session) -> None:
    """Send signed messages: re-broadcast every task in status 'processing'.

    Each task is pushed through toncenter and tonapi.io. A task whose
    signature is older than 10 minutes is marked 'done' and committed.
    """
    processing_tasks = (
        session.query(BlockchainTask)
        .filter(BlockchainTask.status == 'processing')
        .order_by(BlockchainTask.updated.asc())
        .all()
    )
    if not processing_tasks:
        return

    # An async client keeps the event loop free while waiting on tonapi.io;
    # the module-level httpx.post() would block it.
    async with httpx.AsyncClient() as http_client:
        for blockchain_task in processing_tasks:
            make_log("TON_Daemon", f"Processing task (processing) {blockchain_task.id}")
            query_boc = bytes.fromhex(blockchain_task.meta['signed_message'])

            try:
                await toncenter.send_boc(query_boc)
            except Exception as e:
                # Best-effort: the second channel below is still attempted.
                make_log("TON_Daemon", f"Error while sending task {blockchain_task.id} via toncenter: {e}", level="debug")

            try:
                response = await http_client.post(
                    'https://tonapi.io/v2/blockchain/message',
                    json={'boc': query_boc.hex()},
                )
                make_log("TON_Daemon", str(response.text))
            except Exception as e:
                make_log("TON_Daemon", f"Error while pushing task to tonkeeper ({blockchain_task.id}): {e}", level="ERROR")

            # Kept as a naive UTC datetime, as before, for the DB column.
            blockchain_task.updated = datetime.utcnow()

            if blockchain_task.meta['sign_created'] + 10 * 60 < time.time():
                make_log("TON_Daemon", f"Task {blockchain_task.id} done", level="DEBUG")
                blockchain_task.status = 'done'
                await _commit(session)
                continue

            await asyncio.sleep(0.5)


async def _sign_waiting_tasks(session, highload_wallet) -> None:
    """Create new signatures: sign every 'wait' task and move it to 'processing'."""
    waiting_tasks = session.query(BlockchainTask).filter(BlockchainTask.status == 'wait').all()
    for blockchain_task in waiting_tasks:
        try:
            # Guard against more than 3_000_000 tasks sharing this task's epoch.
            tasks_in_epoch = (
                session.query(BlockchainTask)
                .filter(BlockchainTask.epoch == blockchain_task.epoch)
                .count()
            )
            if tasks_in_epoch > 3_000_000:
                make_log("TON", f"Too many processing tasks in epoch {blockchain_task.epoch}", level="error")
                await send_status("ton_daemon", f"working: too many tasks in epoch {blockchain_task.epoch}")
                await asyncio.sleep(5)
                continue

            sign_created = int(time.time()) - 60
            try:
                # The epoch is derived from sign_created (not from "now")
                # because _process_incoming_message recomputes it from the
                # message's created_at, which is exactly this value.
                current_epoch = sign_created // (60 * 60)
                max_epoch_seqno = (
                    session.query(func.max(BlockchainTask.seqno))
                    .filter(BlockchainTask.epoch == current_epoch)
                    .scalar() or 0
                )
                # The first seqno of an even epoch starts at an offset of 3_000_000.
                current_epoch_shift = 3_000_000 if current_epoch % 2 == 0 else 0
                current_seqno = max_epoch_seqno + 1 + (current_epoch_shift if max_epoch_seqno == 0 else 0)
            except Exception as e:
                make_log("CRITICAL", f"Error calculating epoch,seqno: {e}", level="error")
                current_epoch = 0
                current_seqno = 0

            blockchain_task.seqno = current_seqno
            blockchain_task.epoch = current_epoch
            blockchain_task.status = 'processing'
            try:
                query = highload_wallet.create_transfer_message(
                    blockchain_task.destination, int(blockchain_task.amount),
                    HighloadQueryId.from_seqno(current_seqno),
                    sign_created, send_mode=1,
                    payload=Cell.one_from_boc(b64decode(blockchain_task.payload)),
                )
                query_boc = query['message'].to_boc(False)
            except Exception as e:
                # NOTE(review): the task still moves to 'processing', carrying
                # the BOC of an empty cell - confirm this is intended.
                make_log("TON", f"Error creating transfer message: {e}", level="error")
                query_boc = begin_cell().end_cell().to_boc(False)

            blockchain_task.meta = {
                **blockchain_task.meta,
                'sign_created': sign_created,
                # Stored as hex: _resend_processing_tasks reads it back with
                # bytes.fromhex().
                'signed_message': query_boc.hex(),
            }
            await _commit(session)
            make_log("TON", f"Created signed message for task {blockchain_task.id}", level="info")
        except Exception as e:
            make_log("TON", f"Error processing task {blockchain_task.id}: {e}" + '\n' + traceback.format_exc(), level="error")
            continue

        await asyncio.sleep(1)


async def main_fn(memory):
    """Run the TON daemon: prepare the wallets, then process tasks forever.

    After the wallets are ready, each cycle opens a DB session and
    (1) reconciles recent highload-wallet transactions with stored tasks,
    (2) re-broadcasts tasks in status 'processing', and
    (3) signs tasks in status 'wait'.
    Ordinary exceptions inside a cycle are logged and the cycle restarts
    after 3 seconds. Cancellation, KeyboardInterrupt and SystemExit
    propagate so the daemon can be stopped.

    Args:
        memory: Not used by this function; kept for the caller's signature.

    Returns:
        Never returns normally.
    """
    highload_wallet = await _prepare_wallets()

    while True:
        try:
            sw_seqno_value = await get_sw_seqno()
            make_log("TON", f"Service running ({sw_seqno_value})", level="debug")

            with db_session() as session:
                await send_status("ton_daemon", f"working: processing in-txs (seqno={sw_seqno_value})")
                await _reconcile_incoming(session, highload_wallet)

                await send_status("ton_daemon", f"working: processing out-txs (seqno={sw_seqno_value})")
                await _resend_processing_tasks(session)

                await send_status("ton_daemon", f"working: creating new messages (seqno={sw_seqno_value})")
                await _sign_waiting_tasks(session, highload_wallet)

            await asyncio.sleep(1)
            await send_status("ton_daemon", f"working (seqno={sw_seqno_value})")
        except Exception as e:
            make_log("TON", f"Error: {e}", level="error")
            await asyncio.sleep(3)
|
||
|
||
# if __name__ == '__main__':
|
||
# loop = asyncio.get_event_loop()
|
||
# loop.run_until_complete(main())
|
||
# loop.close()
|
||
|
||
|
||
|
||
|