uploader-bot/app/api/routes/content.py

320 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime, timedelta
from sanic import response
from sqlalchemy import select, and_, func
from aiogram import Bot, types
from sqlalchemy import and_
from app.core.logger import make_log
from app.core.models._config import ServiceConfig
from app.core.models.node_storage import StoredContent
from app.core.models.keys import KnownKey
from app.core.models import StarsInvoice
from app.core.models.content.user_content import UserContent
from app.core._config import CLIENT_TELEGRAM_API_KEY, PROJECT_HOST
import json
import uuid
async def s_api_v1_content_list(request):
    """List stored contents of one store as a JSON object keyed by CID.

    Query parameters (read from ``request.args``):
        offset: number of rows to skip; must be >= 0 (default 0).
        limit: maximum number of rows; must satisfy 0 < limit <= 1000
            (default 100).
        store: ``'local'`` or ``'onchain'`` (default ``'local'``); used as a
            prefix filter on ``StoredContent.type``.

    Returns:
        A JSON response mapping each row's ``json_format()["cid"]`` to the
        full ``json_format()`` dict. Disabled rows are excluded and rows are
        queried newest first (``created`` descending).

    Raises:
        AssertionError: if ``offset``, ``limit`` or ``store`` is out of range.
        ValueError: if ``offset`` or ``limit`` is not an integer literal.
    """
    offset = int(request.args.get('offset', 0))
    limit = int(request.args.get('limit', 100))
    store = request.args.get('store', 'local')

    # Raised explicitly rather than via `assert`: assert statements are
    # removed under `python -O`, which would silently disable validation.
    # The exception type and messages are unchanged for upstream handlers.
    if offset < 0:
        raise AssertionError("Invalid offset")
    if not 0 < limit <= 1000:
        raise AssertionError("Invalid limit")
    if store not in ('local', 'onchain'):
        raise AssertionError("Invalid store")

    stmt = (
        select(StoredContent)
        .where(
            StoredContent.type.like(store + '%'),
            StoredContent.disabled == False  # noqa: E712 - SQL expression, not a Python truth test
        )
        .order_by(StoredContent.created.desc())
        .offset(offset)
        .limit(limit)
    )
    rows = (await request.ctx.db_session.execute(stmt)).scalars().all()
    make_log("Content", f"Listed {len(rows)} contents", level='info')

    # json_format() is called once per row; a later row with the same cid
    # overwrites an earlier one, exactly as a sequential dict fill would.
    result = {
        content_json["cid"]: content_json
        for content_json in (content.json_format() for content in rows)
    }
    return response.json(result)
