115 lines
4.9 KiB
Python
115 lines
4.9 KiB
Python
import asyncio
|
|
from base64 import b64decode
|
|
from datetime import datetime, timedelta
|
|
|
|
from base58 import b58encode
|
|
from sqlalchemy import and_
|
|
from tonsdk.boc import Cell
|
|
from tonsdk.utils import Address
|
|
|
|
from app.core._blockchain.ton.platform import platform
|
|
from app.core._blockchain.ton.toncenter import toncenter
|
|
from app.core._utils.send_status import send_status
|
|
from app.core.logger import make_log
|
|
from app.core.models.node_storage import StoredContent
|
|
from app.core.models.content.user_content import UserContent, UserAction
|
|
from app.core._utils.resolve_content import resolve_content
|
|
from app.core.models.wallet_connection import WalletConnection
|
|
from app.core._keyboards import get_inline_keyboard
|
|
from app.core.models._telegram import Wrapped_CBotChat
|
|
from app.core.storage import db_session
|
|
from app.core._config import CLIENT_TELEGRAM_API_KEY
|
|
from app.core.models.user import User
|
|
import os
|
|
import traceback
|
|
|
|
|
|
async def _refresh_recent_user_licenses(session) -> None:
    """Rescan owned content for every user whose ``last_use`` is under 10 minutes old."""
    active_since = datetime.now() - timedelta(minutes=10)
    for user in session.query(User).filter(User.last_use > active_since).all():
        last_updated_licenses = user.meta.get('last_updated_licenses')
        make_log("LicenseIndex", f"User: {user.id}, last_updated_licenses: {last_updated_licenses}", level="info")

        # Throttle: skip users whose licenses were rescanned less than a minute ago.
        if last_updated_licenses and (datetime.now() - datetime.fromisoformat(last_updated_licenses)) < timedelta(minutes=1):
            continue

        try:
            await user.scan_owned_user_content(session)
            # NOTE(review): this mutates the meta mapping in place and this
            # helper never commits; whether the timestamp is persisted depends
            # on the column's mutation tracking and on a later commit, neither
            # of which is visible here -- confirm.
            user.meta['last_updated_licenses'] = datetime.now().isoformat()
        except Exception as e:
            # `Exception`, not `BaseException`: task cancellation and
            # interpreter shutdown must propagate instead of being logged away.
            make_log("LicenseIndex", f"Error: {e}" + '\n' + traceback.format_exc(), level="error")


async def _sync_stale_nft_content(session) -> None:
    """Re-sync with the chain every 'nft/' UserContent row not updated for 15 minutes."""
    stale_before = datetime.now() - timedelta(minutes=15)
    stale_nft_content = session.query(UserContent).filter(
        and_(
            UserContent.type.startswith('nft/'),
            UserContent.updated < stale_before,
        )
    )
    for content in stale_nft_content:
        make_log("LicenseIndex", f"Syncing content with blockchain: {content.id}", level="info")
        try:
            await content.sync_with_chain(session)
            content.updated = datetime.now()
            session.commit()
        except Exception as e:
            # One failing item must not abort the rest of the batch, but
            # cancellation still has to get through (hence not BaseException).
            make_log("LicenseIndex", f"Error: {e}" + '\n' + traceback.format_exc(), level="error")


