uploader-bot/app/client_bot/routers/content.py

277 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import traceback
from aiogram import types, Router, F
from app.core._keyboards import get_inline_keyboard
from app.core.models.node_storage import StoredContent
import json
from app.core.logger import make_log
from app.core.models.content.user_content import UserAction, UserContent
from app.client_bot.routers.home import router as home_router
from app.client_bot.routers.tonconnect import router as tonconnect_router
from app.core._config import CLIENT_TELEGRAM_BOT_USERNAME, PROJECT_HOST
from app.core.logger import logger
from app.core.content.content_id import ContentId
import base58
from datetime import datetime, timedelta
import asyncio
import urllib
from app.core._blockchain.ton.connect import TonConnect, wallet_obj_by_name
router = Router()
CACHE_CHAT_ID = -1002390124789
async def t_callback_purchase_node_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
content_oid = int(query.data.split('_')[1])
is_cancel_request = query.data.split('_')[2] == 'cancel' if len(query.data.split('_')) > 2 else False
content = db_session.query(StoredContent).filter_by(id=content_oid).first()
if not content:
return await query.answer(user.translated('error_contentNotFound'), show_alert=True)
if not is_cancel_request:
# Logging purchase attempt with detailed information
license_price = content.meta.get('license', {}).get('listen', {}).get('price')
license_price_num = int(license_price)
if license_price_num < 1:
return await query.answer(user.translated('error_contentPrice'), show_alert=True)
make_log("Purchase", f"User {user.id} initiated purchase for content ID {content_oid}. License price: {license_price_num}.", level='info')
ton_connect, ton_connection = TonConnect.by_user(db_session, user, callback_fn=())
await ton_connect.restore_connection()
assert ton_connect.connected, "No connected wallet"
user_wallet_address = user.wallet_address(db_session)
memory._app.add_task(ton_connect._sdk_client.send_transaction({
'valid_until': int(datetime.now().timestamp() + 300),
'messages': [
{
'address': content.meta['item_address'],
'amount': license_price
}
]
}))
new_action = UserAction(
type='purchase',
user_id=user.id,
content_id=content.id,
telegram_message_id=query.message.message_id,
from_address=user_wallet_address,
to_address=content.meta['item_address'],
status='requested',
meta={
'confirmation_url': wallet_obj_by_name(ton_connection.wallet_key.split('==')[0])['universal_url']
},
created=datetime.now()
)
db_session.add(new_action)
else:
# Logging cancellation attempt with detailed information
make_log("Purchase", f"User {user.id} cancelled purchase for content ID {content_oid}.", level='info')
action = db_session.query(UserAction).filter_by(
type='purchase',
content_id=content_oid,
user_id=user.id,
status='requested'
).first()
if not action:
return await query.answer()
action.status = 'canceled'
db_session.commit()
await chat_wrap.send_content(db_session, content, message_id=query.message.message_id)
router.callback_query.register(t_callback_purchase_node_content, F.data.startswith('PC_'))
async def t_inline_query_node_content(query: types.InlineQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
# Logging the received inline query using issuer "InlineSearch"
make_log("InlineSearch", f"Received inline query: '{query.query}'", level='info')
try:
source_args = query.query[1:]
source_args_ext = query.query
args = None
if source_args.startswith('Q'):
license_onchain_address = source_args_ext[1:]
licensed_content = db_session.query(UserContent).filter_by(
onchain_address=license_onchain_address,
).first().content
make_log("InlineSearch", f"Query '{query.query}' is a license query for content ID {licensed_content.id}.", level='info')
args = content.cid.serialize_v2()
else:
args = source_args_ext[1:]
make_log("InlineSearch", f"Query '{query.query}' is a content query with args '{args}'.", level='info')
cid = ContentId.deserialize(args)
content_list = []
content = db_session.query(StoredContent).filter_by(hash=cid.content_hash_b58).first()
content_prod = content.open_content(db_session)
# Get both encrypted and decrypted content objects
encrypted_content = content_prod['encrypted_content']
decrypted_content = content_prod['decrypted_content']
decrypted_content_meta = decrypted_content.json_format()
try:
metadata_content = StoredContent.from_cid(db_session, content.json_format()['metadata_cid'])
with open(metadata_content.filepath, 'r') as f:
metadata_content_json = json.loads(f.read())
except BaseException as e:
# Logging metadata retrieval failure with detailed query information
make_log("InlineSearch", f"Failed to retrieve metadata for query '{query.query}': {e}", level='warning')
return await query.answer(content_list, cache_time=1)
audio_title = metadata_content_json.get('name', "").split(' - ')
title, performer = None, None
if len(audio_title) > 1:
performer = audio_title[0].strip()
audio_title = audio_title[1:]
title = audio_title[0].strip()
result_kwargs = {}
try:
cover_content = StoredContent.from_cid(db_session, decrypted_content_meta.get('cover_cid') or None)
except BaseException as e:
cover_content = None
if cover_content:
result_kwargs['thumb_url'] = cover_content.web_url
content_type_declared = decrypted_content_meta.get('content_type', 'application/x-binary').split('/')[0]
hashtags_str = (' '.join(f"#{_h}" for _h in metadata_content_json.get('hashtags', []))).strip()
if hashtags_str:
hashtags_str = hashtags_str + '\n'
# Upload preview of decrypted content (limit to 30 seconds)
if content_type_declared in ('audio', 'video'):
if not decrypted_content.meta.get('telegram_file_cache_preview'):
try:
# Construct URL for trimmed preview using decrypted content
preview_content = db_session.query(StoredContent).filter_by(
hash=content.meta.get('converted_content', {}).get('low_preview')
).first()
preview_url = preview_content.web_url
if content_type_declared == 'video':
preview_message = await query.bot.send_video(
chat_id=CACHE_CHAT_ID, # English: Cache chat ID defined in configuration
video=types.URLInputFile(preview_url), # English: Upload video using URL
caption="Preview upload", # English caption
supports_streaming=True
)
preview_file_id = preview_message.video.file_id
else:
preview_message = await query.bot.send_audio(
chat_id=CACHE_CHAT_ID, # English: Cache chat ID defined in configuration
audio=types.URLInputFile(preview_url), # English: Upload audio using URL
caption="Preview upload", # English caption
title=title,
performer=performer,
thumbnail=result_kwargs.get('thumb_url'),
)
preview_file_id = preview_message.audio.file_id
# Save the preview file_id in decrypted_content.meta for future use
decrypted_content.meta = {
**decrypted_content.meta,
'telegram_file_cache_preview': preview_file_id
}
db_session.commit()
except Exception as e:
# Logging error during preview upload with detailed content type and query information
make_log("InlineSearch", f"Error uploading preview for content type '{content_type_declared}' during inline query '{query.query}': {e}", level='error')
content_share_link = {
'text': user.translated('p_shareLinkContext').format(title=' '.join(audio_title)),
'url': f"https://t.me/{CLIENT_TELEGRAM_BOT_USERNAME}/content?startapp={source_args}"
}
# Create inline query result using decrypted content's file_id
if content_type_declared == 'audio':
content_list.append(
types.InlineQueryResultCachedAudio(
id=f"NC_{content.id}_{int(datetime.now().timestamp() // 60)}",
audio_file_id=decrypted_content.meta['telegram_file_cache_preview'],
caption=hashtags_str + user.translated('p_playerContext_preview'),
parse_mode='html',
reply_markup=get_inline_keyboard([
[
{
'text': {
'audio': user.translated('shareTrack_button'),
'video': user.translated('shareVideo_button'),
}[content_type_declared],
'switch_inline_query': source_args_ext
},
{
'text': user.translated('shareLink_button'),
'url': f"https://t.me/share/url?text={urllib.parse.quote(content_share_link['text'])}&url={urllib.parse.quote(content_share_link['url'])}"
}
],
[{
'text': user.translated('viewTrack_button'),
'url': f"https://t.me/MY_Web3Bot/content?startapp={source_args}"
}]
]),
**result_kwargs
)
)
elif content_type_declared == 'video':
content_list.append(
types.InlineQueryResultCachedVideo(
id=f"NC_{content.id}_{int(datetime.now().timestamp() // 60)}",
video_file_id=decrypted_content.meta['telegram_file_cache_preview'],
title=title,
caption=hashtags_str + user.translated('p_playerContext_preview'),
parse_mode='html',
reply_markup=get_inline_keyboard([
[
{
'text': user.translated('shareVideo_button'),
'switch_inline_query': f"C{content.cid.serialize_v2()}"
},
{
'text': user.translated('shareLink_button'),
'url': f"https://t.me/share/url?text={urllib.parse.quote(content_share_link['text'])}&url={urllib.parse.quote(content_share_link['url'])}"
}
],
[{
'text': user.translated('viewTrack_button'),
'url': f"https://t.me/MY_Web3Bot/content?startapp={source_args}"
}]
]),
**result_kwargs
)
)
# Logging the final inline query result count with detailed explanation
make_log("InlineSearch", f"Processed inline query '{query.query}'. Found {len(content_list)} inline result(s) based on query parsing and content retrieval.", level='info')
return await query.answer(content_list, cache_time=1)
except BaseException as e:
# Logging exception during inline query processing with detailed query information
make_log("InlineSearch", f"Error processing inline query '{query.query}': {e}", level='error')
traceback.print_exc()
return await query.answer([], cache_time=1)
async def t_chosen_inline_result_node_content(query: types.ChosenInlineResult, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
# Logging chosen inline result with detailed user and result ID
make_log("ChosenResult", f"User {user.id} selected inline result with ID '{query.result_id}'.", level='info')
# return await query.answer([])
router.inline_query.register(t_inline_query_node_content, F.query.startswith('C'))
router.inline_query.register(t_inline_query_node_content, F.query.startswith('Q'))
router.chosen_inline_result.register(t_chosen_inline_result_node_content, lambda: True)