import asyncio
from app.core.models.content.user_content import UserContent
from app.core.models.wallet_connection import WalletConnection
from app.core._blockchain.ton.toncenter import toncenter
from tonsdk.utils import Address
from datetime import datetime, timedelta
from app.core.logger import make_log
from httpx import AsyncClient


class WalletMixin:
    """Wallet-related helpers for a user-like model.

    The host class is expected to expose an ``id`` attribute; every method
    receives the database session explicitly as ``db_session``.
    """

    def wallet_connection(self, db_session):
        """Return the newest non-invalidated WalletConnection of this user.

        Returns ``None`` when the user has no such connection.
        """
        return db_session.query(WalletConnection).filter(
            WalletConnection.user_id == self.id,
            # `== False` is intentional: it builds an SQL expression.
            WalletConnection.invalidated == False  # noqa: E712
        ).order_by(WalletConnection.created.desc()).first()

    def wallet_address(self, db_session):
        """Return the address of the current wallet connection, or ``None``."""
        wallet_connection = self.wallet_connection(db_session)
        return wallet_connection.wallet_address if wallet_connection else None

    async def scan_owned_user_content(self, db_session):
        """Fetch the NFTs of the user's wallet from tonapi.io and store new ones.

        Every NFT whose address is not yet present in ``UserContent`` is
        inserted as an ``nft/unknown`` record and committed immediately.
        Does nothing when the user has no active wallet connection.

        NOTE: a failed fetch is retried every 2 seconds without an upper
        bound, so this coroutine only finishes once tonapi.io answers with a
        payload that contains ``nft_items`` (or the task is cancelled).
        """
        wallet_connection = self.wallet_connection(db_session)
        if wallet_connection is None or not wallet_connection.wallet_address:
            # Without a wallet there is nothing to scan; querying the API
            # with a missing address would only fail and retry forever.
            return

        user_wallet_address = wallet_connection.wallet_address
        # Read the id once, up front: commit() below may expire ORM state.
        wallet_connection_id = wallet_connection.id

        async def get_nft_items_list():
            # TODO: support more than 1000 items
            url = f"https://tonapi.io/v2/accounts/{user_wallet_address}/nfts?limit=1000&offset=0&indirect_ownership=false"
            while True:
                try:
                    async with AsyncClient() as client:
                        response = await client.get(url)
                        return response.json()['nft_items']
                except Exception as e:
                    # `Exception` (not `BaseException`) so that task
                    # cancellation and KeyboardInterrupt still propagate.
                    make_log(self, f"Error while fetching NFTs: {e}", level='error')
                    await asyncio.sleep(2)

        nfts_list = await get_nft_items_list()
        make_log(self, f"Found {len(nfts_list)} NFTs", level='info')

        for nft_item in nfts_list:
            # presumably (user_friendly, url_safe, bounceable) flags - confirm
            # against tonsdk's Address.to_string.
            item_address = Address(nft_item['address']).to_string(1, 1, 1)
            owner_address = Address(nft_item['owner']['address']).to_string(1, 1, 1)

            already_known = db_session.query(UserContent).filter(
                UserContent.onchain_address == item_address
            ).first()
            if already_known:
                continue

            user_content = UserContent(
                type='nft/unknown',
                onchain_address=item_address,
                owner_address=owner_address,
                code_hash=None,
                data_hash=None,
                updated=datetime.fromtimestamp(0),
                content_id=None,  # not resolved yet
                created=datetime.now(),
                meta={},
                user_id=self.id,
                wallet_connection_id=wallet_connection_id,
                status="active"
            )
            db_session.add(user_content)
            db_session.commit()

            make_log(self, f"New onchain NFT found: {item_address}", level='info')

    async def ____scan_owned_user_content(self, db_session):
        """Toncenter-based variant of the NFT scan, fetched page by page.

        Pages of ``page_size`` items are requested until a page comes back
        shorter than ``page_size``. Errors on a single item are logged and
        the item is skipped. Does nothing when the user has no active wallet
        connection.
        """
        wallet_connection = self.wallet_connection(db_session)
        if wallet_connection is None or not wallet_connection.wallet_address:
            return

        user_wallet_address = wallet_connection.wallet_address
        # Read the id once, up front: commit() below may expire ORM state.
        wallet_connection_id = wallet_connection.id

        page_size = 100
        page_id = 0
        while True:
            nfts_list = await toncenter.get_nft_items(
                limit=page_size,
                offset=page_id * page_size,
                owner_address=user_wallet_address
            )

            for nft_item in nfts_list:
                try:
                    # make_log(self, f"Scanning onchain NFT: {nft_item}", level='info')
                    item_address = Address(nft_item['address']).to_string(1, 1, 1)
                    owner_address = Address(nft_item['owner_address']).to_string(1, 1, 1)

                    already_known = db_session.query(UserContent).filter(
                        UserContent.onchain_address == item_address
                    ).first()
                    if already_known:
                        continue

                    try:
                        nft_content = nft_item['content']['uri']
                    except KeyError:
                        nft_content = None

                    user_content = UserContent(
                        type='nft/unknown',
                        onchain_address=item_address,
                        owner_address=owner_address,
                        code_hash=nft_item['code_hash'],
                        data_hash=nft_item['data_hash'],
                        updated=datetime.fromtimestamp(0),
                        content_id=None,  # not resolved yet
                        created=datetime.now(),
                        meta={
                            'metadata_uri': nft_content,
                        },
                        user_id=self.id,
                        wallet_connection_id=wallet_connection_id,
                        status="active"
                    )
                    db_session.add(user_content)
                    db_session.commit()

                    make_log(self, f"New onchain NFT found: {item_address}", level='info')
                except Exception as e:
                    make_log(self, f"Error while scanning onchain NFT: {e}", level='error')
                    continue

            # A short page means there is nothing left to fetch. The original
            # flag was never reset to False, so the loop could not terminate.
            if len(nfts_list) < page_size:
                break
            page_id += 1

    async def get_user_content(self, db_session, limit=100, offset=0):
        """Refresh the user's on-chain content, then return a page of it.

        A failing scan is logged and does not prevent the query.

        Args:
            db_session: database session used for the scan and the query.
            limit: maximum number of ``UserContent`` rows to return.
            offset: number of rows to skip.

        Returns:
            List of ``UserContent`` rows whose ``user_id`` is this user's id.
        """
        try:
            await self.scan_owned_user_content(db_session)
        except Exception as e:
            make_log(self, f"Error while scanning user content: {e}", level='error')

        # Use the session passed in, like every other method of this mixin.
        return db_session.query(UserContent).filter(
            UserContent.user_id == self.id
        ).offset(offset).limit(limit).all()