uploader-bot/app/core/_blockchain/ton/toncenter.py

93 lines
3.3 KiB
Python

import asyncio
import time
from base64 import b64encode
from httpx import AsyncClient
from app.core._config import TONCENTER_HOST, TONCENTER_API_KEY
from app.core.logger import make_log
class TonCenter:
    """Small asynchronous client for the TON Center HTTP API.

    Request URLs are built by concatenating the configured base URL and the
    endpoint name with no separator inserted, so ``host`` / ``v3_host`` must
    already end in whatever delimiter the endpoints expect (the methods below
    pass endpoint names such as ``'sendBoc'`` without a leading slash).

    When no API key is configured, requests are spaced at least
    ``KEYLESS_MIN_INTERVAL`` seconds apart.
    """

    # Highest ``tries_count`` for which a request is still attempted; with the
    # default starting value of 0 this allows four attempts in total.
    MAX_TRIES_COUNT = 3

    # Minimum spacing (seconds) between requests when no API key is set.
    KEYLESS_MIN_INTERVAL = 1.2

    def __init__(self, host: str, api_key: str = None, v3_host: str = None, testnet: bool = False):
        """Store connection settings.

        Args:
            host: Base URL used for regular (non-v3) endpoints.
            api_key: Optional API key. When set it is sent both as a
                ``Bearer`` ``Authorization`` header and as ``X-API-Key``.
            v3_host: Base URL used for requests made with ``v3=True``.
            testnet: Accepted for interface compatibility; not used by this
                class.
        """
        self.host = host
        self.api_key = api_key
        self.v3_host = v3_host
        self.last_used = time.time()

    async def request(self, method: str, endpoint: str, *args, v3: bool = False, tries_count=0, **kwargs) -> dict:
        """Perform an HTTP request and return the decoded JSON body.

        A response with a non-200 status, or a body that cannot be decoded as
        JSON, is logged and the request is retried against the same URL until
        ``tries_count`` would exceed ``MAX_TRIES_COUNT``.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Endpoint name appended to the selected base URL.
            *args: Extra positional arguments forwarded to the HTTP client.
            v3: When ``True`` the request goes to ``v3_host`` instead of
                ``host``.
            tries_count: Number of attempts already consumed.
            **kwargs: Extra keyword arguments forwarded to the HTTP client
                (``params``, ``json``, ``headers``, ...).

        Returns:
            The parsed JSON response.

        Raises:
            Exception: When all attempts have been used up.
            Any exception raised by the HTTP client while sending the request
            propagates immediately and is not retried.
        """
        if tries_count > self.MAX_TRIES_COUNT:
            raise Exception(f'Error while toncenter request {endpoint}: {tries_count}')

        # NOTE(review): if ``v3`` is requested while ``v3_host`` is None the
        # URL becomes "None<endpoint>" -- configure ``v3_host`` before using
        # v3 endpoints.
        base_url = self.v3_host if v3 is True else self.host
        url = f"{base_url}{endpoint}"

        client_headers = None
        if self.api_key:
            client_headers = {'Authorization': f"Bearer {self.api_key}"}
            # Work on a copy so a caller-supplied headers mapping is never
            # mutated; also tolerates ``headers=None``.
            request_headers = dict(kwargs.get('headers') or {})
            request_headers['X-API-Key'] = self.api_key
            kwargs['headers'] = request_headers

        # One client serves every attempt. Retrying in a loop (rather than by
        # recursion) keeps the chosen URL -- including the v3 host -- stable
        # across retries and avoids nesting client contexts.
        async with AsyncClient(headers=client_headers) as client:
            for _attempt in range(tries_count, self.MAX_TRIES_COUNT + 1):
                if not self.api_key:
                    # Keyless access: wait until the minimum interval since
                    # the previous request has elapsed.
                    while time.time() < self.last_used + self.KEYLESS_MIN_INTERVAL:
                        await asyncio.sleep(0.1)
                self.last_used = time.time()

                response = await client.request(method, url, *args, **kwargs)
                try:
                    if response.status_code != 200:
                        raise Exception(f'Error while toncenter request {endpoint}: {response.text}')
                    return response.json()
                except Exception as e:
                    # ``Exception`` (not ``BaseException``) so that task
                    # cancellation and interpreter shutdown are never turned
                    # into a retry.
                    make_log("Toncenter", f'Error while request {url}: {e}' + '\n' + response.text, level='error')

        raise Exception(f'Error while toncenter request {endpoint}: {self.MAX_TRIES_COUNT + 1}')

    async def send_boc(self, src: bytes):
        """POST ``src`` base64-encoded to ``sendBoc``.

        Returns:
            The decoded JSON response, or ``None`` if the request raised an
            ``Exception`` (the error is logged, not propagated).
        """
        try:
            res = await self.request(
                "POST", 'sendBoc',
                json={'boc': b64encode(src).decode()}
            )
        except Exception as e:
            make_log("Toncenter", f"sendBoc error: {e}", level='error')
            return None

        make_log("Toncenter", f"sendBoc: {res}", level='debug')
        return res

    async def get_account(self, addr: str):
        """Return the ``result`` field of ``getAddressInformation`` for ``addr`` (``{}`` if absent)."""
        response = await self.request(
            "GET", 'getAddressInformation',
            params={'address': addr}
        )
        return response.get('result', {})

    async def get_seqno(self, addr: str):
        """Return ``result.seqno`` of ``getWalletInformation`` for ``addr`` (``0`` if absent)."""
        response = await self.request(
            "GET", 'getWalletInformation',
            params={'address': addr}
        )
        return response.get('result', {}).get('seqno', 0)

    async def get_transactions(self, addr: str, limit: int = 10):
        """Return the ``result`` field of ``getTransactions`` for ``addr`` (``[]`` if absent)."""
        response = await self.request(
            "GET", 'getTransactions',
            params={'address': addr, 'limit': limit}
        )
        return response.get('result', [])

    async def run_get_method(self, addr, method, stack=None):
        """POST to ``runGetMethod`` and return the ``result`` field (``{}`` if absent).

        Args:
            addr: Value sent as ``address``.
            method: Value sent as ``method``.
            stack: Value sent as ``stack``; anything that is not a list
                (including the default ``None``) is sent as an empty list.
        """
        response = await self.request(
            "POST", 'runGetMethod', json={
                'address': addr,
                'method': method,
                'stack': stack if isinstance(stack, list) else []
            }
        )
        return response.get('result', {})

    async def get_nft_items(self, limit: int = 100, offset: int = 0, **search_options):
        """GET ``nft/items`` from the v3 host and return ``nft_items`` (``[]`` if absent).

        Args:
            limit: Sent as the ``limit`` query parameter.
            offset: Sent as the ``offset`` query parameter.
            **search_options: Additional query parameters; ``limit`` and
                ``offset`` take precedence over same-named entries.
        """
        response = await self.request(
            "GET", 'nft/items',
            params={**search_options, 'limit': limit, 'offset': offset},
            v3=True
        )
        return response.get('nft_items', [])
# Shared module-level client built from project configuration.
# No ``v3_host`` is supplied here, so it keeps its ``None`` default and
# requests made with ``v3=True`` have no v3 base URL on this instance.
toncenter = TonCenter(
    host=TONCENTER_HOST,
    api_key=TONCENTER_API_KEY,
)