253 lines
9.8 KiB
Python
253 lines
9.8 KiB
Python
import base58
|
|
from aiogram import types, Router, F
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from app.core._config import WEB_APP_URLS
|
|
from app.core._keyboards import get_inline_keyboard
|
|
from app.core._utils.tg_process_template import tg_process_template
|
|
from app.core.logger import make_log
|
|
from app.core.models.node_storage import StoredContent
|
|
from app.core.models.content_v3 import UploadSession, EncryptedContent, ContentDerivative
|
|
from sqlalchemy import select, and_, or_
|
|
import json
|
|
|
|
# Router on which this module's callback-query handlers are registered.
router = Router()
|
|
|
|
|
|
def chunks(lst, n):
    """Yield successive n-sized chunks from lst.

    Args:
        lst: A sliceable sequence that supports ``len`` (list, tuple, str, ...).
        n: Chunk size; must be a positive integer.

    Yields:
        Consecutive slices of ``lst`` holding ``n`` items each; the final
        slice is shorter when ``len(lst)`` is not a multiple of ``n``.

    Raises:
        ValueError: If ``n`` is less than 1. Because this is a generator
            function, the error surfaces when iteration starts.
    """
    # A negative step makes range() empty, which would silently drop every
    # element; reject it the same way a zero step is already rejected.
    if n < 1:
        raise ValueError(f"chunk size must be a positive integer, got {n!r}")
    for start in range(0, len(lst), n):
        yield lst[start:start + n]
|
|
|
|
|
|
async def _compute_content_status(db_session, encrypted_cid: Optional[str], fallback_content_type: Optional[str] = None):
    """Summarise upload and conversion progress for one encrypted content item.

    Looks up the ``EncryptedContent`` row, its ``ContentDerivative`` rows and
    the ``UploadSession`` row matching ``encrypted_cid`` and folds them into a
    plain dict.

    Args:
        db_session: Database session; its ``execute()`` is awaited.
        encrypted_cid: CID used for the lookups. When falsy nothing is
            queried and a default "uploaded" snapshot is returned.
        fallback_content_type: Content type that takes precedence over the
            one stored on the ``EncryptedContent`` row.

    Returns:
        dict with the keys ``final_state``, ``conversion_state``,
        ``upload_state``, ``summary`` (status -> number of latest
        derivatives), ``details`` (one entry per derivative kind), ``title``
        and ``content_type``.
    """
    if not encrypted_cid:
        # Nothing to look up: report the item as uploaded with no derivatives.
        return {
            'final_state': 'uploaded',
            'conversion_state': 'pending',
            'upload_state': None,
            'summary': {},
            'details': [],
            'title': None,
            'content_type': fallback_content_type,
        }

    content_query = select(EncryptedContent).where(EncryptedContent.encrypted_cid == encrypted_cid)
    ec = (await db_session.execute(content_query)).scalars().first()

    # Caller-supplied type wins; the stored type is only consulted without it.
    content_type = fallback_content_type
    if not content_type:
        content_type = (ec.content_type if ec else None) or 'application/octet-stream'

    if ec:
        derivative_query = select(ContentDerivative).where(ContentDerivative.content_id == ec.id)
        derivative_rows = (await db_session.execute(derivative_query)).scalars().all()
    else:
        derivative_rows = []
    upload_query = select(UploadSession).where(UploadSession.encrypted_cid == encrypted_cid)
    upload_row = (await db_session.execute(upload_query)).scalars().first()

    def _created_key(item):
        return item.created_at or datetime.min

    # Oldest first, so the last assignment per kind keeps the newest row.
    latest_by_kind = {item.kind: item for item in sorted(derivative_rows, key=_created_key)}

    summary = {}
    details = []
    for kind, item in latest_by_kind.items():
        summary[item.status] = summary.get(item.status, 0) + 1
        stamp = item.last_access_at or item.created_at
        details.append({
            'kind': kind,
            'status': item.status,
            'size_bytes': item.size_bytes,
            'error': item.error,
            'updated_at': stamp.isoformat() + 'Z' if stamp else None,
        })

    # Derivative kinds that must all be ready for this content type.
    required = ('decrypted_original',)
    if content_type.startswith('audio/'):
        required = ('decrypted_low', 'decrypted_high')
    elif content_type.startswith('video/'):
        required = ('decrypted_low', 'decrypted_high', 'decrypted_preview')

    present = {kind: latest_by_kind[kind].status for kind in required if kind in latest_by_kind}
    # One entry per required kind; None marks a kind with no derivative yet.
    observed = [present.get(kind) for kind in required]
    if observed and all(state == 'ready' for state in observed):
        conversion_state = 'ready'
    elif 'failed' in observed:
        conversion_state = 'failed'
    elif any(state in ('processing', 'pending') for state in observed):
        conversion_state = 'processing'
    elif present:
        conversion_state = 'partial'
    else:
        conversion_state = 'pending'

    upload_state = upload_row.state if upload_row else None

    if conversion_state == 'ready':
        final_state = 'ready'
    elif conversion_state == 'failed' or upload_state in ('failed', 'conversion_failed'):
        final_state = 'failed'
    elif conversion_state in ('processing', 'partial') or upload_state in ('processing', 'pinned'):
        final_state = 'processing'
    else:
        final_state = 'uploaded'

    return {
        'final_state': final_state,
        'conversion_state': conversion_state,
        'upload_state': upload_state,
        'summary': summary,
        'details': details,
        'title': ec.title if ec else None,
        'content_type': content_type,
    }
