from datetime import datetime, timedelta from sanic import response from aiogram import Bot, types from sqlalchemy import and_ from app.core.logger import make_log from app.core.models.node_storage import StoredContent from app.core.models.keys import KnownKey from app.core.models import StarsInvoice from app.core.models.content.user_content import UserContent from app.core._config import CLIENT_TELEGRAM_API_KEY, PROJECT_HOST import json import uuid async def s_api_v1_content_list(request): offset = int(request.args.get('offset', 0)) limit = int(request.args.get('limit', 100)) assert 0 <= offset, "Invalid offset" assert 0 < limit <= 1000, "Invalid limit" store = request.args.get('store', 'local') assert store in ('local', 'onchain'), "Invalid store" content_list = request.ctx.db_session.query(StoredContent).filter( StoredContent.type.like(store + '%'), StoredContent.disabled == False ).order_by(StoredContent.created.desc()).offset(offset).limit(limit) make_log("Content", f"Listed {content_list.count()} contents", level='info') result = {} for content in content_list.all(): content_json = content.json_format() result[content_json["cid"]] = content_json return response.json(result) async def s_api_v1_content_view(request, content_address: str): # content_address can be CID or TON address r_content = StoredContent.from_cid(request.ctx.db_session, content_address) content = r_content.open_content(request.ctx.db_session) opts = { 'content_type': content['content_type'], # возможно с ошибками, нужно переделать на ffprobe 'content_address': content['encrypted_content'].meta.get('item_address', '') } if content['encrypted_content'].key_id: known_key = request.ctx.db_session.query(KnownKey).filter( KnownKey.id == content['encrypted_content'].key_id ).first() if known_key: opts['key_hash'] = known_key.seed_hash # нахер не нужно на данный момент # чисто болванки, заполнение дальше opts['have_licenses'] = [] opts['invoice'] = None have_access = False if request.ctx.user: user_wallet_address = request.ctx.user.wallet_address(request.ctx.db_session) have_access = ( (content['encrypted_content'].owner_address == user_wallet_address) or bool(request.ctx.db_session.query(UserContent).filter_by(owner_address=user_wallet_address, status='active', content_id=content['encrypted_content'].id).first()) \ or bool(request.ctx.db_session.query(UserContent).filter( and_( StarsInvoice.user_id == request.ctx.user.id, StarsInvoice.content_hash == content['encrypted_content'].hash, StarsInvoice.paid == True ) )) ) if not have_access: stars_cost = 1 # TODO: как считать стоимость в звездах? exist_invoice = request.ctx.db_session.query(StarsInvoice).filter( and_( StarsInvoice.user_id == request.ctx.user.id, StarsInvoice.created > datetime.now() - timedelta(minutes=25), ) ).first() if exist_invoice: invoice_url = exist_invoice.invoice_url else: invoice_id = f"access_{uuid.uuid4().hex}" try: invoice_url = await Bot(token=CLIENT_TELEGRAM_API_KEY).create_invoice_link( 'Lifetime access to content', 'You will receive NFT with lifetime access to content', invoice_id, "", "XTR", [ types.LabeledPrice(label='Lifetime access', amount=stars_cost), ], ) request.ctx.db_session.add( StarsInvoice( external_id=invoice_id, type='access', amount=stars_cost, user_id=request.ctx.user.id, content_hash=content['encrypted_content'].hash, invoice_url=invoice_url ) ) request.ctx.db_session.commit() except BaseException as e: make_log("Content", f"Can't create invoice link: {e}", level='warning') if invoice_url: opts['invoice'] = { 'url': invoice_url, 'amount': stars_cost, } display_options = { 'content_url': None, } if have_access: opts['have_licenses'].append('listen') converted_content = content['encrypted_content'].meta.get('converted_content') if converted_content: user_content_option = 'low_preview' if have_access: user_content_option = 'low' # TODO: подключать high если человек внезапно меломан converted_content = request.ctx.db_session.query(StoredContent).filter( StoredContent.hash == converted_content[user_content_option] ).first() if converted_content: display_options['content_url'] = converted_content.web_url content_meta = content['encrypted_content'].json_format() content_metadata = StoredContent.from_cid(request.ctx.db_session, content_meta.get('metadata_cid') or None) with open(content_metadata.filepath, 'r') as f: content_metadata_json = json.loads(f.read()) display_options['metadata'] = content_metadata_json return response.json({ **opts, 'encrypted': content['encrypted_content'].json_format(), 'display_options': display_options }) async def s_api_v1_content_friendly_list(request): # return html table with content list. bootstrap is used result = """ """ for content in request.ctx.db_session.query(StoredContent).filter( StoredContent.type == 'onchain/content' ).all(): if not content.meta.get('metadata_cid'): make_log("Content", f"Content {content.cid.serialize_v2()} has no metadata", level='warning') continue metadata_content = StoredContent.from_cid(request.ctx.db_session, content.meta.get('metadata_cid')) with open(metadata_content.filepath, 'r') as f: metadata = json.loads(f.read()) preview_link = None if content.meta.get('converted_content'): preview_link = f"{PROJECT_HOST}/api/v1.5/storage/{content.meta['converted_content']['low_preview']}" result += f""" """ result += """
CID Title Onchain Preview link
{content.cid.serialize_v2()} {metadata.get('name', "")} {content.meta.get('item_address')} """ + (f'Preview' if preview_link else "not ready") + """
""" return response.html(result) async def s_api_v1_5_content_list(request): # Validate offset and limit parameters offset = int(request.args.get('offset', 0)) limit = int(request.args.get('limit', 100)) if offset < 0: return response.json({'error': 'Invalid offset'}, status=400) if limit <= 0 or limit > 1000: return response.json({'error': 'Invalid limit'}, status=400) # Query onchain contents which are not disabled contents = request.ctx.db_session.query(StoredContent).filter( StoredContent.type == 'onchain/content', StoredContent.disabled == False ).order_by(StoredContent.created.desc()).offset(offset).limit(limit).all() result = [] for content in contents: # Retrieve metadata content using metadata_cid from content.meta metadata_cid = content.meta.get('metadata_cid') if not metadata_cid: continue # Skip if no metadata_cid is found metadata_content = StoredContent.from_cid(request.ctx.db_session, metadata_cid) try: with open(metadata_content.filepath, 'r') as f: metadata = json.load(f) except Exception as e: metadata = {} media_type = 'audio' # Get title from metadata (key 'name') title = metadata.get('name', '') # Build preview link if converted_content exists and contains 'low_preview' preview_link = None converted_content = content.meta.get('converted_content') if converted_content: converted_content = request.ctx.db_session.query(StoredContent).filter( StoredContent.hash == converted_content['low_preview'] ).first() preview_link = converted_content.web_url if converted_content.filename.split('.')[-1] in ('mp4', 'mov'): media_type = 'video' else: preview_link = None # Get onchain address from content.meta onchain_address = content.meta.get('item_address', '') result.append({ 'cid': content.cid.serialize_v2(), 'onchain_address': onchain_address, 'type': media_type, 'title': title, 'preview_link': preview_link, 'created_at': content.created.isoformat() # ISO 8601 format for datetime }) return response.json(result)