uploader-bot/app/api/routes/content.py

652 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
from datetime import datetime, timedelta
from sanic import response
from sqlalchemy import select, and_, func, or_
from aiogram import Bot, types
from sqlalchemy import and_
from app.core.logger import make_log
from app.core.models._config import ServiceConfig
from app.core.models.node_storage import StoredContent
from app.core.models.keys import KnownKey
from app.core.models import StarsInvoice
from app.core.models.content.user_content import UserContent
from app.core._config import CLIENT_TELEGRAM_API_KEY, CLIENT_TELEGRAM_BOT_USERNAME, PROJECT_HOST
from app.core.models.content_v3 import EncryptedContent as ECv3, ContentDerivative as CDv3, UploadSession
from app.core.content.content_id import ContentId
from app.core.network.dht import MetricsAggregator
import os
import json
import time
import uuid
async def s_api_v1_content_list(request):
offset = int(request.args.get('offset', 0))
limit = int(request.args.get('limit', 100))
assert 0 <= offset, "Invalid offset"
assert 0 < limit <= 1000, "Invalid limit"
store = request.args.get('store', 'local')
assert store in ('local', 'onchain'), "Invalid store"
stmt = (
select(StoredContent)
.where(
StoredContent.type.like(store + '%'),
StoredContent.disabled.is_(None)
)
.order_by(StoredContent.created.desc())
.offset(offset)
.limit(limit)
)
rows = (await request.ctx.db_session.execute(stmt)).scalars().all()
make_log("Content", f"Listed {len(rows)} contents", level='info')
result = {}
for content in rows:
content_json = content.json_format()
result[content_json["cid"]] = content_json
return response.json(result)
async def s_api_v1_content_view(request, content_address: str):
# content_address can be CID or TON address
license_exist = (await request.ctx.db_session.execute(
select(UserContent).where(UserContent.onchain_address == content_address)
)).scalars().first()
license_address = None
if license_exist:
license_address = license_exist.onchain_address
if license_exist.content_id:
linked_content = (await request.ctx.db_session.execute(
select(StoredContent).where(StoredContent.id == license_exist.content_id)
)).scalars().first()
if linked_content:
content_address = linked_content.cid.serialize_v2()
from app.core.content.content_id import ContentId
cid = ContentId.deserialize(content_address)
r_content = (await request.ctx.db_session.execute(
select(StoredContent).where(StoredContent.hash == cid.content_hash_b58)
)).scalars().first()
async def open_content_async(session, sc: StoredContent):
if not sc.encrypted:
decrypted = sc
encrypted = (await session.execute(select(StoredContent).where(StoredContent.decrypted_content_id == sc.id))).scalars().first()
else:
encrypted = sc
decrypted = (await session.execute(select(StoredContent).where(StoredContent.id == sc.decrypted_content_id))).scalars().first()
assert decrypted and encrypted, "Can't open content"
ctype = decrypted.json_format().get('content_type', 'application/x-binary')
try:
content_type = ctype.split('/')[0]
except Exception:
content_type = 'application'
return {
'encrypted_content': encrypted,
'decrypted_content': decrypted,
'content_type': content_type,
'content_mime': ctype,
}
try:
content = await open_content_async(request.ctx.db_session, r_content)
except AssertionError:
# Fallback: handle plain stored content without encrypted/decrypted pairing
sc = r_content
from mimetypes import guess_type as _guess
_mime, _ = _guess(sc.filename or '')
_mime = _mime or 'application/octet-stream'
try:
_ctype = _mime.split('/')[0]
except Exception:
_ctype = 'application'
content = {
'encrypted_content': sc,
'decrypted_content': sc,
'content_type': _ctype,
'content_mime': _mime,
}
master_address = content['encrypted_content'].meta.get('item_address', '')
opts = {
'content_type': content['content_type'], # возможно с ошибками, нужно переделать на ffprobe
'content_mime': content.get('content_mime'),
'content_address': license_address or master_address,
'license_address': license_address,
'master_address': master_address,
}
if content['encrypted_content'].key_id:
known_key = (await request.ctx.db_session.execute(
select(KnownKey).where(KnownKey.id == content['encrypted_content'].key_id)
)).scalars().first()
if known_key:
opts['key_hash'] = known_key.seed_hash # нахер не нужно на данный момент
# чисто болванки, заполнение дальше
opts['have_licenses'] = []
opts['invoice'] = None
have_access = False
if request.ctx.user:
user_wallet_address = await request.ctx.user.wallet_address_async(request.ctx.db_session)
user_telegram_id = getattr(request.ctx.user, 'telegram_id', None)
or_clauses = [StarsInvoice.user_id == request.ctx.user.id]
if user_telegram_id is not None:
or_clauses.append(StarsInvoice.telegram_id == user_telegram_id)
stars_access = False
if or_clauses:
stars_access = bool((await request.ctx.db_session.execute(select(StarsInvoice).where(
and_(
StarsInvoice.content_hash == content['encrypted_content'].hash,
StarsInvoice.paid.is_(True),
or_(*or_clauses)
)
))).scalars().first())
have_access = (
(content['encrypted_content'].owner_address == user_wallet_address)
or bool((await request.ctx.db_session.execute(select(UserContent).where(
and_(UserContent.owner_address == user_wallet_address, UserContent.status == 'active', UserContent.content_id == content['encrypted_content'].id)
))).scalars().first())
or stars_access
)
if not have_access:
current_star_rate = (await ServiceConfig(request.ctx.db_session).get('live_tonPerStar', [0, 0]))[0]
if current_star_rate < 0:
current_star_rate = 0.00000001
stars_cost = int(int(content['encrypted_content'].meta['license']['resale']['price']) / 1e9 / current_star_rate * 1.2)
if getattr(request.ctx.user, 'is_admin', False):
stars_cost = 2
else:
stars_cost = int(int(content['encrypted_content'].meta['license']['resale']['price']) / 1e9 / current_star_rate * 1.2)
invoice_id = f"access_{uuid.uuid4().hex}"
exist_invoice = (await request.ctx.db_session.execute(select(StarsInvoice).where(
and_(
StarsInvoice.user_id == request.ctx.user.id,
StarsInvoice.created > datetime.now() - timedelta(minutes=25),
StarsInvoice.amount == stars_cost,
StarsInvoice.content_hash == content['encrypted_content'].hash,
)
))).scalars().first()
if exist_invoice:
invoice_url = exist_invoice.invoice_url
else:
invoice_url = None
try:
invoice_url = await Bot(token=CLIENT_TELEGRAM_API_KEY).create_invoice_link(
'Неограниченный доступ к контенту',
'Неограниченный доступ к контенту',
invoice_id, "XTR",
[
types.LabeledPrice(label='Lifetime access', amount=stars_cost),
], provider_token = ''
)
request.ctx.db_session.add(
StarsInvoice(
external_id=invoice_id,
type='access',
amount=stars_cost,
user_id=request.ctx.user.id,
content_hash=content['encrypted_content'].hash,
invoice_url=invoice_url,
telegram_id=getattr(request.ctx.user, 'telegram_id', None),
bot_username=CLIENT_TELEGRAM_BOT_USERNAME,
)
)
await request.ctx.db_session.commit()
except BaseException as e:
make_log("Content", f"Can't create invoice link: {e}", level='warning')
if invoice_url:
opts['invoice'] = {
'url': invoice_url,
'amount': stars_cost,
}
display_options = {
'content_url': None,
'content_kind': None,
'has_preview': False,
'original_available': False,
'requires_license': False,
}
if have_access:
opts['have_licenses'].append('listen')
encrypted_json = content['encrypted_content'].json_format()
decrypted_json = content['decrypted_content'].json_format()
enc_cid = encrypted_json.get('content_cid') or encrypted_json.get('encrypted_cid')
ec_v3 = None
derivative_rows = []
if enc_cid:
ec_v3 = (await request.ctx.db_session.execute(select(ECv3).where(ECv3.encrypted_cid == enc_cid))).scalars().first()
if ec_v3:
derivative_rows = (await request.ctx.db_session.execute(select(CDv3).where(CDv3.content_id == ec_v3.id))).scalars().all()
upload_row = None
if enc_cid:
upload_row = (await request.ctx.db_session.execute(select(UploadSession).where(UploadSession.encrypted_cid == enc_cid))).scalars().first()
converted_meta_map = dict(content['encrypted_content'].meta.get('converted_content') or {})
content_mime = (
(ec_v3.content_type if ec_v3 and ec_v3.content_type else None)
or decrypted_json.get('content_type')
or encrypted_json.get('content_type')
or opts.get('content_mime')
or 'application/octet-stream'
)
# Fallback: if stored content reports generic application/*, try guess by filename
try:
if content_mime.startswith('application/'):
from mimetypes import guess_type as _guess
_fn = decrypted_json.get('filename') or encrypted_json.get('filename') or ''
_gm, _ = _guess(_fn)
if _gm:
content_mime = _gm
except Exception:
pass
opts['content_mime'] = content_mime
try:
opts['content_type'] = content_mime.split('/')[0]
except Exception:
opts['content_type'] = opts.get('content_type') or 'application'
content_kind = 'audio'
if content_mime.startswith('video/'):
content_kind = 'video'
elif content_mime.startswith('audio/'):
content_kind = 'audio'
else:
content_kind = 'binary'
display_options['content_kind'] = content_kind
display_options['requires_license'] = (not have_access) and content_kind == 'binary'
derivative_latest = {}
if derivative_rows:
derivative_sorted = sorted(derivative_rows, key=lambda row: row.created_at or datetime.min)
for row in derivative_sorted:
derivative_latest[row.kind] = row
def _row_to_hash_and_url(row):
if not row or not row.local_path:
return None, None
file_hash = row.local_path.split('/')[-1]
return file_hash, f"{PROJECT_HOST}/api/v1/storage.proxy/{file_hash}"
has_preview = bool(derivative_latest.get('decrypted_preview') or converted_meta_map.get('low_preview'))
display_options['has_preview'] = has_preview
display_options['original_available'] = bool(derivative_latest.get('decrypted_original') or converted_meta_map.get('original'))
chosen_row = None
if content_kind == 'binary':
if have_access and 'decrypted_original' in derivative_latest:
chosen_row = derivative_latest['decrypted_original']
elif have_access:
for key in ('decrypted_low', 'decrypted_high'):
if key in derivative_latest:
chosen_row = derivative_latest[key]
break
else:
for key in ('decrypted_preview', 'decrypted_low'):
if key in derivative_latest:
chosen_row = derivative_latest[key]
break
def _make_token_for(hash_value: str, scope: str, user_id: int | None) -> str:
try:
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed, hot_pubkey
from app.core._utils.b58 import b58encode as _b58e
signer = Signer(hot_seed)
# Media URLs are polled very frequently by the web client (e.g. every 5s).
# If we generate a new exp for every request, the signed URL changes every poll,
# forcing the player to reload and breaking continuous streaming.
#
# To keep URLs stable while still expiring tokens, we "bucket" exp time.
# Default behavior keeps tokens stable for ~10 minutes; can be tuned via env.
ttl_sec = int(os.getenv("STORAGE_PROXY_TOKEN_TTL_SEC", "600"))
bucket_sec = int(os.getenv("STORAGE_PROXY_TOKEN_BUCKET_SEC", str(ttl_sec)))
ttl_sec = max(1, ttl_sec)
bucket_sec = max(1, bucket_sec)
now = int(time.time())
exp_base = now + ttl_sec
# Always move to the next bucket boundary so the token doesn't flip immediately
# after a boundary due to rounding edge cases.
exp = ((exp_base // bucket_sec) + 1) * bucket_sec
uid = int(user_id or 0)
payload = {'hash': hash_value, 'scope': scope, 'exp': exp, 'uid': uid}
blob = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
sig = signer.sign(blob)
pub = _b58e(hot_pubkey).decode()
return f"pub={pub}&exp={exp}&scope={scope}&uid={uid}&sig={sig}"
except Exception:
return ""
if chosen_row:
file_hash, url = _row_to_hash_and_url(chosen_row)
if url:
token = _make_token_for(file_hash or '', 'full' if have_access else 'preview', getattr(request.ctx.user, 'id', None))
display_options['content_url'] = f"{url}?{token}" if token else url
ext_candidate = None
if chosen_row.content_type:
ext_candidate = chosen_row.content_type.split('/')[-1]
elif '/' in content_mime:
ext_candidate = content_mime.split('/')[-1]
if ext_candidate:
opts['content_ext'] = ext_candidate
if content_kind == 'binary':
display_options['original_available'] = True
converted_meta_map.setdefault('original', file_hash)
elif have_access:
converted_meta_map.setdefault('low', file_hash)
else:
converted_meta_map.setdefault('low_preview', file_hash)
if not display_options['content_url'] and converted_meta_map:
if content_kind == 'binary':
preference = ['original'] if have_access else []
else:
preference = ['low', 'high', 'low_preview'] if have_access else ['low_preview', 'low', 'high']
for key in preference:
hash_value = converted_meta_map.get(key)
if not hash_value:
continue
# Пробуем сразу через прокси (даже если локальной записи нет)
token = _make_token_for(hash_value, 'full' if have_access else 'preview', getattr(request.ctx.user, 'id', None))
display_options['content_url'] = f"{PROJECT_HOST}/api/v1/storage.proxy/{hash_value}?{token}" if token else f"{PROJECT_HOST}/api/v1/storage.proxy/{hash_value}"
if '/' in content_mime:
opts['content_ext'] = content_mime.split('/')[-1]
if content_kind == 'binary':
display_options['original_available'] = True
break
# Final fallback: no derivatives known — serve stored content directly for AV
if not display_options['content_url'] and content_kind in ('audio', 'video'):
from app.core._utils.b58 import b58encode as _b58e
scid = decrypted_json.get('cid') or encrypted_json.get('cid')
try:
from app.core.content.content_id import ContentId as _CID
if scid:
_cid = _CID.deserialize(scid)
h = _cid.content_hash_b58
else:
h = decrypted_json.get('hash')
except Exception:
h = decrypted_json.get('hash')
if h:
token = _make_token_for(h, 'preview' if not have_access else 'full', getattr(request.ctx.user, 'id', None))
display_options['content_url'] = f"{PROJECT_HOST}/api/v1/storage.proxy/{h}?{token}" if token else f"{PROJECT_HOST}/api/v1/storage.proxy/{h}"
# Metadata fallback
content_meta = encrypted_json
content_metadata_json = None
_mcid = content_meta.get('metadata_cid') or None
if _mcid:
_cid = ContentId.deserialize(_mcid)
content_metadata = (await request.ctx.db_session.execute(select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58))).scalars().first()
if content_metadata:
try:
with open(content_metadata.filepath, 'r') as f:
content_metadata_json = json.loads(f.read())
except Exception as exc:
make_log("Content", f"Can't read metadata file: {exc}", level='warning')
if not content_metadata_json:
fallback_name = (ec_v3.title if ec_v3 else None) or content_meta.get('title') or content_meta.get('cid')
fallback_description = (ec_v3.description if ec_v3 else '') or ''
content_metadata_json = {
'name': fallback_name or 'Без названия',
'description': fallback_description,
'downloadable': False,
}
cover_cid = content_meta.get('cover_cid')
if cover_cid:
token = _make_token_for(cover_cid, 'preview', getattr(request.ctx.user, 'id', None))
content_metadata_json.setdefault('image', f"{PROJECT_HOST}/api/v1/storage.proxy/{cover_cid}?{token}" if token else f"{PROJECT_HOST}/api/v1/storage.proxy/{cover_cid}")
display_options['metadata'] = content_metadata_json
opts['downloadable'] = content_metadata_json.get('downloadable', False)
if opts['downloadable'] and 'listen' not in opts['have_licenses']:
opts['downloadable'] = False
# Conversion status summary
conversion_summary = {}
conversion_details = []
derivative_summary_map = {}
for row in derivative_latest.values():
conversion_summary[row.status] = conversion_summary.get(row.status, 0) + 1
derivative_summary_map[row.kind] = row
conversion_details.append({
'kind': row.kind,
'status': row.status,
'size_bytes': row.size_bytes,
'content_type': row.content_type,
'error': row.error,
'updated_at': (row.last_access_at or row.created_at).isoformat() + 'Z' if (row.last_access_at or row.created_at) else None,
})
required_kinds = set()
if content_kind == 'binary':
if derivative_latest.get('decrypted_original') or converted_meta_map.get('original'):
required_kinds.add('decrypted_original')
else:
required_kinds = {'decrypted_low', 'decrypted_high'}
if ec_v3 and ec_v3.content_type and ec_v3.content_type.startswith('video/'):
required_kinds.add('decrypted_preview')
statuses_by_kind = {kind: row.status for kind, row in derivative_summary_map.items() if kind in required_kinds}
conversion_state = 'pending'
if required_kinds and all(statuses_by_kind.get(kind) == 'ready' for kind in required_kinds):
conversion_state = 'ready'
elif any(statuses_by_kind.get(kind) == 'failed' for kind in required_kinds):
conversion_state = 'failed'
elif any(statuses_by_kind.get(kind) in ('processing', 'pending') for kind in required_kinds):
conversion_state = 'processing'
elif statuses_by_kind:
conversion_state = 'partial'
if display_options['content_url']:
conversion_state = 'ready'
upload_info = None
if upload_row:
upload_info = {
'id': upload_row.id,
'state': upload_row.state,
'error': upload_row.error,
'created_at': upload_row.created_at.isoformat() + 'Z' if upload_row.created_at else None,
'updated_at': upload_row.updated_at.isoformat() + 'Z' if upload_row.updated_at else None,
}
upload_state = upload_row.state if upload_row else None
if conversion_state == 'failed' or upload_state in ('failed', 'conversion_failed'):
final_state = 'failed'
elif conversion_state == 'ready':
final_state = 'ready'
elif conversion_state in ('processing', 'partial') or upload_state in ('processing', 'pinned'):
final_state = 'processing'
else:
final_state = 'uploaded'
conversion_info = {
'state': conversion_state,
'summary': conversion_summary,
'details': conversion_details,
'required_kinds': list(required_kinds),
}
opts['conversion'] = conversion_info
opts['upload'] = upload_info
opts['status'] = {
'state': final_state,
'conversion_state': conversion_state,
'upload_state': upload_info['state'] if upload_info else None,
'has_access': have_access,
}
if not opts.get('content_ext') and '/' in content_mime:
opts['content_ext'] = content_mime.split('/')[-1]
metrics_mgr: MetricsAggregator | None = getattr(request.app.ctx.memory, "metrics", None)
if metrics_mgr:
viewer_salt_raw = request.headers.get("X-View-Salt")
if viewer_salt_raw:
try:
viewer_salt = bytes.fromhex(viewer_salt_raw)
except ValueError:
viewer_salt = viewer_salt_raw.encode()
elif request.ctx.user:
viewer_salt = f"user:{request.ctx.user.id}".encode()
else:
viewer_salt = (request.remote_addr or request.ip or "anonymous").encode()
try:
watch_time_param = int(request.args.get("watch_time", 0))
except (TypeError, ValueError):
watch_time_param = 0
try:
bytes_out_param = int(request.args.get("bytes_out", 0))
except (TypeError, ValueError):
bytes_out_param = 0
completed_param = request.args.get("completed", "0") in ("1", "true", "True")
metrics_mgr.record_view(
content_id=content['encrypted_content'].hash,
viewer_salt=viewer_salt,
watch_time=watch_time_param,
bytes_out=bytes_out_param,
completed=completed_param,
)
return response.json({
**opts,
'encrypted': content['encrypted_content'].json_format(),
'display_options': display_options,
})
async def s_api_v1_content_friendly_list(request):
# return html table with content list. bootstrap is used
result = """
<html>
<head>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/css/bootstrap.min.css" rel="stylesheet" integrity="sha384-QWTKZyjpPEjISv5WaRU9OFeRpok6YctnYmDr5pNlyT2bRjXh0JMhjY6hW+ALEwIH" crossorigin="anonymous">
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.3/dist/js/bootstrap.bundle.min.js" integrity="sha384-YvpcrYf0tY3lHB60NNkmXc5s9fDVZLESaAA55NDzOxhy9GkcIdslK1eN7N6jIeHz" crossorigin="anonymous"></script>
</head>
<body>
<table class="table table-striped">
<thead>
<tr>
<th>CID</th>
<th>Title</th>
<th>Onchain</th>
<th>Preview link</th>
</tr>
</thead>
"""
contents = (await request.ctx.db_session.execute(select(StoredContent).where(
StoredContent.type == 'onchain/content'
))).scalars().all()
for content in contents:
if not content.meta.get('metadata_cid'):
make_log("Content", f"Content {content.cid.serialize_v2()} has no metadata", level='warning')
continue
from app.core.content.content_id import ContentId
_cid = ContentId.deserialize(content.meta.get('metadata_cid'))
metadata_content = (await request.ctx.db_session.execute(select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58))).scalars().first()
with open(metadata_content.filepath, 'r') as f:
metadata = json.loads(f.read())
preview_link = None
if content.meta.get('converted_content'):
preview_link = f"{PROJECT_HOST}/api/v1.5/storage/{content.meta['converted_content']['low_preview']}"
result += f"""
<tr>
<td>{content.cid.serialize_v2()}</td>
<td>{metadata.get('name', "")}</td>
<td>{content.meta.get('item_address')}</td>
<td>""" + (f'<a href="{preview_link}">Preview</a>' if preview_link else "not ready") + """</td>
</tr>
"""
result += """
</table>
</body>
</html>
"""
return response.html(result)
async def s_api_v1_5_content_list(request):
# Validate offset and limit parameters
offset = int(request.args.get('offset', 0))
limit = int(request.args.get('limit', 100))
if offset < 0:
return response.json({'error': 'Invalid offset'}, status=400)
if limit <= 0 or limit > 1000:
return response.json({'error': 'Invalid limit'}, status=400)
# Query onchain contents which are not disabled
contents = (await request.ctx.db_session.execute(
select(StoredContent)
.where(StoredContent.type == 'onchain/content', StoredContent.disabled == False)
.order_by(StoredContent.created.desc())
.offset(offset).limit(limit)
)).scalars().all()
result = []
for content in contents:
# Retrieve metadata content using metadata_cid from content.meta
metadata_cid = content.meta.get('metadata_cid')
if not metadata_cid:
continue # Skip if no metadata_cid is found
from app.core.content.content_id import ContentId
_cid = ContentId.deserialize(metadata_cid)
metadata_content = (await request.ctx.db_session.execute(select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58))).scalars().first()
try:
with open(metadata_content.filepath, 'r') as f:
metadata = json.load(f)
except Exception as e:
metadata = {}
media_type = 'audio'
# Get title from metadata (key 'name')
title = metadata.get('name', '')
# Build preview link if converted_content exists and contains 'low_preview'
preview_link = None
converted_content = content.meta.get('converted_content')
if converted_content:
converted_content = (await request.ctx.db_session.execute(select(StoredContent).where(
StoredContent.hash == converted_content['low_preview']
))).scalars().first()
preview_link = converted_content.web_url
if converted_content.filename.split('.')[-1] in ('mp4', 'mov'):
media_type = 'video'
else:
preview_link = None
# Get onchain address from content.meta
onchain_address = content.meta.get('item_address', '')
result.append({
'cid': content.cid.serialize_v2(),
'onchain_address': onchain_address,
'type': media_type,
'title': title,
'preview_link': preview_link,
'created_at': content.created.isoformat() # ISO 8601 format for datetime
})
return response.json(result)