|
|
|
|
|
|
async def t_callback_owned_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
    """Render the "owned content" menu as an inline keyboard.

    Selects the user's non-disabled ``StoredContent`` rows (on-chain items
    owned by the user's wallet address plus the user's ``local/`` items) and
    builds one button per item: on-chain items first (``NC_<id>``), then
    local items whose hash is not already listed on-chain (``LC_<id>``),
    followed by the upload and back buttons.

    Args:
        query: Callback query that opened the menu; its message id is passed
            on to ``tg_process_template``.
        memory: Unused here.
        user: Current user; provides translations, ``id`` and the wallet
            address.
        db_session: Database session used for every lookup.
        chat_wrap: Chat wrapper handed to ``tg_process_template``.
        **extra: Additional handler arguments; ignored.

    Returns:
        The result of ``tg_process_template``.
    """
    message_text = user.translated("ownedContent_menu")
    user_addr = await user.wallet_address_async(db_session)

    conditions = []
    if user_addr:
        conditions.append(and_(StoredContent.owner_address == user_addr, StoredContent.type.like('onchain%')))
    conditions.append(and_(StoredContent.user_id == user.id, StoredContent.type.like('local/%')))

    # NOTE(review): the local condition above is always appended, so this
    # fallback looks unreachable; kept as a defensive default.
    if not conditions:
        conditions = [StoredContent.user_id == user.id]

    stmt = select(StoredContent).where(
        StoredContent.disabled.is_(None),
        or_(*conditions) if len(conditions) > 1 else conditions[0]
    ).order_by(StoredContent.created.desc())

    rows = (await db_session.execute(stmt)).scalars().all()

    icon_map = {
        'ready': '✅',
        'processing': '⏳',
        'failed': '⚠️',
        'uploaded': '📦',
    }

    async def load_status(content):
        # Resolved lazily so rows that end up skipped cost no extra queries.
        meta = content.meta or {}
        encrypted_cid = meta.get('content_cid') or meta.get('encrypted_cid') or content.content_id
        return await _compute_content_status(db_session, encrypted_cid, meta.get('content_type'))

    content_list = []
    onchain_hashes = set()
    local_contents = []

    for content in rows:
        if not content.type.startswith('onchain'):
            local_contents.append(content)
            continue

        try:
            metadata_content = await StoredContent.from_cid_async(db_session, content.json_format()['metadata_cid'])
            # NOTE(review): synchronous read inside an async handler; fine
            # as long as the metadata files stay small.
            with open(metadata_content.filepath, 'r', encoding='utf-8') as f:
                metadata_content_json = json.load(f)
        except Exception as e:
            # Best effort: an item with unreadable metadata is left out of
            # the menu. Exception (not BaseException) lets task cancellation
            # and interpreter shutdown propagate.
            make_log("OwnedContent", f"Can't get metadata content: {e}", level='warning')
            continue

        status_info = await load_status(content)
        icon = icon_map.get(status_info['final_state'], '📦')
        onchain_hashes.add(content.hash)
        display_name = metadata_content_json.get('name') or content.cid.serialize_v2()
        content_list.append([
            {
                'text': f"{icon} {display_name}"[:64],
                'callback_data': f'NC_{content.id}'
            }
        ])

    for content in local_contents:
        # A local copy of something already listed on-chain is a duplicate.
        if content.hash in onchain_hashes:
            continue
        status_info = await load_status(content)
        icon = icon_map.get(status_info['final_state'], '📦')
        display_name = status_info['title'] or content.filename or content.cid.serialize_v2()
        content_list.append([
            {
                'text': f"{icon} {display_name}"[:64],
                'callback_data': f'LC_{content.id}'
            }
        ])

    return await tg_process_template(
        chat_wrap, message_text,
        keyboard=get_inline_keyboard([
            *content_list,
            [{
                'text': user.translated('webApp_uploadContent_button'),
                'web_app': types.WebAppInfo(
                    url=WEB_APP_URLS['uploadContent']
                )
            }],
            [{
                'text': user.translated('back_button'),
                'callback_data': 'home'
            }]
        ]), message_id=query.message.message_id
    )
