wallet-v3cr3/scripts/toncenter.py

60 lines
1.9 KiB
Python

import os
from base64 import b64encode
from httpx import AsyncClient
class TonCenter:
def __init__(self, api_key: str = None, testnet: bool = False):
self.host = os.getenv("TONCENTER_HOST", "https://toncenter.com/api/v1/")
if not (self.host[-1] == '/'):
self.host += '/'
self.api_key = api_key
async def request(self, method: str, endpoint: str, *args, **kwargs) -> dict:
async with AsyncClient() as client:
response = await client.request(method, f"{self.host}{endpoint}", *args, **kwargs)
if response.status_code != 200:
raise Exception(f'Error while TONCENTER request {endpoint}: {response.text}')
return response.json()
async def send_boc(self, src: bytes):
return await self.request(
"POST", 'sendBoc',
json={'boc': b64encode(src).decode()}
)
async def get_account(self, addr: str):
return (await self.request(
"GET", 'getAddressInformation',
params={'address': addr}
)).get('result', {})
async def get_seqno(self, addr: str):
return (await self.request(
"GET", 'getWalletInformation',
json={'address': addr}
)).get('result', {}).get('seqno', 0)
async def run_get_method(self, addr, method, stack=[]):
return (await self.request(
"POST", 'runGetMethod', json={
'address': addr,
'method': method,
'stack': stack if type(stack) == list else []
}
)).get('result', {})
async def get_transactions(
self, addr: str, limit: int = 100, lt: str = None, hash: str = None,
offset: int = 0, to_lt: str = None
):
return (await self.request(
"GET", 'getTransactions', params={
'address': addr,
}
)).get('result', {})