128 lines
5.3 KiB
Python
128 lines
5.3 KiB
Python
import asyncio
|
|
|
|
from app.core.models.content.user_content import UserContent
|
|
from app.core.models.wallet_connection import WalletConnection
|
|
from app.core._blockchain.ton.toncenter import toncenter
|
|
from tonsdk.utils import Address
|
|
from datetime import datetime, timedelta
|
|
from app.core.logger import make_log
|
|
from httpx import AsyncClient
|
|
|
|
|
|
class WalletMixin:
    """Wallet and on-chain content helpers for a user-like model.

    The host class must expose ``id``; it is used as ``user_id`` when looking
    up wallet connections and when creating ``UserContent`` rows.
    """

    def wallet_connection(self, db_session):
        """Return this user's most recently created, non-invalidated
        ``WalletConnection``, or ``None`` when there is none."""
        return (
            db_session.query(WalletConnection)
            .filter(
                WalletConnection.user_id == self.id,
                # `==` is intentional: it builds a SQL comparison expression.
                WalletConnection.invalidated == False,  # noqa: E712
            )
            .order_by(WalletConnection.created.desc())
            .first()
        )

    def wallet_address(self, db_session):
        """Return the address of the active wallet connection, or ``None``."""
        wallet_connection = self.wallet_connection(db_session)
        return wallet_connection.wallet_address if wallet_connection else None

    async def _fetch_nft_items(self, wallet_address, max_attempts=5, retry_delay=2):
        """Fetch the NFT items owned by ``wallet_address`` from tonapi.io.

        Each failed attempt is logged and followed by ``retry_delay`` seconds
        of sleep; after ``max_attempts`` failures a ``RuntimeError`` is raised
        (chained to the last error) instead of retrying forever.

        Returns the value of the ``nft_items`` field of the JSON response.
        """
        # TODO: support more than 1000 items
        url = f"https://tonapi.io/v2/accounts/{wallet_address}/nfts?limit=1000&offset=0&indirect_ownership=false"
        last_error = None
        for attempt in range(1, max_attempts + 1):
            try:
                async with AsyncClient() as client:
                    response = await client.get(url)
                    payload = response.json()
                # Explicit check instead of `assert`, which is stripped under -O.
                if not isinstance(payload, dict) or 'nft_items' not in payload:
                    raise ValueError("response does not contain 'nft_items'")
                return payload['nft_items']
            except Exception as e:
                # `Exception`, not `BaseException`: task cancellation and
                # KeyboardInterrupt must propagate rather than trigger a retry.
                last_error = e
                make_log(self, f"Error while fetching NFTs: {e}", level='error')
                if attempt < max_attempts:
                    await asyncio.sleep(retry_delay)
        raise RuntimeError(
            f"Failed to fetch NFTs after {max_attempts} attempts"
        ) from last_error

    def _store_nft_if_new(self, db_session, wallet_connection_id, item_address,
                          owner_address, code_hash=None, data_hash=None, meta=None):
        """Create and commit a ``UserContent`` row for an NFT unless a row with
        the same ``onchain_address`` already exists.

        Returns ``True`` when a new row was committed, ``False`` otherwise.
        """
        already_stored = db_session.query(UserContent).filter(
            UserContent.onchain_address == item_address
        ).first()
        if already_stored:
            return False

        db_session.add(UserContent(
            type='nft/unknown',
            onchain_address=item_address,
            owner_address=owner_address,
            code_hash=code_hash,
            data_hash=data_hash,
            updated=datetime.fromtimestamp(0),
            content_id=None,  # not resolved yet
            created=datetime.now(),
            meta={} if meta is None else meta,
            user_id=self.id,
            wallet_connection_id=wallet_connection_id,
            status="active",
        ))
        # Commit per item so already-registered NFTs survive a later failure.
        db_session.commit()
        make_log(self, f"New onchain NFT found: {item_address}", level='info')
        return True

    async def scan_owned_user_content(self, db_session):
        """Register NFTs owned by the user's active wallet as ``UserContent``.

        Does nothing when the user has no active wallet connection (or the
        connection has no address). Raises ``RuntimeError`` when the NFT list
        cannot be fetched after several attempts.
        """
        # Resolve the connection once: it is loop-invariant and supplies both
        # the address to query and the id stored on every new row.
        connection = self.wallet_connection(db_session)
        if connection is None or not connection.wallet_address:
            return

        nfts_list = await self._fetch_nft_items(connection.wallet_address)
        for nft_item in nfts_list:
            # Both addresses are stored in the `to_string(1, 1, 1)` form.
            item_address = Address(nft_item['address']).to_string(1, 1, 1)
            owner_address = Address(nft_item['owner']['address']).to_string(1, 1, 1)
            self._store_nft_if_new(
                db_session, connection.id, item_address, owner_address
            )

    async def ____scan_owned_user_content(self, db_session):
        """Legacy scanner that pages through ``toncenter.get_nft_items``.

        Unlike ``scan_owned_user_content`` it also stores ``code_hash``,
        ``data_hash`` and the metadata URI, and a failure on one item is
        logged and skipped instead of aborting the scan.
        """
        connection = self.wallet_connection(db_session)
        if connection is None or not connection.wallet_address:
            return

        page_size = 100
        page_id = 0
        have_next_page = True
        while have_next_page:
            nfts_list = await toncenter.get_nft_items(
                limit=page_size,
                offset=page_id * page_size,
                owner_address=connection.wallet_address,
            )
            # A short page is the last one; without this the loop never ends.
            have_next_page = len(nfts_list) >= page_size
            page_id += 1

            for nft_item in nfts_list:
                try:
                    item_address = Address(nft_item['address']).to_string(1, 1, 1)
                    owner_address = Address(nft_item['owner_address']).to_string(1, 1, 1)

                    try:
                        nft_content = nft_item['content']['uri']
                    except (KeyError, TypeError):
                        nft_content = None

                    self._store_nft_if_new(
                        db_session,
                        connection.id,
                        item_address,
                        owner_address,
                        code_hash=nft_item['code_hash'],
                        data_hash=nft_item['data_hash'],
                        meta={'metadata_uri': nft_content},
                    )
                except Exception as e:
                    make_log(self, f"Error while scanning onchain NFT: {e}", level='error')
                    continue

    async def get_user_content(self, db_session, limit=100, offset=0):
        """Return up to ``limit`` ``UserContent`` rows of this user, skipping
        the first ``offset`` rows.

        The wallet is rescanned first on a best-effort basis: a scan failure
        is logged and the rows already stored are still returned.
        """
        try:
            await self.scan_owned_user_content(db_session)
        except Exception as e:
            make_log(self, f"Error while scanning user content: {e}", level='error')

        # Query through the session that was passed in, like every other
        # method of this mixin.
        return (
            db_session.query(UserContent)
            .filter(UserContent.user_id == self.id)
            .offset(offset)
            .limit(limit)
            .all()
        )
|