|
|
|
|
|
|
async def t_callback_node_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
    """Send the stored content selected through an ``NC_<id>`` button.

    Args:
        query: Callback query whose ``data`` carries the ``StoredContent`` id
            after the first underscore.
        memory: Unused here.
        user: Current user; provides translations.
        db_session: Database session used to load the row.
        chat_wrap: Chat wrapper whose ``send_content`` delivers the item.
        **extra: Additional handler arguments; ignored.

    Returns:
        The result of ``chat_wrap.send_content``, or of ``query.answer`` when
        the row no longer exists.

    Raises:
        ValueError: If the id part of ``query.data`` is not an integer.
    """
    content_oid = int(query.data.split('_')[1])
    row = (await db_session.execute(select(StoredContent).where(StoredContent.id == content_oid))).scalars().first()
    if not row:
        # The button may refer to a row that no longer exists; answer with
        # an alert instead of handing None to send_content.
        return await query.answer(user.translated('error_contentNotFound'), show_alert=True)

    return await chat_wrap.send_content(
        db_session, row,
        extra_buttons=[
            [{
                'text': user.translated('back_button'),
                'callback_data': 'ownedContent'
            }]
        ],
        message_id=query.message.message_id
    )
|
|
|
|
|
|
|
|
# Callback data equal to 'ownedContent' opens the owned-content menu.
router.callback_query.register(t_callback_owned_content, F.data == 'ownedContent')
|
|
# Callback data starting with 'NC_' is handled by t_callback_node_content.
router.callback_query.register(t_callback_node_content, F.data.startswith('NC_'))
|
|
|
|
|
|
async def t_callback_local_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
    """Show the processing status of a local item selected via ``LC_<id>``.

    Loads the ``StoredContent`` row, its optional ``UploadSession`` and the
    computed conversion status, then sends a notification message with a
    back button.

    Args:
        query: Callback query whose ``data`` carries the ``StoredContent`` id
            after the first underscore.
        memory: Unused here.
        user: Current user; provides translations.
        db_session: Database session used for the lookups.
        chat_wrap: Chat wrapper whose ``send_message`` delivers the text.
        **extra: Additional handler arguments; ignored.

    Returns:
        The result of ``query.answer`` when the row is missing, else ``None``.

    Raises:
        ValueError: If the id part of ``query.data`` is not an integer.
    """
    # Local import keeps the block self-contained (no file-level change).
    from html import escape

    def safe(value):
        # The text below uses <b> markup, so it is presumably sent with HTML
        # parse mode; '<', '>' and '&' in stored values must be escaped.
        return escape(str(value), quote=False)

    content_oid = int(query.data.split('_')[1])
    content = (await db_session.execute(select(StoredContent).where(StoredContent.id == content_oid))).scalars().first()
    if not content:
        return await query.answer(user.translated('error_contentNotFound'), show_alert=True)

    meta = content.meta or {}
    upload_id = meta.get('upload_id')
    upload_session = await db_session.get(UploadSession, upload_id) if upload_id else None

    encrypted_cid = meta.get('encrypted_cid') or content.content_id
    status_info = await _compute_content_status(db_session, encrypted_cid, meta.get('content_type'))
    display_name = status_info['title'] or content.filename or content.cid.serialize_v2()
    state_label = {
        'ready': 'Готов',
        'processing': 'Обработка',
        'failed': 'Ошибка',
        'uploaded': 'Загружено',
    }.get(status_info['final_state'], 'Статус неизвестен')

    lines = [
        f"<b>{safe(display_name)}</b>",
        f"Состояние: {state_label}"
    ]
    if upload_session:
        lines.append(f"Статус загрузки: {safe(upload_session.state)}")
        if upload_session.error:
            lines.append(f"Ошибка: {safe(upload_session.error)}")
    if status_info['summary']:
        lines.append("Конвертация:")
        for status, count in status_info['summary'].items():
            lines.append(f"• {safe(status)}: {count}")

    await chat_wrap.send_message(
        '\n'.join(lines),
        message_type='notification',
        message_meta={'content_id': content.id},
        reply_markup=get_inline_keyboard([
            [{
                'text': user.translated('back_button'),
                'callback_data': 'ownedContent'
            }]
        ])
    )
|
|
|
|
|
|
# Callback data starting with 'LC_' is handled by t_callback_local_content.
router.callback_query.register(t_callback_local_content, F.data.startswith('LC_'))
|