import asyncio import time from base64 import b64encode from httpx import AsyncClient from app.core._config import TONCENTER_HOST, TONCENTER_API_KEY from app.core.logger import make_log class TonCenter: def __init__(self, host: str, api_key: str = None, testnet: bool = False): self.host = host self.api_key = api_key self.last_used = time.time() async def request(self, method: str, endpoint: str, *args, tries_count=0, **kwargs) -> dict: if tries_count > 3: raise Exception(f'Error while toncenter request {endpoint}: {tries_count}') async with AsyncClient(headers={ 'Authorization': f"Bearer {self.api_key}" } if self.api_key else None) as client: if self.api_key: kwargs['headers'] = kwargs.get('headers', {}) kwargs['headers']['X-API-Key'] = self.api_key if not self.api_key: while time.time() < self.last_used + 1.2: await asyncio.sleep(0.1) self.last_used = time.time() response = await client.request(method, f"{self.host}{endpoint}", *args, **kwargs) try: if response.status_code != 200: raise Exception(f'Error while toncenter request {endpoint}: {response.text}') return response.json() except BaseException as e: make_log("Toncenter", f'Error while request {self.host}{endpoint}: {e}' + '\n' + response.text, level='error') return await self.request(method, endpoint, *args, tries_count=(tries_count + 1), **kwargs) async def send_boc(self, src: bytes): try: res = await self.request( "POST", 'sendBoc', json={'boc': b64encode(src).decode()} ) except Exception as e: make_log("Toncenter", f"sendBoc error: {e}", level='error') return None make_log("Toncenter", f"sendBoc: {res}", level='debug') return res async def get_account(self, addr: str): return (await self.request( "GET", 'getAddressInformation', params={'address': addr} )).get('result', {}) async def get_seqno(self, addr: str): return (await self.request( "GET", 'getWalletInformation', params={'address': addr} )).get('result', {}).get('seqno', 0) async def get_transactions(self, addr: str, limit: int = 10): return (await self.request( "GET", 'getTransactions', params={'address': addr, 'limit': limit} )).get('result', []) async def run_get_method(self, addr, method, stack=[]): return (await self.request( "POST", 'runGetMethod', json={ 'address': addr, 'method': method, 'stack': stack if type(stack) == list else [] } )).get('result', {}) toncenter = TonCenter(TONCENTER_HOST, TONCENTER_API_KEY)