uploader-bot/app/core/models/user/wallet_mixin.py

75 lines
2.9 KiB
Python

from app.core.models.content.user_content import UserContent
from app.core.models.wallet_connection import WalletConnection
from app.core._blockchain.ton.toncenter import toncenter
from tonsdk.utils import Address
from datetime import datetime, timedelta
from app.core.logger import make_log
class WalletMixin:
    """Wallet-related helpers mixed into the user model.

    The host class must expose an ``id`` attribute. Every method receives the
    database session explicitly through its ``db_session`` parameter.
    """

    def wallet_connection(self, db_session):
        """Return the newest non-invalidated wallet connection of this user.

        Returns ``None`` when the user has no such connection.
        """
        return (
            db_session.query(WalletConnection)
            .filter(
                WalletConnection.user_id == self.id,
                # SQL expression, not a Python truth test, hence ``== False``.
                WalletConnection.invalidated == False,  # noqa: E712
            )
            .order_by(WalletConnection.created.desc())
            .first()
        )

    def wallet_address(self, db_session):
        """Return the address of the active wallet connection, or ``None``."""
        connection = self.wallet_connection(db_session)
        return connection.wallet_address if connection else None

    async def scan_owned_user_content(self, db_session):
        """Page through on-chain NFT items and store the ones not yet known.

        Each item whose address has no ``UserContent`` row yet is inserted as
        an ``'nft/unknown'`` record linked to this user and to the user's
        active wallet connection, and committed immediately.

        Returns early, without contacting the indexer, when the user has no
        active wallet connection: new records could not be linked to one.
        """
        wallet_connection = self.wallet_connection(db_session)
        if wallet_connection is None:
            return
        # Read the id once: the per-item commit below may expire the ORM
        # object, and re-querying the connection for every NFT is wasteful.
        wallet_connection_id = wallet_connection.id

        page_size = 100
        offset = 0
        while True:
            # NOTE(review): no owner filter is passed here, yet every new item
            # is stored with user_id=self.id - confirm against the toncenter
            # wrapper whether the request should be scoped to this wallet.
            nfts_list = await toncenter.get_nft_items(
                limit=page_size, offset=offset
            )

            for nft_item in nfts_list:
                # Presumably (user_friendly, url_safe, bounceable) flags of
                # tonsdk's Address.to_string - TODO confirm.
                item_address = Address(nft_item['address']).to_string(1, 1, 1)
                owner_address = Address(
                    nft_item['owner_address']
                ).to_string(1, 1, 1)

                already_known = (
                    db_session.query(UserContent)
                    .filter(UserContent.onchain_address == item_address)
                    .first()
                )
                if already_known:
                    continue

                try:
                    nft_content = nft_item['content']['uri']
                except (KeyError, TypeError):
                    # 'content' or 'uri' missing, or 'content' is null.
                    nft_content = None

                user_content = UserContent(
                    type='nft/unknown',
                    onchain_address=item_address,
                    owner_address=owner_address,
                    code_hash=nft_item['code_hash'],
                    data_hash=nft_item['data_hash'],
                    # Epoch timestamp marks the record as never refreshed.
                    updated=datetime.fromtimestamp(0),
                    content_id=None,  # not resolved yet
                    created=datetime.now(),
                    meta={
                        'metadata_uri': nft_content,
                    },
                    user_id=self.id,
                    wallet_connection_id=wallet_connection_id,
                    status="active",
                )
                db_session.add(user_content)
                db_session.commit()
                make_log(
                    self,
                    f"New onchain NFT found: {item_address}",
                    level='info',
                )

            # A short page means the listing is exhausted. The original flag
            # was never reset to False, so the loop could not terminate.
            if len(nfts_list) < page_size:
                break
            offset += page_size

    async def get_user_content(self, db_session, limit=100, offset=0):
        """Refresh the user's on-chain content, then return one page of it.

        The scan is best-effort: any ``Exception`` it raises is logged and the
        stored rows are returned anyway. Task cancellation and interpreter
        exit signals are deliberately not swallowed.

        Args:
            db_session: Session used for the scan and for the final query.
            limit: Maximum number of ``UserContent`` rows to return.
            offset: Number of rows to skip.

        Returns:
            A list of ``UserContent`` rows whose ``user_id`` is this user's id.
        """
        try:
            await self.scan_owned_user_content(db_session)
        except Exception as e:
            make_log(
                self,
                f"Error while scanning user content: {e}",
                level='error',
            )

        # Use the session passed in; the original read ``self.db_session``,
        # which no method of this mixin sets.
        return (
            db_session.query(UserContent)
            .filter(UserContent.user_id == self.id)
            .offset(offset)
            .limit(limit)
            .all()
        )