async def s_api_v1_content_view(request, content_address: str):
    """Return the view payload for a single content item.

    Args:
        request: Sanic request; ``request.ctx.db_session`` is used for all
            queries and ``request.ctx.user`` (may be falsy) identifies the
            caller.
        content_address: either a serialized content id or the
            ``onchain_address`` of a ``UserContent`` row, in which case it is
            replaced by that row's content CID.

    Returns:
        A JSON response containing:
          * ``content_type`` - the part of the stored content type before '/';
          * ``content_address`` - ``meta['item_address']`` of the encrypted row;
          * ``key_hash`` - only when the encrypted row has a known key;
          * ``have_licenses`` - ``['listen']`` when the caller has access;
          * ``invoice`` - ``{'url', 'amount'}`` for a Stars invoice, or None;
          * ``content_ext`` - only when a converted file was found;
          * ``downloadable`` - metadata flag, forced to False without access;
          * ``encrypted`` - ``json_format()`` of the encrypted row;
          * ``display_options`` - ``content_url`` and ``metadata``.
        A 404 JSON error is returned when no stored content matches.

    Raises:
        AssertionError: if the encrypted/decrypted pair cannot be resolved.
    """
    # Imported at function scope, as in the original code
    # (presumably to avoid a circular import - TODO confirm).
    from app.core.content.content_id import ContentId

    db_session = request.ctx.db_session

    # content_address can be CID or TON address
    license_exist = (await db_session.execute(
        select(UserContent).where(UserContent.onchain_address == content_address)
    )).scalars().first()
    if license_exist:
        content_address = license_exist.content.cid.serialize_v2()

    cid = ContentId.deserialize(content_address)
    r_content = (await db_session.execute(
        select(StoredContent).where(StoredContent.hash == cid.content_hash_b58)
    )).scalars().first()
    if r_content is None:
        # Previously this fell through and failed with AttributeError on None.
        return response.json({'error': 'Content not found'}, status=404)

    async def open_content_async(session, sc: StoredContent):
        """Resolve the encrypted/decrypted pair that ``sc`` belongs to."""
        if not sc.encrypted:
            decrypted = sc
            encrypted = (await session.execute(
                select(StoredContent).where(StoredContent.decrypted_content_id == sc.id)
            )).scalars().first()
        else:
            encrypted = sc
            decrypted = (await session.execute(
                select(StoredContent).where(StoredContent.id == sc.decrypted_content_id)
            )).scalars().first()

        # Explicit raise instead of `assert` so the check survives `python -O`.
        if not (decrypted and encrypted):
            raise AssertionError("Can't open content")

        ctype = decrypted.json_format().get('content_type', 'application/x-binary')
        # Fall back to 'application' when the stored value is not a string.
        content_type = ctype.split('/')[0] if isinstance(ctype, str) else 'application'
        return {'encrypted_content': encrypted, 'decrypted_content': decrypted, 'content_type': content_type}

    content = await open_content_async(db_session, r_content)
    encrypted_content = content['encrypted_content']

    opts = {
        'content_type': content['content_type'],  # may be inaccurate; should be reworked to use ffprobe
        'content_address': encrypted_content.meta.get('item_address', '')
    }
    if encrypted_content.key_id:
        known_key = (await db_session.execute(
            select(KnownKey).where(KnownKey.id == encrypted_content.key_id)
        )).scalars().first()
        if known_key:
            opts['key_hash'] = known_key.seed_hash  # not needed at the moment

    # Placeholders only; filled in below.
    opts['have_licenses'] = []
    opts['invoice'] = None

    have_access = False
    if request.ctx.user:
        user_wallet_address = await request.ctx.user.wallet_address_async(db_session)

        # Access is granted to the owner, to holders of an active UserContent
        # row, or to users with a paid Stars invoice. The checks run in that
        # order and stop at the first hit, so later queries are skipped.
        have_access = encrypted_content.owner_address == user_wallet_address
        if not have_access:
            have_access = bool((await db_session.execute(select(UserContent).where(
                and_(
                    UserContent.owner_address == user_wallet_address,
                    UserContent.status == 'active',
                    UserContent.content_id == encrypted_content.id
                )
            ))).scalars().first())
        if not have_access:
            have_access = bool((await db_session.execute(select(StarsInvoice).where(
                and_(
                    StarsInvoice.user_id == request.ctx.user.id,
                    StarsInvoice.content_hash == encrypted_content.hash,
                    StarsInvoice.paid == True  # noqa: E712 - SQL expression
                )
            ))).scalars().first())

        if not have_access:
            current_star_rate = (await ServiceConfig(db_session).get('live_tonPerStar', [0, 0]))[0]
            # `<= 0` (was `< 0`): the configured default is 0, which would
            # otherwise raise ZeroDivisionError in the price computation.
            if current_star_rate <= 0:
                current_star_rate = 0.00000001

            # Resale price scaled by 1e9 and the star rate, plus a 1.2 factor
            # (presumably nanoTON -> TON and a 20% markup - TODO confirm).
            stars_cost = int(int(encrypted_content.meta['license']['resale']['price']) / 1e9 / current_star_rate * 1.2)
            # NOTE(review): hard-coded Telegram ids get a fixed price of 2;
            # presumably test accounts - confirm and move to configuration.
            if request.ctx.user.telegram_id in [5587262915, 6861699286]:
                stars_cost = 2

            invoice_id = f"access_{uuid.uuid4().hex}"
            # Reuse an invoice created in the last 25 minutes for the same
            # user, amount and content instead of creating a new link.
            exist_invoice = (await db_session.execute(select(StarsInvoice).where(
                and_(
                    StarsInvoice.user_id == request.ctx.user.id,
                    StarsInvoice.created > datetime.now() - timedelta(minutes=25),
                    StarsInvoice.amount == stars_cost,
                    StarsInvoice.content_hash == encrypted_content.hash,
                )
            ))).scalars().first()
            if exist_invoice:
                invoice_url = exist_invoice.invoice_url
            else:
                invoice_url = None
                try:
                    # NOTE(review): a new Bot is created per request and never
                    # closed here - confirm whether its session needs closing.
                    invoice_url = await Bot(token=CLIENT_TELEGRAM_API_KEY).create_invoice_link(
                        'Неограниченный доступ к контенту',
                        'Неограниченный доступ к контенту',
                        invoice_id, "XTR",
                        [
                            types.LabeledPrice(label='Lifetime access', amount=stars_cost),
                        ], provider_token=''
                    )
                    db_session.add(
                        StarsInvoice(
                            external_id=invoice_id,
                            type='access',
                            amount=stars_cost,
                            user_id=request.ctx.user.id,
                            content_hash=encrypted_content.hash,
                            invoice_url=invoice_url
                        )
                    )
                    await db_session.commit()
                except Exception as e:
                    # Best effort: the view is still returned without an
                    # invoice. `Exception` (not `BaseException`) so that task
                    # cancellation and interpreter exit are not swallowed.
                    make_log("Content", f"Can't create invoice link: {e}", level='warning')

            if invoice_url:
                opts['invoice'] = {
                    'url': invoice_url,
                    'amount': stars_cost,
                }

    display_options = {
        'content_url': None,
    }
    if have_access:
        opts['have_licenses'].append('listen')

    converted_content = encrypted_content.meta.get('converted_content')
    if converted_content:
        # TODO: serve 'high' quality when the user turns out to be an audiophile
        user_content_option = 'low' if have_access else 'low_preview'
        converted_content = (await db_session.execute(select(StoredContent).where(
            StoredContent.hash == converted_content[user_content_option]
        ))).scalars().first()
        if converted_content:
            display_options['content_url'] = converted_content.web_url
            opts['content_ext'] = converted_content.filename.split('.')[-1]

    content_meta = encrypted_content.json_format()
    _mcid = content_meta.get('metadata_cid') or None
    # Metadata is optional: a missing cid, missing row or unreadable file
    # yields an empty dict instead of failing the whole request.
    content_metadata_json = {}
    if _mcid:
        _cid = ContentId.deserialize(_mcid)
        content_metadata = (await db_session.execute(
            select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58)
        )).scalars().first()
        if content_metadata is not None:
            try:
                with open(content_metadata.filepath, 'r', encoding='utf-8') as f:
                    content_metadata_json = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                make_log("Content", f"Can't read metadata {_mcid}: {e}", level='warning')
                content_metadata_json = {}
            if not isinstance(content_metadata_json, dict):
                content_metadata_json = {}

    display_options['metadata'] = content_metadata_json
    opts['downloadable'] = content_metadata_json.get('downloadable', False)
    # Downloading additionally requires the 'listen' license.
    if opts['downloadable'] and 'listen' not in opts['have_licenses']:
        opts['downloadable'] = False

    return response.json({
        **opts,
        'encrypted': encrypted_content.json_format(),
        'display_options': display_options,
    })
