import base58
from aiogram import types, Router, F
from collections import defaultdict
from datetime import datetime
from typing import Optional

from app.core._config import WEB_APP_URLS
from app.core._keyboards import get_inline_keyboard
from app.core._utils.tg_process_template import tg_process_template
from app.core.logger import make_log
from app.core.models.node_storage import StoredContent
from app.core.models.content_v3 import UploadSession, EncryptedContent, ContentDerivative
from sqlalchemy import select, and_, or_
import json

router = Router()


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


async def _compute_content_status(db_session, encrypted_cid: Optional[str], fallback_content_type: Optional[str] = None):
    """Aggregate upload and conversion state for one encrypted content item.

    Args:
        db_session: Async database session used to run the SELECT statements.
        encrypted_cid: CID used to look up ``EncryptedContent`` and
            ``UploadSession`` rows. When falsy, no queries are issued and a
            default "uploaded / pending" status is returned.
        fallback_content_type: Content type that takes precedence over the one
            stored on the ``EncryptedContent`` row.

    Returns:
        A dict with the keys ``final_state``, ``conversion_state``,
        ``upload_state``, ``summary`` (status -> count over the latest
        derivative of each kind), ``details`` (one entry per derivative kind),
        ``title`` and ``content_type``.
    """
    if not encrypted_cid:
        return {
            'final_state': 'uploaded',
            'conversion_state': 'pending',
            'upload_state': None,
            'summary': {},
            'details': [],
            'title': None,
            'content_type': fallback_content_type,
        }

    ec = (
        await db_session.execute(
            select(EncryptedContent).where(EncryptedContent.encrypted_cid == encrypted_cid)
        )
    ).scalars().first()
    content_type = fallback_content_type or (ec.content_type if ec else None) or 'application/octet-stream'

    derivative_rows = []
    if ec:
        derivative_rows = (
            await db_session.execute(
                select(ContentDerivative).where(ContentDerivative.content_id == ec.id)
            )
        ).scalars().all()
    upload_row = (
        await db_session.execute(
            select(UploadSession).where(UploadSession.encrypted_cid == encrypted_cid)
        )
    ).scalars().first()

    # Sorting oldest-first means that, for each kind, the row written last
    # into the mapping is the most recently created one.
    derivative_sorted = sorted(derivative_rows, key=lambda row: row.created_at or datetime.min)
    derivative_latest = {}
    for row in derivative_sorted:
        derivative_latest[row.kind] = row

    summary = defaultdict(int)
    details = []
    for kind, row in derivative_latest.items():
        summary[row.status] += 1
        timestamp = row.last_access_at or row.created_at
        details.append({
            'kind': kind,
            'status': row.status,
            'size_bytes': row.size_bytes,
            'error': row.error,
            'updated_at': timestamp.isoformat() + 'Z' if timestamp else None,
        })

    # Derivative kinds that must all be 'ready' for the content to count as
    # converted; the set depends on the media family.
    if content_type.startswith('audio/'):
        required = {'decrypted_low', 'decrypted_high'}
    elif content_type.startswith('video/'):
        required = {'decrypted_low', 'decrypted_high', 'decrypted_preview'}
    else:
        required = {'decrypted_original'}

    statuses_by_kind = {
        kind: derivative_latest[kind].status
        for kind in required
        if kind in derivative_latest
    }

    conversion_state = 'pending'
    if required and all(statuses_by_kind.get(kind) == 'ready' for kind in required):
        conversion_state = 'ready'
    elif any(statuses_by_kind.get(kind) == 'failed' for kind in required):
        conversion_state = 'failed'
    elif any(statuses_by_kind.get(kind) in ('processing', 'pending') for kind in required):
        conversion_state = 'processing'
    elif statuses_by_kind:
        # Some required derivatives exist, but none is failed/processing/pending
        # and not all of them are ready.
        conversion_state = 'partial'

    upload_state = upload_row.state if upload_row else None

    if conversion_state == 'ready':
        final_state = 'ready'
    elif conversion_state == 'failed' or upload_state in ('failed', 'conversion_failed'):
        final_state = 'failed'
    elif conversion_state in ('processing', 'partial') or upload_state in ('processing', 'pinned'):
        final_state = 'processing'
    else:
        final_state = 'uploaded'

    return {
        'final_state': final_state,
        'conversion_state': conversion_state,
        'upload_state': upload_state,
        'summary': dict(summary),
        'details': details,
        'title': ec.title if ec else None,
        'content_type': content_type,
    }


