uploader-bot/app/bot/routers/content.py

250 lines
9.6 KiB
Python

import base58
from aiogram import types, Router, F
from collections import defaultdict
from datetime import datetime
from typing import Optional
from app.core._config import WEB_APP_URLS
from app.core._keyboards import get_inline_keyboard
from app.core._utils.tg_process_template import tg_process_template
from app.core.logger import make_log
from app.core.models.node_storage import StoredContent
from app.core.models.content_v3 import UploadSession, EncryptedContent, ContentDerivative
from sqlalchemy import select, and_, or_
import json
router = Router()
def chunks(lst, n):
"""Yield successive n-sized chunks from lst."""
for i in range(0, len(lst), n):
yield lst[i:i + n]
async def _compute_content_status(db_session, encrypted_cid: Optional[str], fallback_content_type: Optional[str] = None):
if not encrypted_cid:
return {
'final_state': 'uploaded',
'conversion_state': 'pending',
'upload_state': None,
'summary': {},
'details': [],
'title': None,
'content_type': fallback_content_type,
}
ec = (await db_session.execute(select(EncryptedContent).where(EncryptedContent.encrypted_cid == encrypted_cid))).scalars().first()
content_type = fallback_content_type or (ec.content_type if ec else None) or 'application/octet-stream'
derivative_rows = []
if ec:
derivative_rows = (await db_session.execute(select(ContentDerivative).where(ContentDerivative.content_id == ec.id))).scalars().all()
upload_row = (await db_session.execute(select(UploadSession).where(UploadSession.encrypted_cid == encrypted_cid))).scalars().first()
derivative_sorted = sorted(derivative_rows, key=lambda row: row.created_at or datetime.min)
derivative_latest = {}
summary = defaultdict(int)
details = []
for row in derivative_sorted:
derivative_latest[row.kind] = row
for kind, row in derivative_latest.items():
summary[row.status] += 1
details.append({
'kind': kind,
'status': row.status,
'size_bytes': row.size_bytes,
'error': row.error,
'updated_at': (row.last_access_at or row.created_at).isoformat() + 'Z' if (row.last_access_at or row.created_at) else None,
})
required = {'decrypted_low', 'decrypted_high'}
if content_type.startswith('video/'):
required.add('decrypted_preview')
statuses_by_kind = {kind: derivative_latest[kind].status for kind in required if kind in derivative_latest}
conversion_state = 'pending'
if required and all(statuses_by_kind.get(kind) == 'ready' for kind in required):
conversion_state = 'ready'
elif any(statuses_by_kind.get(kind) == 'failed' for kind in required):
conversion_state = 'failed'
elif any(statuses_by_kind.get(kind) in ('processing', 'pending') for kind in required):
conversion_state = 'processing'
elif statuses_by_kind:
conversion_state = 'partial'
upload_state = upload_row.state if upload_row else None
final_state = 'ready' if conversion_state == 'ready' else None
if not final_state:
if conversion_state == 'failed' or upload_state in ('failed', 'conversion_failed'):
final_state = 'failed'
elif conversion_state in ('processing', 'partial') or upload_state in ('processing', 'pinned'):
final_state = 'processing'
else:
final_state = 'uploaded'
return {
'final_state': final_state,
'conversion_state': conversion_state,
'upload_state': upload_state,
'summary': dict(summary),
'details': details,
'title': ec.title if ec else None,
'content_type': content_type,
}
async def t_callback_owned_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
message_text = user.translated("ownedContent_menu")
content_list = []
user_addr = await user.wallet_address_async(db_session)
conditions = []
if user_addr:
conditions.append(and_(StoredContent.owner_address == user_addr, StoredContent.type.like('onchain%')))
conditions.append(and_(StoredContent.user_id == user.id, StoredContent.type.like('local/%')))
if not conditions:
conditions = [StoredContent.user_id == user.id]
stmt = select(StoredContent).where(
StoredContent.disabled.is_(None),
or_(*conditions) if len(conditions) > 1 else conditions[0]
).order_by(StoredContent.created.desc())
rows = (await db_session.execute(stmt)).scalars().all()
onchain_hashes = set()
local_items = []
icon_map = {
'ready': '',
'processing': '',
'failed': '⚠️',
'uploaded': '📦',
}
for content in rows:
meta = content.meta or {}
encrypted_cid = meta.get('content_cid') or meta.get('encrypted_cid') or content.content_id
status_info = await _compute_content_status(db_session, encrypted_cid, meta.get('content_type'))
icon = icon_map.get(status_info['final_state'], '📦')
if content.type.startswith('onchain'):
try:
metadata_content = await StoredContent.from_cid_async(db_session, content.json_format()['metadata_cid'])
with open(metadata_content.filepath, 'r') as f:
metadata_content_json = json.loads(f.read())
except BaseException as e:
make_log("OwnedContent", f"Can't get metadata content: {e}", level='warning')
continue
onchain_hashes.add(content.hash)
display_name = metadata_content_json.get('name') or content.cid.serialize_v2()
content_list.append([
{
'text': f"{icon} {display_name}"[:64],
'callback_data': f'NC_{content.id}'
}
])
else:
local_items.append((content, status_info, icon))
for content, status_info, icon in local_items:
if content.hash in onchain_hashes:
continue
meta = content.meta or {}
encrypted_cid = meta.get('encrypted_cid') or content.content_id
display_name = status_info['title'] or content.filename or content.cid.serialize_v2()
button_text = f"{icon} {display_name}"
content_list.append([
{
'text': button_text[:64],
'callback_data': f'LC_{content.id}'
}
])
return await tg_process_template(
chat_wrap, message_text,
keyboard=get_inline_keyboard([
*content_list,
[{
'text': user.translated('webApp_uploadContent_button'),
'web_app': types.WebAppInfo(
url=WEB_APP_URLS['uploadContent']
)
}],
[{
'text': user.translated('back_button'),
'callback_data': 'home'
}]
]), message_id=query.message.message_id
)
async def t_callback_node_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
content_oid = int(query.data.split('_')[1])
row = (await db_session.execute(select(StoredContent).where(StoredContent.id == content_oid))).scalars().first()
return await chat_wrap.send_content(
db_session, row,
extra_buttons=[
[{
'text': user.translated('back_button'),
'callback_data': 'ownedContent'
}]
],
message_id=query.message.message_id
)
router.callback_query.register(t_callback_owned_content, F.data == 'ownedContent')
router.callback_query.register(t_callback_node_content, F.data.startswith('NC_'))
async def t_callback_local_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
content_oid = int(query.data.split('_')[1])
content = (await db_session.execute(select(StoredContent).where(StoredContent.id == content_oid))).scalars().first()
if not content:
return await query.answer(user.translated('error_contentNotFound'), show_alert=True)
upload_id = (content.meta or {}).get('upload_id')
upload_session = await db_session.get(UploadSession, upload_id) if upload_id else None
encrypted_cid = (content.meta or {}).get('encrypted_cid') or content.content_id
status_info = await _compute_content_status(db_session, encrypted_cid, (content.meta or {}).get('content_type'))
display_name = status_info['title'] or content.filename or content.cid.serialize_v2()
state_label = {
'ready': 'Готов',
'processing': 'Обработка',
'failed': 'Ошибка',
'uploaded': 'Загружено',
}.get(status_info['final_state'], 'Статус неизвестен')
lines = [
f"<b>{display_name}</b>",
f"Состояние: {state_label}"
]
if upload_session:
lines.append(f"Статус загрузки: {upload_session.state}")
if upload_session.error:
lines.append(f"Ошибка: {upload_session.error}")
if status_info['summary']:
lines.append("Конвертация:")
for status, count in status_info['summary'].items():
lines.append(f"{status}: {count}")
await chat_wrap.send_message(
'\n'.join(lines),
message_type='notification',
message_meta={'content_id': content.id},
reply_markup=get_inline_keyboard([
[{
'text': user.translated('back_button'),
'callback_data': 'ownedContent'
}]
])
)
router.callback_query.register(t_callback_local_content, F.data.startswith('LC_'))