async def s_api_v1_content_friendly_list(request):
    """Render an HTML table (Bootstrap-styled) of all 'onchain/content' rows.

    Each row shows the content CID, the ``name`` from its metadata file, the
    ``item_address`` from ``content.meta`` and a preview link built from
    ``PROJECT_HOST`` and the ``low_preview`` hash, or "not ready".

    Contents without a ``metadata_cid`` are skipped with a warning log. A
    missing or unreadable metadata file yields an empty title rather than
    failing the whole page.

    Returns:
        An HTML response with the rendered table.
    """
    # Function-scope imports: `html` is not imported at module level, and
    # ContentId is imported lazily as elsewhere in this file.
    from html import escape
    from app.core.content.content_id import ContentId

    # return html table with content list. bootstrap is used
    parts = ["""
<html>
<head>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-QWTKZyjpPEjISv5WaRU9OFeRpok6YctnYmDr5pNlyT2bRjXh0JMhjY6hW+ALEwIH" crossorigin="anonymous">
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-YvpcrYf0tY3lHB60NNkmXc5s9fDVZLESaAA55NDzOxhy9GkcIdslK1eN7N6jIeHz" crossorigin="anonymous"></script>
</head>
<body>
<table class="table table-striped">
<thead>
<tr>
<th>CID</th>
<th>Title</th>
<th>Onchain</th>
<th>Preview link</th>
</tr>
</thead>
"""]

    contents = (await request.ctx.db_session.execute(select(StoredContent).where(
        StoredContent.type == 'onchain/content'
    ))).scalars().all()
    for content in contents:
        metadata_cid = content.meta.get('metadata_cid')
        if not metadata_cid:
            make_log("Content", f"Content {content.cid.serialize_v2()} has no metadata", level='warning')
            continue

        _cid = ContentId.deserialize(metadata_cid)
        metadata_content = (await request.ctx.db_session.execute(
            select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58)
        )).scalars().first()

        metadata = {}
        if metadata_content is None:
            make_log("Content", f"Metadata {metadata_cid} is not stored", level='warning')
        else:
            try:
                with open(metadata_content.filepath, 'r', encoding='utf-8') as f:
                    metadata = json.load(f)
            except (OSError, ValueError) as e:
                # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
                make_log("Content", f"Can't read metadata {metadata_cid}: {e}", level='warning')
                metadata = {}
        if not isinstance(metadata, dict):
            metadata = {}

        preview_link = None
        low_preview = (content.meta.get('converted_content') or {}).get('low_preview')
        if low_preview:
            preview_link = f"{PROJECT_HOST}/api/v1.5/storage/{low_preview}"

        # Everything interpolated below may originate from uploaded data, so
        # it is HTML-escaped before being placed into the page.
        cid_cell = escape(str(content.cid.serialize_v2()))
        title_cell = escape(str(metadata.get('name', "")))
        address_cell = escape(str(content.meta.get('item_address')))
        if preview_link:
            preview_cell = f'<a href="{escape(preview_link, quote=True)}">Preview</a>'
        else:
            preview_cell = "not ready"

        parts.append(f"""
<tr>
<td>{cid_cell}</td>
<td>{title_cell}</td>
<td>{address_cell}</td>
<td>{preview_cell}</td>
</tr>
""")

    parts.append("""
</table>
</body>
</html>
""")
    return response.html("".join(parts))
