# uploader-bot/app/core/background/indexer_service.py
#
# NOTE(review): the lines above/below this header originally contained
# non-Python residue from a web code viewer (file size, "Raw Permalink
# Blame History" chrome, and an ambiguous-Unicode warning banner); they
# have been converted into this comment so the module parses.
import asyncio
import os
from base64 import b64decode
from datetime import datetime
from base58 import b58encode
from sqlalchemy import String, and_, desc, cast
from tonsdk.boc import Cell
from tonsdk.utils import Address
from app.core._config import CLIENT_TELEGRAM_BOT_USERNAME, PROJECT_HOST
from app.core.events.service import record_event
from app.core._blockchain.ton.platform import platform
from app.core._blockchain.ton.toncenter import toncenter
from app.core._utils.send_status import send_status
from app.core.logger import make_log
from app.core.models import UserContent, KnownTelegramMessage, ServiceConfig
from app.core.models.user import User
from app.core.models.node_storage import StoredContent
from app.core._utils.resolve_content import resolve_content
from app.core.models.wallet_connection import WalletConnection
from app.core.models._telegram import Wrapped_CBotChat
MIN_ONCHAIN_INDEX = int(os.getenv("MIN_ONCHAIN_INDEX", "8"))
from sqlalchemy import select, and_, desc
from app.core.storage import db_session
import os
import traceback
async def indexer_loop(memory, platform_found: bool, seqno: int) -> tuple[bool, int]:
    """Run one iteration of the on-chain content indexer.

    Within a single DB session this performs, in order:
      1. Verify the platform contract is deployed (checked only until first
         success; result cached in ``platform_found``).
      2. Refresh the live TON-per-STAR price from a DEX pool get-method
         (best-effort; failures are logged and ignored).
      3. Notify buyer and seller about freshly bought 'nft/listen' licenses
         that have not been notified yet.
      4. Backfill ``content_id`` for stored content rows lacking a CID.
      5. Resolve and index at most ONE new NFT item per call
         (``last_known_index + 1``), creating or upgrading a StoredContent row.

    Args:
        memory: shared service state holding the Telegram bot clients
            (``_telegram_bot`` = uploader bot, ``_client_telegram_bot`` = client bot).
        platform_found: whether the platform contract was already seen deployed.
        seqno: loop iteration counter; passed through unchanged.

    Returns:
        The updated ``(platform_found, seqno)`` pair for the next iteration.
    """
    if not platform_found:
        # Probe the platform contract until it is first observed deployed.
        platform_state = await toncenter.get_account(platform.address.to_string(1, 1, 1))
        if not platform_state.get('code'):
            make_log("TON", "Platform contract is not deployed, skipping loop", level="info")
            await send_status("indexer", "not working: platform is not deployed")
            return False, seqno
        else:
            platform_found = True
    make_log("Indexer", "Service running", level="debug")
    async with db_session() as session:
        try:
            # Price oracle: read pool reserves from a hard-coded DEX pool contract.
            # NOTE(review): presumably a TON/USDT pool — confirm the address and
            # that stack[0]/stack[1] are the reserve pair in the expected order.
            result = await toncenter.run_get_method('EQD8TJ8xEWB1SpnRE4d89YO3jl0W0EiBnNS4IBaHaUmdfizE', 'get_pool_data')
            assert result['exit_code'] == 0, f"Error in get-method: {result}"
            assert result['stack'][0][0] == 'num', f"get first element is not num"
            assert result['stack'][1][0] == 'num', f"get second element is not num"
            # Reserves arrive as hex strings; the 1e3 factor looks like a
            # decimals adjustment between the two assets — TODO confirm.
            usdt_per_ton = (int(result['stack'][0][1], 16) * 1e3) / int(result['stack'][1][1], 16)
            ton_per_star = 0.014 / usdt_per_ton  # 0.014 = assumed USD price of one Telegram Star
            # Store price together with its timestamp so consumers can judge staleness.
            await ServiceConfig(session).set('live_tonPerStar', [ton_per_star, datetime.utcnow().timestamp()])
            make_log("TON_Daemon", f"TON per STAR price: {ton_per_star}", level="DEBUG")
        except BaseException as e:
            # Price refresh is best-effort: never let it break the indexing pass.
            make_log("TON_Daemon", f"Error while saving TON per STAR price: {e}" + '\n' + traceback.format_exc(), level="ERROR")
        from sqlalchemy import cast
        from sqlalchemy.dialects.postgresql import JSONB
        # --- Step: notify about new 'nft/listen' licenses ----------------------
        # A license is "new" until meta.notification_sent is set to true.
        new_licenses = (await session.execute(select(UserContent).where(
            and_(
                ~(cast(UserContent.meta, JSONB).contains({'notification_sent': True})),
                UserContent.type == 'nft/listen'
            )
        ))).scalars().all()
        for new_license in new_licenses:
            try:
                licensed_content = (await session.execute(select(StoredContent).where(
                    StoredContent.id == new_license.content_id
                ))).scalars().first()
                if not licensed_content:
                    # Mark as notified (with an error tag) so the row is not retried forever.
                    make_log("Indexer", f"Licensed content not found: {new_license.content_id}", level="error")
                    new_license.meta = {**(new_license.meta or {}), 'notification_sent': True, 'notification_error': 'content_not_found'}
                    await session.commit()
                    continue
                try:
                    content_metadata = await licensed_content.metadata_json_async(session)
                except BaseException as e:
                    make_log("Indexer", f"Metadata fetch failed for content_id={licensed_content.id}: {e}", level="warning")
                    content_metadata = None
                # Metadata is best-effort here: it should never block indexer loop progress.
                if not content_metadata:
                    # Fall back to whatever is stored locally on the content row.
                    content_metadata = {
                        'name': licensed_content.meta.get('title') or licensed_content.filename or 'Unknown',
                        'artist': licensed_content.meta.get('artist'),
                        'title': licensed_content.meta.get('title'),
                    }
                # Only notify when buyer and content owner differ (a real sale,
                # not the owner licensing their own content).
                if not (licensed_content.owner_address == new_license.owner_address):
                    try:
                        user = await session.get(User, new_license.user_id)
                        if user and user.telegram_id:
                            # Deliver the licensed content to the buyer via the client bot.
                            await (
                                Wrapped_CBotChat(
                                    memory._client_telegram_bot,
                                    chat_id=user.telegram_id,
                                    user=user,
                                    db_session=session,
                                )
                            ).send_content(session, licensed_content)
                            # Find the seller: latest non-invalidated wallet connection
                            # matching the content owner's address.
                            wallet_owner_connection = (await session.execute(
                                select(WalletConnection).where(
                                    WalletConnection.wallet_address == licensed_content.owner_address,
                                    WalletConnection.invalidated == False
                                ).order_by(desc(WalletConnection.id))
                            )).scalars().first()
                            wallet_owner_user = await session.get(User, wallet_owner_connection.user_id) if wallet_owner_connection else None
                            if wallet_owner_user and wallet_owner_user.telegram_id:
                                # Seller is notified via the uploader bot.
                                wallet_owner_bot = Wrapped_CBotChat(
                                    memory._telegram_bot,
                                    chat_id=wallet_owner_user.telegram_id,
                                    user=wallet_owner_user,
                                    db_session=session,
                                )
                                meta_title = content_metadata.get('title') or content_metadata.get('name') or 'Unknown'
                                meta_artist = content_metadata.get('artist')
                                formatted_title = f"{meta_artist} {meta_title}" if meta_artist else meta_title
                                # NOTE(review): the message is localized with the BUYER's
                                # locale (user.translated) although it is sent to the
                                # seller — verify this is intentional.
                                await wallet_owner_bot.send_message(
                                    user.translated('p_licenseWasBought').format(
                                        username=user.front_format(),
                                        nft_address=f'"https://tonviewer.com/{new_license.onchain_address}"',
                                        content_title=formatted_title,
                                    ),
                                    message_type='notification',
                                )
                    except BaseException as e:
                        make_log("IndexerSendNewLicense", f"Error: {e}" + '\n' + traceback.format_exc(), level="error")
                # Preserve current behavior: do not retry notifications indefinitely.
                new_license.meta = {**(new_license.meta or {}), 'notification_sent': True}
                await session.commit()
            except BaseException as e:
                # Never allow a single broken license/metadata record to block the whole indexer loop.
                make_log("Indexer", f"Error processing new license {getattr(new_license, 'id', None)}: {e}" + '\n' + traceback.format_exc(), level="error")
                new_license.meta = {**(new_license.meta or {}), 'notification_sent': True, 'notification_error': str(e)[:256]}
                await session.commit()
        # --- Step: backfill content_id for rows that never got a CID ----------
        content_without_cid = (await session.execute(select(StoredContent).where(StoredContent.content_id == None))).scalars().all()
        for target_content in content_without_cid:
            target_cid = target_content.cid.serialize_v2()
            make_log("Indexer", f"Content without CID: {target_content.hash}, setting CID: {target_cid}", level="debug")
            target_content.content_id = target_cid
            await session.commit()
        # --- Step: index the next on-chain item --------------------------------
        # Highest onchain_index we have already stored, floored at
        # MIN_ONCHAIN_INDEX - 1 so indexing starts at MIN_ONCHAIN_INDEX.
        last_known_index_ = (await session.execute(
            select(StoredContent).where(StoredContent.onchain_index != None).order_by(StoredContent.onchain_index.desc())
        )).scalars().first()
        last_known_index = last_known_index_.onchain_index if last_known_index_ else 0
        last_known_index = max(last_known_index, 0)
        if last_known_index < (MIN_ONCHAIN_INDEX - 1):
            make_log(
                "Indexer",
                f"Adjusting last_known_index from {last_known_index} to {MIN_ONCHAIN_INDEX - 1} (MIN_ONCHAIN_INDEX)",
                level="debug"
            )
            last_known_index = MIN_ONCHAIN_INDEX - 1
        make_log("Indexer", f"Last known index: {last_known_index}", level="debug")
        next_item_index = last_known_index + 1
        # Ask the platform collection contract for the address of item N+1.
        resolve_item_result = await toncenter.run_get_method(platform.address.to_string(1, 1, 1), 'get_nft_address_by_index', [['num', next_item_index]])
        make_log("Indexer", f"Resolve item result: {resolve_item_result}", level="debug")
        if resolve_item_result.get('exit_code', -1) != 0:
            make_log("Indexer", f"Resolve item error: {resolve_item_result}", level="error")
            return platform_found, seqno
        item_address_cell_b64 = resolve_item_result['stack'][0][1]["bytes"]
        item_address_slice = Cell.one_from_boc(b64decode(item_address_cell_b64)).begin_parse()
        item_address = item_address_slice.read_msg_addr()
        make_log("Indexer", f"Item address: {item_address.to_string(1, 1, 1)}", level="debug")
        item_get_data_result = await toncenter.run_get_method(item_address.to_string(1, 1, 1), 'indexator_data')
        if item_get_data_result.get('exit_code', -1) != 0:
            # The next item may simply not be deployed yet — try again next pass.
            make_log("Indexer", f"Get item data error (maybe not deployed): {item_get_data_result}", level="debug")
            return platform_found, seqno
        # Validate the item's self-reported data against what we asked for.
        # Stack layout (per the asserts below): [0]=type, [1]=own address,
        # [2]=index, [3]=platform address, [4]=license type, [5]=owner address,
        # [6]=values, [7]=derivates, [8]=platform variables, [9]=distribution.
        assert item_get_data_result['stack'][0][0] == 'num', "Item type is not a number"
        assert int(item_get_data_result['stack'][0][1], 16) == 1, "Item is not COP NFT"
        item_returned_address = Cell.one_from_boc(b64decode(item_get_data_result['stack'][1][1]['bytes'])).begin_parse().read_msg_addr()
        assert (
            item_returned_address.to_string(1, 1, 1) == item_address.to_string(1, 1, 1)
        ), "Item address mismatch"
        assert item_get_data_result['stack'][2][0] == 'num', "Item index is not a number"
        item_index = int(item_get_data_result['stack'][2][1], 16)
        if item_index < MIN_ONCHAIN_INDEX:
            make_log(
                "Indexer",
                f"Skip on-chain item {item_index}: below MIN_ONCHAIN_INDEX={MIN_ONCHAIN_INDEX}",
                level="info"
            )
            return platform_found, seqno
        assert item_index == next_item_index, "Item index mismatch"
        item_platform_address = Cell.one_from_boc(b64decode(item_get_data_result['stack'][3][1]['bytes'])).begin_parse().read_msg_addr()
        assert item_platform_address.to_string(1, 1, 1) == Address(platform.address.to_string(1, 1, 1)).to_string(1, 1, 1), "Item platform address mismatch"
        assert item_get_data_result['stack'][4][0] == 'num', "Item license type is not a number"
        item_license_type = int(item_get_data_result['stack'][4][1], 16)
        assert item_license_type == 0, "Item license type is not 0"
        item_owner_address = Cell.one_from_boc(b64decode(item_get_data_result['stack'][5][1]["bytes"])).begin_parse().read_msg_addr()
        item_values = Cell.one_from_boc(b64decode(item_get_data_result['stack'][6][1]['bytes']))
        item_derivates = Cell.one_from_boc(b64decode(item_get_data_result['stack'][7][1]['bytes']))
        item_platform_variables = Cell.one_from_boc(b64decode(item_get_data_result['stack'][8][1]['bytes']))
        item_distribution = Cell.one_from_boc(b64decode(item_get_data_result['stack'][9][1]['bytes']))
        # distribution.refs[0] holds the three license prices as coin values.
        item_distribution_slice = item_distribution.begin_parse()
        item_prices_slice = item_distribution_slice.refs[0].begin_parse()
        item_listen_license_price = item_prices_slice.read_coins()
        item_use_license_price = item_prices_slice.read_coins()
        item_resale_license_price = item_prices_slice.read_coins()
        # values: 256-bit content hash, refs[0]=metadata cell, refs[1]=content CIDs.
        item_values_slice = item_values.begin_parse()
        item_content_hash_int = item_values_slice.read_uint(256)
        item_content_hash = item_content_hash_int.to_bytes(32, 'big')
        # item_content_hash_str = b58encode(item_content_hash).decode()
        item_metadata = item_values_slice.refs[0]
        item_content = item_values_slice.refs[1]
        item_metadata_str = item_metadata.bits.array.decode()
        item_content_cid_str = item_content.refs[0].bits.array.decode()
        item_content_cover_cid_str = item_content.refs[1].bits.array.decode()
        item_content_metadata_cid_str = item_content.refs[2].bits.array.decode()
        # The CID is authoritative for the hash: override the raw on-chain value.
        # NOTE(review): `err` from resolve_content is ignored — confirm that a
        # failed resolution cannot reach the attribute accesses below.
        item_content_cid, err = resolve_content(item_content_cid_str)
        item_content_hash = item_content_cid.content_hash
        item_content_hash_str = item_content_cid.content_hash_b58
        # Everything we learned on-chain, packed into StoredContent.meta.
        item_metadata_packed = {
            'license_type': item_license_type,
            'item_address': item_address.to_string(1, 1, 1),
            'content_cid': item_content_cid_str,
            'cover_cid': item_content_cover_cid_str,
            'metadata_cid': item_content_metadata_cid_str,
            'derivates': b58encode(item_derivates.to_boc(False)).decode(),
            'platform_variables': b58encode(item_platform_variables.to_boc(False)).decode(),
            'license': {
                'listen': {
                    'price': str(item_listen_license_price)
                },
                'use': {
                    'price': str(item_use_license_price)
                },
                'resale': {
                    'price': str(item_resale_license_price)
                }
            }
        }
        # Map the on-chain owner address to a local user, if connected.
        user_wallet_connection = None
        if item_owner_address:
            user_wallet_connection = (await session.execute(select(WalletConnection).where(
                WalletConnection.wallet_address == item_owner_address.to_string(1, 1, 1)
            ))).scalars().first()
        encrypted_stored_content = (await session.execute(select(StoredContent).where(
            StoredContent.hash == item_content_hash_str
        ))).scalars().first()
        if encrypted_stored_content:
            # Same hash already indexed under a DIFFERENT on-chain index means
            # this item is a re-mint/copy of existing content.
            is_duplicate = encrypted_stored_content.type.startswith("onchain") \
                and encrypted_stored_content.onchain_index != item_index
            if not is_duplicate:
                if encrypted_stored_content.type.startswith('local'):
                    # Local upload just got minted: promote it to onchain and
                    # attach index/owner.
                    encrypted_stored_content.type = "onchain/content" + ("_unknown" if (encrypted_stored_content.key_id is None) else "")
                    encrypted_stored_content.onchain_index = item_index
                    encrypted_stored_content.owner_address = item_owner_address.to_string(1, 1, 1)
                    user = None
                    if user_wallet_connection:
                        encrypted_stored_content.user_id = user_wallet_connection.user_id
                        user = await session.get(User, user_wallet_connection.user_id)
                    if user:
                        # Notify user about indexed content via client bot (main UX bot),
                        # but keep ability to clean up uploader-bot hint messages.
                        user_client_wrapper = Wrapped_CBotChat(
                            memory._client_telegram_bot,
                            chat_id=user.telegram_id,
                            user=user,
                            db_session=session,
                        )
                        user_uploader_wrapper = Wrapped_CBotChat(
                            memory._telegram_bot,
                            chat_id=user.telegram_id,
                            user=user,
                            db_session=session,
                        )
                        # Ensure the user has a referral id.
                        # NOTE(review): ref_id is not used below — presumably only
                        # the ensure_ref_id() side effect matters; confirm.
                        ref_id = (user.meta or {}).get('ref_id')
                        if not ref_id:
                            ref_id = user.ensure_ref_id()
                            await session.commit()
                        message_text = user.translated('p_contentWasIndexed').format(
                            item_address=item_address.to_string(1, 1, 1),
                            item_index=item_index,
                        )
                        await user_client_wrapper.send_message(
                            message_text,
                            message_type='notification'
                        )
                        await user_client_wrapper.send_content(
                            session,
                            encrypted_stored_content
                        )
                        try:
                            # Remove now-stale "hint" messages pointing at this content.
                            result = await session.execute(select(KnownTelegramMessage).where(
                                and_(
                                    KnownTelegramMessage.chat_id == user.telegram_id,
                                    KnownTelegramMessage.type == 'hint',
                                    cast(KnownTelegramMessage.meta['encrypted_content_hash'], String) == encrypted_stored_content.hash,
                                    KnownTelegramMessage.deleted == False
                                )
                            ))
                            for hint_message in result.scalars().all():
                                # Delete the hint with the bot that originally sent it.
                                if hint_message.bot_id == user_client_wrapper.bot_id:
                                    await user_client_wrapper.delete_message(hint_message.message_id)
                                elif hint_message.bot_id == user_uploader_wrapper.bot_id:
                                    await user_uploader_wrapper.delete_message(hint_message.message_id)
                        except BaseException as e:
                            make_log("Indexer", f"Error while deleting hint messages: {e}" + '\n' + traceback.format_exc(), level="error")
                elif encrypted_stored_content.type.startswith('onchain') and encrypted_stored_content.onchain_index == item_index:
                    # Re-indexing the same item: refresh type/owner/user mapping.
                    encrypted_stored_content.type = "onchain/content" + ("_unknown" if (encrypted_stored_content.key_id is None) else "")
                    encrypted_stored_content.owner_address = item_owner_address.to_string(1, 1, 1)
                    if user_wallet_connection:
                        encrypted_stored_content.user_id = user_wallet_connection.user_id
                else:
                    # Unexpected state: hash matches but type/index combination is
                    # inconsistent; bail out without advancing the index.
                    make_log("Indexer", f"[CRITICAL] Item already indexed and ERRORED!: {item_content_hash_str}", level="error")
                    return platform_found, seqno
                # Common update path for both non-duplicate branches above.
                encrypted_stored_content.updated = datetime.now()
                encrypted_stored_content.meta = {
                    **encrypted_stored_content.meta,
                    **item_metadata_packed
                }
                encrypted_stored_content.content_id = item_content_cid_str
                try:
                    await record_event(
                        session,
                        'content_indexed',
                        {
                            'onchain_index': item_index,
                            'content_hash': item_content_hash_str,
                            'encrypted_cid': item_content_cid_str,
                            'item_address': item_address.to_string(1, 1, 1),
                            'owner_address': item_owner_address.to_string(1, 1, 1) if item_owner_address else None,
                        },
                        origin_host=PROJECT_HOST,
                    )
                except Exception as exc:
                    make_log("Events", f"Failed to record content_indexed event: {exc}", level="warning")
                await session.commit()
                return platform_found, seqno
            else:
                # Duplicate mint: record provenance and fall through to create a
                # fresh row under a random placeholder hash.
                item_metadata_packed['copied_from'] = encrypted_stored_content.id
                item_metadata_packed['copied_from_cid'] = encrypted_stored_content.cid.serialize_v2()
                item_content_hash_str = f"{b58encode(bytes(16) + os.urandom(30)).decode()}" # check this for vulnerability
        # Reached when the hash is unknown locally, or for duplicate mints:
        # create a placeholder row whose payload we cannot decrypt.
        onchain_stored_content = StoredContent(
            type="onchain/content_unknown",
            hash=item_content_hash_str,
            onchain_index=item_index,
            owner_address=item_owner_address.to_string(1, 1, 1) if item_owner_address else None,
            meta=item_metadata_packed,
            filename="UNKNOWN_ENCRYPTED_CONTENT",
            user_id=user_wallet_connection.user_id if user_wallet_connection else None,
            created=datetime.now(),
            encrypted=True,
            decrypted_content_id=None,
            key_id=None,
            content_id=item_content_cid_str,
            updated=datetime.now()
        )
        session.add(onchain_stored_content)
        try:
            await record_event(
                session,
                'content_indexed',
                {
                    'onchain_index': item_index,
                    'content_hash': item_content_hash_str,
                    'encrypted_cid': item_content_cid_str,
                    'item_address': item_address.to_string(1, 1, 1),
                    'owner_address': item_owner_address.to_string(1, 1, 1) if item_owner_address else None,
                },
                origin_host=PROJECT_HOST,
            )
        except Exception as exc:
            make_log("Events", f"Failed to record content_indexed event: {exc}", level="warning")
        await session.commit()
        make_log("Indexer", f"Item indexed: {item_content_hash_str}", level="info")
        # NOTE(review): this increment is local and has no effect — the index is
        # recomputed from the DB on the next call.
        last_known_index += 1
    return platform_found, seqno
