import os
import sys
import traceback
from aiogram import types, Router, F

from app.core._keyboards import get_inline_keyboard
from app.core.models.node_storage import StoredContent
import json
from app.core.logger import make_log
from app.core.models.content.user_content import UserAction, UserContent
from app.client_bot.routers.home import router as home_router
from app.client_bot.routers.tonconnect import router as tonconnect_router
from app.core._config import CLIENT_TELEGRAM_BOT_USERNAME, PROJECT_HOST
from app.core.logger import logger
from app.core.content.content_id import ContentId
import base58
from datetime import datetime, timedelta
import asyncio
# A bare `import urllib` does not guarantee that the `urllib.parse` submodule
# is loaded; import it explicitly because `urllib.parse.quote` is used below.
import urllib.parse
from app.core._blockchain.ton.connect import TonConnect, wallet_obj_by_name

router = Router()

# Chat that receives preview uploads so that their Telegram file_id can be reused.
CACHE_CHAT_ID = -1002390124789


async def t_callback_purchase_node_content(query: types.CallbackQuery, memory=None, user=None, db_session=None,
                                           chat_wrap=None, **extra):
    """Handle ``PC_<content_id>`` and ``PC_<content_id>_cancel`` callback buttons.

    Without the ``cancel`` suffix a transaction request for the content's
    ``listen`` license price is scheduled through the user's TON Connect
    session and a ``UserAction`` with status ``requested`` is stored.
    With the suffix, the user's pending ``requested`` purchase action for this
    content is marked ``canceled``.  In both cases the session is committed and
    the content message is re-sent through ``chat_wrap``.

    Raises:
        AssertionError: if the restored TON Connect session is not connected.
    """
    callback_parts = query.data.split('_')
    content_oid = int(callback_parts[1])
    is_cancel_request = len(callback_parts) > 2 and callback_parts[2] == 'cancel'

    content = db_session.query(StoredContent).filter_by(id=content_oid).first()
    if not content:
        return await query.answer(user.translated('error_contentNotFound'), show_alert=True)

    if not is_cancel_request:
        license_price = content.meta.get('license', {}).get('listen', {}).get('price')
        try:
            license_price_num = int(license_price)
        except (TypeError, ValueError):
            # A missing or non-numeric price is reported the same way as a non-positive one.
            license_price_num = 0

        if license_price_num < 1:
            return await query.answer(user.translated('error_contentPrice'), show_alert=True)

        make_log("Purchase", f"User {user.id} initiated purchase for content ID {content_oid}. License price: {license_price_num}.", level='info')

        ton_connect, ton_connection = TonConnect.by_user(db_session, user, callback_fn=())
        await ton_connect.restore_connection()
        if not ton_connect.connected:
            # Explicit raise instead of `assert`, which is stripped under `python -O`.
            raise AssertionError("No connected wallet")

        user_wallet_address = user.wallet_address(db_session)

        # The transaction request is scheduled as an application task; it is not awaited here.
        memory._app.add_task(ton_connect._sdk_client.send_transaction({
            'valid_until': int(datetime.now().timestamp() + 300),
            'messages': [
                {
                    'address': content.meta['item_address'],
                    'amount': license_price
                }
            ]
        }))

        new_action = UserAction(
            type='purchase',
            user_id=user.id,
            content_id=content.id,
            telegram_message_id=query.message.message_id,
            from_address=user_wallet_address,
            to_address=content.meta['item_address'],
            status='requested',
            meta={
                'confirmation_url': wallet_obj_by_name(ton_connection.wallet_key.split('==')[0])['universal_url']
            },
            created=datetime.now()
        )
        db_session.add(new_action)
    else:
        make_log("Purchase", f"User {user.id} cancelled purchase for content ID {content_oid}.", level='info')
        action = db_session.query(UserAction).filter_by(
            type='purchase',
            content_id=content_oid,
            user_id=user.id,
            status='requested'
        ).first()
        if not action:
            return await query.answer()

        action.status = 'canceled'

    db_session.commit()
    await chat_wrap.send_content(db_session, content, message_id=query.message.message_id)


router.callback_query.register(t_callback_purchase_node_content, F.data.startswith('PC_'))


