uploader-bot/app/core/background/ton_service.py

303 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from base64 import b64decode
import os
import traceback
import httpx
from sqlalchemy import and_, func
from tonsdk.boc import begin_cell, Cell
from tonsdk.contract.wallet import Wallets
from tonsdk.utils import HighloadQueryId
from datetime import datetime, timedelta
from app.core._blockchain.ton.platform import platform
from app.core._blockchain.ton.toncenter import toncenter
from app.core.models.tasks import BlockchainTask
from app.core._config import MY_FUND_ADDRESS
from app.core._secrets import service_wallet
from app.core._utils.send_status import send_status
from app.core.storage import db_session
from app.core.logger import make_log
async def get_sw_seqno():
sw_seqno_result = await toncenter.run_get_method(service_wallet.address.to_string(1, 1, 1), 'seqno')
if sw_seqno_result.get('exit_code', -1) != 0:
sw_seqno_value = 0
else:
sw_seqno_value = int(sw_seqno_result.get('stack', [['num', '0x0']])[0][1], 16)
return sw_seqno_value
async def main_fn(memory):
make_log("TON", f"Service started, SW = {service_wallet.address.to_string(1, 1, 1)}", level="info")
sw_seqno_value = await get_sw_seqno()
make_log("TON", f"Service wallet run seqno method: {sw_seqno_value}", level="info")
if sw_seqno_value == 0:
make_log("TON", "Service wallet is not deployed, deploying...", level="info")
await toncenter.send_boc(
service_wallet.create_transfer_message(
[{
'address': service_wallet.address.to_string(1, 1, 1),
'amount': 1,
'send_mode': 1,
'payload': begin_cell().store_uint(0, 32).store_bytes(b"Init MY Node").end_cell()
}], 0
)['message'].to_boc(False)
)
await asyncio.sleep(5)
return await main_fn(memory)
if os.getenv("TON_BEGIN_COMMAND_WITHDRAW"):
await toncenter.send_boc(
service_wallet.create_transfer_message(
[{
'address': MY_FUND_ADDRESS,
'amount': 1,
'send_mode': 128,
'payload': begin_cell().end_cell()
}], sw_seqno_value
)['message'].to_boc(False)
)
make_log("TON", "Withdraw command sent", level="info")
await asyncio.sleep(10)
return await main_fn(memory)
# TODO: не деплоить если указан master_address и мы проверили что аккаунт существует. Сейчас platform у каждой ноды будет разным
platform_state = await toncenter.get_account(platform.address.to_string(1, 1, 1))
if not platform_state.get('code'):
make_log("TON", "Platform contract is not deployed, send deploy transaction..", level="info")
await toncenter.send_boc(
service_wallet.create_transfer_message(
[{
'address': platform.address.to_string(1, 1, 1),
'amount': int(0.08 * 10 ** 9),
'send_mode': 1,
'payload': begin_cell().store_uint(0, 32).store_uint(0, 64).end_cell(),
'state_init': platform.create_state_init()['state_init']
}], sw_seqno_value
)['message'].to_boc(False)
)
await send_status("ton_daemon", "working: deploying platform")
await asyncio.sleep(15)
return await main_fn(memory)
highload_wallet = Wallets.ALL['hv3'](
private_key=service_wallet.options['private_key'],
public_key=service_wallet.options['public_key'],
wc=0
)
make_log("TON", f"Highload wallet address: {highload_wallet.address.to_string(1, 1, 1)}", level="info")
highload_state = await toncenter.get_account(highload_wallet.address.to_string(1, 1, 1))
if int(highload_state.get('balance', '0')) / 1e9 < 0.05:
make_log("TON", "Highload wallet balance is less than 0.05, send topup transaction..", level="info")
await toncenter.send_boc(
service_wallet.create_transfer_message(
[{
'address': highload_wallet.address.to_string(1, 1, 0),
'amount': int(0.02 * 10 ** 9),
'send_mode': 1,
'payload': begin_cell().store_uint(0, 32).end_cell()
}], sw_seqno_value
)['message'].to_boc(False)
)
await send_status("ton_daemon", "working: topup highload wallet")
await asyncio.sleep(15)
return await main_fn(memory)
if not highload_state.get('code'):
make_log("TON", "Highload wallet contract is not deployed, send deploy transaction..", level="info")
created_at_ts = int(datetime.utcnow().timestamp()) - 60
await toncenter.send_boc(
highload_wallet.create_transfer_message(
service_wallet.address.to_string(1, 1, 1),
1, HighloadQueryId.from_seqno(0), created_at_ts, send_mode=1, payload="hello world", need_deploy=True
)['message'].to_boc(False)
)
await send_status("ton_daemon", "working: deploying highload wallet")
await asyncio.sleep(15)
return await main_fn(memory)
while True:
try:
rid = __import__('uuid').uuid4().hex[:8]
try:
from app.core.log_context import ctx_rid
ctx_rid.set(rid)
except BaseException:
pass
sw_seqno_value = await get_sw_seqno()
make_log("TON", f"Service running ({sw_seqno_value})", level="debug", rid=rid)
async with db_session() as session:
# Проверка отправленных сообщений
await send_status("ton_daemon", f"working: processing in-txs (seqno={sw_seqno_value})")
async def process_incoming_transaction(transaction: dict):
transaction_hash = transaction['transaction_id']['hash']
transaction_lt = str(transaction['transaction_id']['lt'])
# transaction_success = bool(transaction['success'])
async def process_incoming_message(blockchain_message: dict):
in_msg_cell = Cell.one_from_boc(b64decode(blockchain_message['msg_data']['body']))
in_msg_slice = in_msg_cell.refs[0].begin_parse()
in_msg_slice.read_uint(32)
in_msg_slice.read_uint(8)
in_msg_query_id = in_msg_slice.read_uint(23)
in_msg_created_at = in_msg_slice.read_uint(64)
in_msg_epoch = int(in_msg_created_at // (60 * 60))
in_msg_seqno = HighloadQueryId.from_query_id(in_msg_query_id).to_seqno()
from sqlalchemy import select
in_msg_blockchain_task = (
await session.execute(
select(BlockchainTask).where(
and_(
BlockchainTask.seqno == in_msg_seqno,
BlockchainTask.epoch == in_msg_epoch,
)
)
)
).scalars().first()
if not in_msg_blockchain_task:
return
if not (in_msg_blockchain_task.status in ['done']) or in_msg_blockchain_task.transaction_hash != transaction_hash:
in_msg_blockchain_task.status = 'done'
in_msg_blockchain_task.transaction_hash = transaction_hash
in_msg_blockchain_task.transaction_lt = transaction_lt
await session.commit()
for blockchain_message in [transaction['in_msg']]:
try:
await process_incoming_message(blockchain_message)
except BaseException as e:
pass # make_log("TON_Daemon", f"Error while processing incoming message: {e}" + '\n' + traceback.format_exc(), level='debug', rid=rid)
try:
sw_transactions = await toncenter.get_transactions(highload_wallet.address.to_string(1, 1, 1), limit=100)
for sw_transaction in sw_transactions:
try:
await process_incoming_transaction(sw_transaction)
except BaseException as e:
make_log("TON_Daemon", f"Error while processing incoming transaction: {e}", level="debug", rid=rid)
except BaseException as e:
make_log("TON_Daemon", f"Error while getting service wallet transactions: {e}", level="ERROR", rid=rid)
await send_status("ton_daemon", f"working: processing out-txs (seqno={sw_seqno_value})")
# Отправка подписанных сообщений
from sqlalchemy import select
_processing = (await session.execute(select(BlockchainTask).where(
BlockchainTask.status == 'processing'
).order_by(BlockchainTask.updated.asc()))).scalars().all()
for blockchain_task in _processing:
make_log("TON_Daemon", f"Processing task (processing) {blockchain_task.id}", rid=rid)
query_boc = bytes.fromhex(blockchain_task.meta['signed_message'])
errors_list = []
try:
await toncenter.send_boc(query_boc)
except BaseException as e:
errors_list.append(f"{e}")
try:
make_log("TON_Daemon", str(
httpx.post(
'https://tonapi.io/v2/blockchain/message',
json={
'boc': query_boc.hex()
}
).text
))
except BaseException as e:
make_log("TON_Daemon", f"Error while pushing task to tonkeeper ({blockchain_task.id}): {e}", level="ERROR")
errors_list.append(f"{e}")
blockchain_task.updated = datetime.utcnow()
if blockchain_task.meta['sign_created'] + 10 * 60 < datetime.utcnow().timestamp():
# or sum([int("terminating vm with exit code 36" in e) for e in errors_list]) > 0:
make_log("TON_Daemon", f"Task {blockchain_task.id} done", level="DEBUG")
blockchain_task.status = 'done'
await session.commit()
continue
await asyncio.sleep(0.5)
await send_status("ton_daemon", f"working: creating new messages (seqno={sw_seqno_value})")
# Создание новых подписей
_waiting = (await session.execute(select(BlockchainTask).where(BlockchainTask.status == 'wait'))).scalars().all()
for blockchain_task in _waiting:
try:
# Check processing tasks in current epoch < 3_000_000
from sqlalchemy import func
_cnt = (await session.execute(select(func.count()).select_from(BlockchainTask).where(
BlockchainTask.epoch == blockchain_task.epoch
))).scalar() or 0
if _cnt > 3_000_000:
make_log("TON", f"Too many processing tasks in epoch {blockchain_task.epoch}", level="error")
await send_status("ton_daemon", f"working: too many tasks in epoch {blockchain_task.epoch}")
await asyncio.sleep(5)
continue
sign_created = int(datetime.utcnow().timestamp()) - 60
try:
current_epoch = int(datetime.utcnow().timestamp() // (60 * 60))
from sqlalchemy import func
max_epoch_seqno = (
(await session.execute(select(func.max(BlockchainTask.seqno)).where(
BlockchainTask.epoch == current_epoch
))).scalar() or 0
)
current_epoch_shift = 3_000_000 if current_epoch % 2 == 0 else 0
current_seqno = max_epoch_seqno + 1 + (current_epoch_shift if max_epoch_seqno == 0 else 0)
except BaseException as e:
make_log("CRITICAL", f"Error calculating epoch,seqno: {e}", level="error")
current_epoch = 0
current_seqno = 0
blockchain_task.seqno = current_seqno
blockchain_task.epoch = current_epoch
blockchain_task.status = 'processing'
try:
query = highload_wallet.create_transfer_message(
blockchain_task.destination, int(blockchain_task.amount), HighloadQueryId.from_seqno(current_seqno),
sign_created, send_mode=1,
payload=Cell.one_from_boc(b64decode(blockchain_task.payload))
)
query_boc = query['message'].to_boc(False)
except BaseException as e:
make_log("TON", f"Error creating transfer message: {e}", level="error", rid=rid)
query_boc = begin_cell().end_cell().to_boc(False)
blockchain_task.meta = {
**blockchain_task.meta,
'sign_created': sign_created,
'signed_message': query_boc.hex(),
}
await session.commit()
make_log("TON", f"Created signed message for task {blockchain_task.id}" + '\n' + traceback.format_exc(), level="info", rid=rid)
except BaseException as e:
make_log("TON", f"Error processing task {blockchain_task.id}: {e}" + '\n' + traceback.format_exc(), level="error", rid=rid)
continue
await asyncio.sleep(1)
await asyncio.sleep(1)
await send_status("ton_daemon", f"working (seqno={sw_seqno_value})")
except BaseException as e:
make_log("TON", f"Error: {e}", level="error", rid=locals().get('rid'))
await asyncio.sleep(3)
finally:
try:
from app.core.log_context import ctx_rid
ctx_rid.set(None)
except BaseException:
pass
# if __name__ == '__main__':
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
# loop.close()