async def s_api_v1_5_content_list(request):
    """Return a JSON list describing enabled 'onchain/content' rows.

    Query parameters (read from ``request.args``):
        offset: number of rows to skip; must be an integer >= 0 (default 0).
        limit: maximum number of rows; integer with 0 < limit <= 1000
            (default 100).

    Returns:
        A JSON array, newest first, of objects with the keys ``cid``,
        ``onchain_address``, ``type`` ('audio' or 'video'), ``title``,
        ``preview_link`` (or None) and ``created_at`` (ISO 8601). Contents
        without a ``metadata_cid`` are skipped. A 400 JSON error is returned
        for an invalid ``offset`` or ``limit``.
    """
    # Imported at function scope as elsewhere in this file; hoisted out of
    # the loop so the import statement runs once per request.
    from app.core.content.content_id import ContentId

    # Validate offset and limit parameters; non-numeric values get the same
    # 400 response as out-of-range ones instead of an unhandled ValueError.
    try:
        offset = int(request.args.get('offset', 0))
    except (TypeError, ValueError):
        return response.json({'error': 'Invalid offset'}, status=400)
    try:
        limit = int(request.args.get('limit', 100))
    except (TypeError, ValueError):
        return response.json({'error': 'Invalid limit'}, status=400)

    if offset < 0:
        return response.json({'error': 'Invalid offset'}, status=400)
    if limit <= 0 or limit > 1000:
        return response.json({'error': 'Invalid limit'}, status=400)

    # Query onchain contents which are not disabled
    contents = (await request.ctx.db_session.execute(
        select(StoredContent)
        .where(StoredContent.type == 'onchain/content', StoredContent.disabled == False)  # noqa: E712 - SQL expression
        .order_by(StoredContent.created.desc())
        .offset(offset).limit(limit)
    )).scalars().all()

    result = []
    for content in contents:
        # Retrieve metadata content using metadata_cid from content.meta
        metadata_cid = content.meta.get('metadata_cid')
        if not metadata_cid:
            continue  # Skip if no metadata_cid is found

        _cid = ContentId.deserialize(metadata_cid)
        metadata_content = (await request.ctx.db_session.execute(
            select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58)
        )).scalars().first()

        try:
            with open(metadata_content.filepath, 'r', encoding='utf-8') as f:
                metadata = json.load(f)
        except Exception as e:
            # Deliberately broad: metadata is best-effort here, and one broken
            # entry (missing row, missing file, bad JSON) must not fail the
            # whole listing. The failure is logged instead of being dropped.
            make_log("Content", f"Can't read metadata {metadata_cid}: {e}", level='warning')
            metadata = {}
        if not isinstance(metadata, dict):
            metadata = {}

        media_type = 'audio'

        # Get title from metadata (key 'name')
        title = metadata.get('name', '')

        # Build preview link if converted_content exists and contains 'low_preview'
        preview_link = None
        converted_content = content.meta.get('converted_content')
        low_preview_hash = converted_content.get('low_preview') if converted_content else None
        if low_preview_hash:
            preview_content = (await request.ctx.db_session.execute(select(StoredContent).where(
                StoredContent.hash == low_preview_hash
            ))).scalars().first()
            # The preview row may not be stored yet; keep preview_link None.
            if preview_content is not None:
                preview_link = preview_content.web_url
                if preview_content.filename.split('.')[-1] in ('mp4', 'mov'):
                    media_type = 'video'

        # Get onchain address from content.meta
        onchain_address = content.meta.get('item_address', '')

        result.append({
            'cid': content.cid.serialize_v2(),
            'onchain_address': onchain_address,
            'type': media_type,
            'title': title,
            'preview_link': preview_link,
            'created_at': content.created.isoformat()  # ISO 8601 format for datetime
        })

    return response.json(result)