def _build_content_keyboard(user, share_button_text, switch_inline_query, content_share_link, source_args):
    """Build the inline keyboard attached to an inline result: share row plus a "view" row."""
    share_url = (
        "https://t.me/share/url"
        f"?text={urllib.parse.quote(content_share_link['text'])}"
        f"&url={urllib.parse.quote(content_share_link['url'])}"
    )
    return get_inline_keyboard([
        [
            {
                'text': share_button_text,
                'switch_inline_query': switch_inline_query
            },
            {
                'text': user.translated('shareLink_button'),
                'url': share_url
            }
        ],
        [{
            'text': user.translated('viewTrack_button'),
            # NOTE(review): the bot username is hard-coded here while the share link
            # uses CLIENT_TELEGRAM_BOT_USERNAME — confirm this is intentional.
            'url': f"https://t.me/MY_Web3Bot/content?startapp={source_args}"
        }]
    ])


async def _upload_preview_to_cache(query, db_session, content, decrypted_content, content_type_declared,
                                   title, performer, thumb_url):
    """Send the ``low_preview`` rendition to CACHE_CHAT_ID, store its file_id in meta and return it."""
    preview_content = db_session.query(StoredContent).filter_by(
        hash=content.meta.get('converted_content', {}).get('low_preview')
    ).first()
    preview_url = preview_content.web_url

    if content_type_declared == 'video':
        preview_message = await query.bot.send_video(
            chat_id=CACHE_CHAT_ID,
            video=types.URLInputFile(preview_url),
            caption="Preview upload",
            supports_streaming=True
        )
        preview_file_id = preview_message.video.file_id
    else:
        preview_message = await query.bot.send_audio(
            chat_id=CACHE_CHAT_ID,
            audio=types.URLInputFile(preview_url),
            caption="Preview upload",
            title=title,
            performer=performer,
            thumbnail=thumb_url,
        )
        preview_file_id = preview_message.audio.file_id

    # Reassign the whole dict rather than mutating it in place.
    decrypted_content.meta = {
        **decrypted_content.meta,
        'telegram_file_cache_preview': preview_file_id
    }
    db_session.commit()
    return preview_file_id