async def t_callback_owned_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
    """Render the "owned content" menu as an inline keyboard.

    Selects non-disabled ``StoredContent`` rows that are either on-chain items
    owned by the user's wallet address (when one is available) or local items
    uploaded by the user, newest first. On-chain items are listed first with
    ``NC_<id>`` callbacks; local items follow with ``LC_<id>`` callbacks and
    are skipped when an on-chain item with the same hash was already listed.

    Args:
        query: Callback query that triggered the handler; its message is
            edited via ``message_id``.
        memory: Unused here.
        user: Current user; provides translations, id and wallet address.
        db_session: Async database session.
        chat_wrap: Chat wrapper passed through to ``tg_process_template``.
        **extra: Additional handler kwargs, ignored.

    Returns:
        The result of ``tg_process_template``.
    """
    message_text = user.translated("ownedContent_menu")
    content_list = []

    user_addr = await user.wallet_address_async(db_session)
    conditions = []
    if user_addr:
        conditions.append(and_(
            StoredContent.owner_address == user_addr,
            StoredContent.type.like('onchain%'),
        ))
    # The local-content condition is always present, so `conditions` is never
    # empty and no further fallback is needed.
    conditions.append(and_(
        StoredContent.user_id == user.id,
        StoredContent.type.like('local/%'),
    ))

    stmt = select(StoredContent).where(
        StoredContent.disabled.is_(None),
        or_(*conditions) if len(conditions) > 1 else conditions[0]
    ).order_by(StoredContent.created.desc())
    rows = (await db_session.execute(stmt)).scalars().all()

    onchain_hashes = set()
    local_items = []
    icon_map = {
        'ready': '✅',
        'processing': '⏳',
        'failed': '⚠️',
        'uploaded': '📦',
    }

    for content in rows:
        meta = content.meta or {}
        encrypted_cid = meta.get('content_cid') or meta.get('encrypted_cid') or content.content_id
        status_info = await _compute_content_status(db_session, encrypted_cid, meta.get('content_type'))
        icon = icon_map.get(status_info['final_state'], '📦')

        if not content.type.startswith('onchain'):
            # Local items are rendered after all on-chain ones so they can be
            # de-duplicated against the complete set of on-chain hashes.
            local_items.append((content, status_info, icon))
            continue

        try:
            metadata_content = await StoredContent.from_cid_async(db_session, content.json_format()['metadata_cid'])
            with open(metadata_content.filepath, 'r', encoding='utf-8') as f:
                metadata_content_json = json.load(f)
        except Exception as e:
            # Best effort: skip items whose metadata cannot be loaded. Only
            # `Exception` is caught so task cancellation and interpreter
            # shutdown signals still propagate.
            make_log("OwnedContent", f"Can't get metadata content: {e}", level='warning')
            continue

        onchain_hashes.add(content.hash)
        display_name = metadata_content_json.get('name') or content.cid.serialize_v2()
        content_list.append([
            {
                'text': f"{icon} {display_name}"[:64],
                'callback_data': f'NC_{content.id}'
            }
        ])

    for content, status_info, icon in local_items:
        if content.hash in onchain_hashes:
            continue

        display_name = status_info['title'] or content.filename or content.cid.serialize_v2()
        button_text = f"{icon} {display_name}"
        content_list.append([
            {
                'text': button_text[:64],
                'callback_data': f'LC_{content.id}'
            }
        ])

    return await tg_process_template(
        chat_wrap, message_text,
        keyboard=get_inline_keyboard([
            *content_list,
            [{
                'text': user.translated('webApp_uploadContent_button'),
                'web_app': types.WebAppInfo(
                    url=WEB_APP_URLS['uploadContent']
                )
            }],
            [{
                'text': user.translated('back_button'),
                'callback_data': 'home'
            }]
        ]),
        message_id=query.message.message_id
    )