async def main_fn(memory):
    """Run the indexer service forever: one `indexer_loop` pass every ~5 seconds.

    Args:
        memory: shared service state holding the Telegram bot clients;
            passed through to `indexer_loop`.

    Each iteration gets a short random request id (rid) that is pushed into
    the logging context (best-effort) so log lines of one pass can be
    correlated, and cleared again at the end of the iteration.
    """
    import uuid  # local import: keeps this module's import surface unchanged

    make_log("Indexer", "Service started", level="info")
    platform_found = False
    seqno = 0
    while True:
        rid = None  # pre-bind so the except-handler never needs locals() tricks
        try:
            rid = uuid.uuid4().hex[:8]
            try:
                from app.core.log_context import ctx_rid
                ctx_rid.set(rid)
            except BaseException:
                # Logging context is optional; never let it break the loop.
                pass
            make_log("Indexer", "Loop start", level="debug", rid=rid)
            platform_found, seqno = await indexer_loop(memory, platform_found, seqno)
        except asyncio.CancelledError:
            # Bug fix: the previous blanket `except BaseException` swallowed
            # task cancellation, making the service impossible to stop cleanly.
            raise
        except BaseException as e:
            make_log("Indexer", f"Error: {e}" + '\n' + traceback.format_exc(), level="error", rid=rid)
        if platform_found:
            await send_status("indexer", f"working (seqno={seqno})")
        await asyncio.sleep(5)
        seqno += 1
        try:
            from app.core.log_context import ctx_rid
            ctx_rid.set(None)
        except BaseException:
            pass
# if __name__ == '__main__':
# loop = asyncio.get_event_loop()
# loop.run_until_complete(main())
# loop.close()