async def _settle_requested_purchases(session) -> None:
    """Cancel timed-out 'requested' purchases and mark already-owned ones as successful."""
    # No age filter is applied: every purchase still in 'requested' status is
    # examined on each pass.
    requested_purchases = session.query(UserAction).filter(
        and_(
            UserAction.type == 'purchase',
            UserAction.status == 'requested',
        )
    )
    for action in requested_purchases:
        try:
            # NOTE(review): `.first()` may return None; a missing user row then
            # raises AttributeError below, which is logged by the handler and
            # the action stays 'requested' -- confirm this is acceptable.
            user = session.query(User).filter_by(id=action.user_id).first()
            chat_wrap = Wrapped_CBotChat(CLIENT_TELEGRAM_API_KEY, chat_id=user.telegram_id, db_session=session, user=user)
            content = session.query(StoredContent).filter_by(id=action.content_id).first()

            edited_ = False
            if (datetime.now() - action.created) > timedelta(minutes=5):
                # The request is older than five minutes: drop the bot message
                # attached to it (if any) and cancel the action.
                if action.telegram_message_id:
                    await chat_wrap.delete_message(action.telegram_message_id)

                make_log("LicenseIndex", f"Action timeout: {action.id}", level="info")
                action.status = 'canceled'
                edited_ = True
            else:
                # Still within the window: the purchase counts as done once an
                # active UserContent row exists for this content and wallet.
                user_wallet_address = user.wallet_address(session)
                user_content = session.query(UserContent).filter_by(content_id=action.content_id, status='active', owner_address=user_wallet_address).first()
                if user_content:
                    make_log("LicenseIndex", f"User already has content: {user_content.content_id}", level="info")
                    action.status = 'success'
                    edited_ = True

            if edited_:
                # Persist the new status first, then send the content to the
                # chat (done for both the 'canceled' and 'success' outcomes).
                session.commit()
                await chat_wrap.send_content(session, content)
        except Exception as e:
            # Keep processing the remaining actions; let cancellation propagate.
            make_log("LicenseIndex", f"Error: {e}" + '\n' + traceback.format_exc(), level="error")


async def license_index_loop(memory, platform_found: bool, seqno: int) -> "tuple[bool, int]":
    """Run one pass of the license indexer.

    Inside a single ``db_session()`` the pass performs three steps in order:

    1. rescan owned content for recently active users;
    2. re-sync stale ``nft/`` user content with the blockchain;
    3. settle purchase actions that are still in ``requested`` status.

    Ordinary errors inside a step are logged per item and do not stop the
    pass. ``asyncio.CancelledError``, ``KeyboardInterrupt`` and ``SystemExit``
    are deliberately not caught, so the surrounding task can be cancelled.

    Args:
        memory: Passed in by the caller; not used by this pass.
        platform_found: Returned unchanged.
        seqno: Returned unchanged.

    Returns:
        The ``(platform_found, seqno)`` pair exactly as received.
    """
    make_log("LicenseIndex", "Service running", level="debug")
    with db_session() as session:
        await _refresh_recent_user_licenses(session)
        await _sync_stale_nft_content(session)
        await _settle_requested_purchases(session)

    return platform_found, seqno
|
|
|
|
|
|
async def main_fn(memory):
    """Run the license indexer service loop; never returns normally.

    Each iteration awaits one ``license_index_loop`` pass, reports status via
    ``send_status`` when ``platform_found`` is truthy, sleeps five seconds and
    then increments ``seqno``.

    A failing pass is logged and the loop carries on with the previous
    ``platform_found`` / ``seqno`` values. Only ``Exception`` is caught:
    ``asyncio.CancelledError``, ``KeyboardInterrupt`` and ``SystemExit``
    propagate, so cancelling the task or stopping the process actually ends
    the service instead of being logged and ignored.

    Args:
        memory: Forwarded unchanged to ``license_index_loop``.
    """
    make_log("LicenseIndex", "Service started", level="info")
    platform_found = False
    seqno = 0
    while True:
        try:
            platform_found, seqno = await license_index_loop(memory, platform_found, seqno)
        except Exception as e:
            # Best-effort service: log the failure and retry on the next tick.
            make_log("LicenseIndex", f"Error: {e}" + '\n' + traceback.format_exc(), level="error")

        if platform_found:
            await send_status("LicenseIndex", f"working (seqno={seqno})")

        await asyncio.sleep(5)
        seqno += 1
|
|
|
|
# if __name__ == '__main__':
|
|
# loop = asyncio.get_event_loop()
|
|
# loop.run_until_complete(main())
|
|
# loop.close()
|