async def t_callback_node_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
    """Send the stored content referenced by an ``NC_<id>`` callback.

    Args:
        query: Callback query whose ``data`` has the form ``NC_<id>``.
        memory: Unused here.
        user: Current user; provides translations.
        db_session: Async database session.
        chat_wrap: Chat wrapper used to send the content.
        **extra: Additional handler kwargs, ignored.

    Returns:
        The result of ``chat_wrap.send_content``, or of ``query.answer`` when
        no ``StoredContent`` row with that id exists.

    Raises:
        ValueError: If the part after the first underscore is not an integer.
    """
    content_oid = int(query.data.split('_')[1])
    row = (
        await db_session.execute(
            select(StoredContent).where(StoredContent.id == content_oid)
        )
    ).scalars().first()
    if not row:
        # Same not-found handling as t_callback_local_content, instead of
        # handing None to send_content.
        return await query.answer(user.translated('error_contentNotFound'), show_alert=True)

    return await chat_wrap.send_content(
        db_session, row,
        extra_buttons=[
            [{
                'text': user.translated('back_button'),
                'callback_data': 'ownedContent'
            }]
        ],
        message_id=query.message.message_id
    )


router.callback_query.register(t_callback_owned_content, F.data == 'ownedContent')
router.callback_query.register(t_callback_node_content, F.data.startswith('NC_'))


async def t_callback_local_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
    """Send a status notification for the local content in an ``LC_<id>`` callback.

    The message contains the display name, the overall state label, the
    upload-session state and error (when the content's meta references an
    upload session that exists) and a per-status count of derivatives.

    Args:
        query: Callback query whose ``data`` has the form ``LC_<id>``.
        memory: Unused here.
        user: Current user; provides translations.
        db_session: Async database session.
        chat_wrap: Chat wrapper used to send the notification.
        **extra: Additional handler kwargs, ignored.

    Returns:
        The result of ``query.answer`` when the content is not found,
        otherwise ``None``.

    Raises:
        ValueError: If the part after the first underscore is not an integer.
    """
    content_oid = int(query.data.split('_')[1])
    content = (
        await db_session.execute(
            select(StoredContent).where(StoredContent.id == content_oid)
        )
    ).scalars().first()
    if not content:
        return await query.answer(user.translated('error_contentNotFound'), show_alert=True)

    meta = content.meta or {}
    upload_id = meta.get('upload_id')
    upload_session = await db_session.get(UploadSession, upload_id) if upload_id else None

    encrypted_cid = meta.get('encrypted_cid') or content.content_id
    status_info = await _compute_content_status(db_session, encrypted_cid, meta.get('content_type'))
    display_name = status_info['title'] or content.filename or content.cid.serialize_v2()

    state_label = {
        'ready': 'Готов',
        'processing': 'Обработка',
        'failed': 'Ошибка',
        'uploaded': 'Загружено',
    }.get(status_info['final_state'], 'Статус неизвестен')

    lines = [
        f"{display_name}",
        f"Состояние: {state_label}"
    ]
    if upload_session:
        lines.append(f"Статус загрузки: {upload_session.state}")
        if upload_session.error:
            lines.append(f"Ошибка: {upload_session.error}")
    if status_info['summary']:
        lines.append("Конвертация:")
        for status, count in status_info['summary'].items():
            lines.append(f"• {status}: {count}")

    await chat_wrap.send_message(
        '\n'.join(lines),
        message_type='notification',
        message_meta={'content_id': content.id},
        reply_markup=get_inline_keyboard([
            [{
                'text': user.translated('back_button'),
                'callback_data': 'ownedContent'
            }]
        ])
    )


router.callback_query.register(t_callback_local_content, F.data.startswith('LC_'))