async def t_inline_query_node_content(query: types.InlineQuery, memory=None, user=None, db_session=None,
                                      chat_wrap=None, **extra):
    """Answer an inline query that references a stored content item.

    The query text (minus its first character) either carries a ``Q``-prefixed
    on-chain license address, which is resolved through ``UserContent``, or a
    serialized content id.  For audio and video content a cached preview
    (uploaded to CACHE_CHAT_ID on first use) is returned as a single inline
    result; every failure path answers with an empty result list.
    """
    make_log("InlineSearch", f"Received inline query: '{query.query}'", level='info')
    try:
        source_args = query.query[1:]
        source_args_ext = query.query
        if source_args.startswith('Q'):
            license_onchain_address = source_args[1:]
            license_record = db_session.query(UserContent).filter_by(
                onchain_address=license_onchain_address,
            ).first()
            if license_record is None:
                make_log("InlineSearch", f"No license found for inline query '{query.query}'", level='warning')
                return await query.answer([], cache_time=1)

            licensed_content = license_record.content
            make_log("InlineSearch", f"Query '{query.query}' is a license query for content ID {licensed_content.id}.", level='info')
            # `content` is not bound yet at this point; the licensed content is the one to serialize.
            args = licensed_content.cid.serialize_v2()
        else:
            # NOTE(review): one more character is stripped in addition to the prefix
            # removed above — confirm against the expected query format.
            args = source_args[1:]

        cid = ContentId.deserialize(args)

        content_list = []
        content = db_session.query(StoredContent).filter_by(hash=cid.content_hash_b58).first()
        if content is None:
            make_log("InlineSearch", f"Content not found for inline query '{query.query}'", level='warning')
            return await query.answer(content_list, cache_time=1)

        # open_content() provides both the encrypted and the decrypted content objects.
        content_prod = content.open_content(db_session)
        encrypted_content = content_prod['encrypted_content']
        decrypted_content = content_prod['decrypted_content']
        decrypted_content_meta = decrypted_content.json_format()

        try:
            metadata_content = StoredContent.from_cid(db_session, content.json_format()['metadata_cid'])
            with open(metadata_content.filepath, 'r', encoding='utf-8') as f:
                metadata_content_json = json.load(f)
        except Exception as e:
            make_log("InlineSearch", f"Failed to retrieve metadata for query '{query.query}': {e}", level='warning')
            return await query.answer(content_list, cache_time=1)

        # A name of the form "<performer> - <title>[ - ...]" is split into its parts.
        audio_title = metadata_content_json.get('name', "").split(' - ')
        title, performer = None, None
        if len(audio_title) > 1:
            performer = audio_title[0].strip()
            audio_title = audio_title[1:]
            title = audio_title[0].strip()

        result_kwargs = {}
        try:
            cover_content = StoredContent.from_cid(db_session, decrypted_content_meta.get('cover_cid') or None)
        except Exception:
            # The cover is optional; continue without a thumbnail.
            cover_content = None

        if cover_content:
            result_kwargs['thumb_url'] = cover_content.web_url

        content_type_declared = decrypted_content_meta.get('content_type', 'application/x-binary').split('/')[0]

        hashtags_str = (' '.join(f"#{_h}" for _h in metadata_content_json.get('hashtags', []))).strip()
        if hashtags_str:
            hashtags_str = hashtags_str + '\n'

        if content_type_declared in ('audio', 'video'):
            preview_file_id = decrypted_content.meta.get('telegram_file_cache_preview')
            if not preview_file_id:
                try:
                    preview_file_id = await _upload_preview_to_cache(
                        query, db_session, content, decrypted_content, content_type_declared,
                        title, performer, result_kwargs.get('thumb_url'),
                    )
                except Exception as e:
                    make_log("InlineSearch", f"Error uploading preview for content type '{content_type_declared}' during inline query '{query.query}': {e}", level='error')

            if preview_file_id:
                content_share_link = {
                    'text': user.translated('p_shareLinkContext').format(title=' – '.join(audio_title)),
                    'url': f"https://t.me/{CLIENT_TELEGRAM_BOT_USERNAME}/content?startapp={source_args}"
                }
                result_id = f"NC_{content.id}_{int(datetime.now().timestamp() // 60)}"
                caption = hashtags_str + user.translated('p_playerContext_preview')

                if content_type_declared == 'audio':
                    content_list.append(
                        types.InlineQueryResultCachedAudio(
                            id=result_id,
                            audio_file_id=preview_file_id,
                            caption=caption,
                            parse_mode='html',
                            reply_markup=_build_content_keyboard(
                                user,
                                user.translated('shareTrack_button'),
                                source_args_ext,
                                content_share_link,
                                source_args,
                            ),
                            **result_kwargs
                        )
                    )
                else:
                    content_list.append(
                        types.InlineQueryResultCachedVideo(
                            id=result_id,
                            video_file_id=preview_file_id,
                            # `title` is required for a cached video result; fall back to
                            # the full name when it has no " - " separator.
                            title=title or ' – '.join(audio_title),
                            caption=caption,
                            parse_mode='html',
                            reply_markup=_build_content_keyboard(
                                user,
                                user.translated('shareVideo_button'),
                                f"C{content.cid.serialize_v2()}",
                                content_share_link,
                                source_args,
                            ),
                            **result_kwargs
                        )
                    )

        make_log("InlineSearch", f"Processed inline query '{query.query}'. Found {len(content_list)} inline result(s) based on query parsing and content retrieval.", level='info')
        return await query.answer(content_list, cache_time=1)
    except Exception as e:
        # `Exception` rather than `BaseException`, so task cancellation and
        # interpreter shutdown are not swallowed.
        make_log("InlineSearch", f"Error processing inline query '{query.query}': {e}", level='error')
        traceback.print_exc()
        return await query.answer([], cache_time=1)


async def t_chosen_inline_result_node_content(query: types.ChosenInlineResult, memory=None, user=None,
                                              db_session=None, chat_wrap=None, **extra):
    """Log which inline result the user picked; no reply is sent."""
    make_log("ChosenResult", f"User {user.id} selected inline result with ID '{query.result_id}'.", level='info')


router.inline_query.register(t_inline_query_node_content, F.query.startswith('C'))
router.inline_query.register(t_inline_query_node_content, F.query.startswith('Q'))
# Registered without a filter so that every chosen-result event is handled; the previous
# zero-argument lambda could not accept the event that filters are called with.
router.chosen_inline_result.register(t_chosen_inline_result_node_content)