diff --git a/app/api/routes/_blockchain.py b/app/api/routes/_blockchain.py
index cffccb6..ce9e61d 100644
--- a/app/api/routes/_blockchain.py
+++ b/app/api/routes/_blockchain.py
@@ -1,4 +1,4 @@
-from base64 import b64encode, b32decode
+from base64 import b64encode
 from datetime import datetime
 import traceback
 
@@ -7,7 +7,6 @@
 from sqlalchemy import and_, select, func
 from tonsdk.boc import begin_cell, begin_dict
 from tonsdk.utils import Address
-from base58 import b58encode, b58decode
 from app.core._blockchain.ton.connect import TonConnect, wallet_obj_by_name
 from app.core._blockchain.ton.platform import platform
 from app.core._config import PROJECT_HOST
@@ -58,92 +57,21 @@ async def s_api_v1_blockchain_send_new_content_message(request):
             assert field_value(request.json[field_key]), f"Invalid {field_key} provided"
 
         # Support legacy: 'content' as decrypted ContentId; and new: 'content' as encrypted IPFS CID
+        source_content_cid, cid_err = resolve_content(request.json['content'])
+        assert not cid_err, f"Invalid content CID provided: {cid_err}"
+
         encrypted_content_cid = None
-        try:
-            # Legacy path
-            decrypted_content_cid, err = resolve_content(request.json['content'])
-            assert not err
-            decrypted_content = (await request.ctx.db_session.execute(
-                select(StoredContent).where(StoredContent.hash == decrypted_content_cid.content_hash_b58)
-            )).scalars().first()
-            assert decrypted_content and decrypted_content.type == "local/content_bin"
+        decrypted_content = (await request.ctx.db_session.execute(
+            select(StoredContent).where(StoredContent.hash == source_content_cid.content_hash_b58)
+        )).scalars().first()
+
+        if decrypted_content and decrypted_content.type == "local/content_bin":
             encrypted_content = await create_encrypted_content(request.ctx.db_session, decrypted_content)
             encrypted_content_cid = encrypted_content.cid
-        except BaseException:
-            # New path: treat provided string as encrypted IPFS CID (ENCF v1)
-            encrypted_ipfs_cid = request.json['content']
-
-            class _EC:
-                """Adapter to provide ContentId-like interface for IPFS CID strings."""
-
-                def __init__(self, cid_str: str):
-                    self._cid = cid_str
-                    self.content_hash = self._extract_content_hash(cid_str)
-                    self._content_hash_b58 = None
-
-                @staticmethod
-                def _decode_multibase(cid_str: str) -> bytes:
-                    if not cid_str:
-                        raise ValueError("empty CID")
-                    prefix = cid_str[0]
-                    if prefix in ('b', 'B'):
-                        payload = cid_str[1:]
-                        padding = (-len(payload)) % 8
-                        return b32decode(payload.upper() + ('=' * padding), casefold=True)
-                    if prefix in ('z', 'Z'):
-                        return b58decode(cid_str[1:])
-                    # CIDv0 (base58btc without explicit multibase prefix)
-                    return b58decode(cid_str)
-
-                @staticmethod
-                def _read_varint(data: bytes, offset: int):
-                    result = 0
-                    shift = 0
-                    while True:
-                        if offset >= len(data):
-                            raise ValueError("truncated varint")
-                        byte = data[offset]
-                        offset += 1
-                        result |= (byte & 0x7F) << shift
-                        if not (byte & 0x80):
-                            break
-                        shift += 7
-                        if shift > 63:
-                            raise ValueError("varint overflow")
-                    return result, offset
-
-                @classmethod
-                def _extract_content_hash(cls, cid_str: str) -> bytes:
-                    data = cls._decode_multibase(cid_str)
-                    offset = 0
-                    if data and data[0] == 0x01:
-                        version, offset = cls._read_varint(data, offset)
-                        if version != 1:
-                            raise ValueError("unsupported CID version")
-                        _, offset = cls._read_varint(data, offset)  # skip codec
-                    code, offset = cls._read_varint(data, offset)
-                    length, offset = cls._read_varint(data, offset)
-                    digest = data[offset:offset + length]
-                    if len(digest) != length:
-                        raise ValueError("truncated multihash digest")
-                    if code != 0x12 or length != 32:
-                        raise ValueError("unsupported multihash (expect sha2-256)")
-                    return digest
-
-                def serialize_v2(self, include_accept_type: bool = False):
-                    return self._cid
-
-                @property
-                def content_hash_b58(self) -> str:
-                    if self._content_hash_b58 is None:
-                        self._content_hash_b58 = b58encode(self.content_hash).decode()
-                    return self._content_hash_b58
-
-            try:
-                encrypted_content_cid = _EC(encrypted_ipfs_cid)
-            except Exception as exc:
-                make_log("Blockchain", f"Provided encrypted IPFS CID is invalid: {exc}", level='error')
-                raise AssertionError("Invalid encrypted content CID provided") from exc
+        elif source_content_cid.cid_format == 'ipfs':
+            encrypted_content_cid = source_content_cid
+        else:
+            raise AssertionError("Provided content is neither locally available nor a valid encrypted CID")
 
         if request.json['image']:
             image_content_cid, err = resolve_content(request.json['image'])
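
Note: the deleted _EC adapter is essentially multibase decoding plus multihash parsing. For reference, its digest-extraction core fits in a few standard-library lines (this sketch covers base32 `b…` CIDv1 strings only; base58 inputs, as in the removed code, need the third-party base58 package):

    from base64 import b32decode

    def extract_sha256_digest(cid_str: str) -> bytes:
        """Return the raw sha2-256 digest from a base32 CIDv1 string."""
        assert cid_str and cid_str[0] in ('b', 'B'), "expects a base32 multibase prefix"
        payload = cid_str[1:]
        data = b32decode(payload.upper() + '=' * ((-len(payload)) % 8), casefold=True)

        def varint(buf: bytes, off: int):
            # minimal unsigned LEB128 reader; the removed code also guarded overflow
            result = shift = 0
            while True:
                byte = buf[off]; off += 1
                result |= (byte & 0x7F) << shift
                if not byte & 0x80:
                    return result, off
                shift += 7

        off = 0
        version, off = varint(data, off)   # must be 1 for CIDv1
        _codec, off = varint(data, off)    # e.g. 0x55 raw / 0x70 dag-pb
        code, off = varint(data, off)      # multihash code: 0x12 = sha2-256
        length, off = varint(data, off)    # digest length: 32 for sha2-256
        assert version == 1 and code == 0x12 and length == 32
        return data[off:off + length]
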
digest") - if code != 0x12 or length != 32: - raise ValueError("unsupported multihash (expect sha2-256)") - return digest - - def serialize_v2(self, include_accept_type: bool = False): - return self._cid - - @property - def content_hash_b58(self) -> str: - if self._content_hash_b58 is None: - self._content_hash_b58 = b58encode(self.content_hash).decode() - return self._content_hash_b58 - - try: - encrypted_content_cid = _EC(encrypted_ipfs_cid) - except Exception as exc: - make_log("Blockchain", f"Provided encrypted IPFS CID is invalid: {exc}", level='error') - raise AssertionError("Invalid encrypted content CID provided") from exc + elif source_content_cid.cid_format == 'ipfs': + encrypted_content_cid = source_content_cid + else: + raise AssertionError("Provided content is neither locally available nor a valid encrypted CID") if request.json['image']: image_content_cid, err = resolve_content(request.json['image']) @@ -218,7 +146,7 @@ async def s_api_v1_blockchain_send_new_content_message(request): user_id = str(request.ctx.user.id), user_internal_id=request.ctx.user.id, action_type='freeUpload', - action_ref=str(encrypted_content_cid.content_hash), + action_ref=encrypted_content_cid.serialize_v2(), created=datetime.now() ) request.ctx.db_session.add(promo_action) @@ -275,7 +203,7 @@ async def s_api_v1_blockchain_send_new_content_message(request): title=content_title, free_count=(promo_free_upload_available - 1) ), message_type='hint', message_meta={ - 'encrypted_content_hash': b58encode(encrypted_content_cid.content_hash).decode(), + 'encrypted_content_hash': encrypted_content_cid.content_hash_b58, 'hint_type': 'uploadContentTxRequested' } ) @@ -285,54 +213,59 @@ async def s_api_v1_blockchain_send_new_content_message(request): 'payload': "" }) + user_wallet_address = await request.ctx.user.wallet_address_async(request.ctx.db_session) + assert user_wallet_address, "Wallet address is not linked" + await request.ctx.user_uploader_wrapper.send_message( request.ctx.user.translated('p_uploadContentTxRequested').format( title=content_title, ), message_type='hint', message_meta={ - 'encrypted_content_hash': b58encode(encrypted_content_cid.content_hash).decode(), + 'encrypted_content_hash': encrypted_content_cid.content_hash_b58, 'hint_type': 'uploadContentTxRequested' } ) + payload_cell = ( + begin_cell() + .store_uint(0x5491d08c, 32) + .store_uint(int.from_bytes(encrypted_content_cid.content_hash, "big", signed=False), 256) + .store_address(Address(user_wallet_address)) + .store_ref( + begin_cell() + .store_ref( + begin_cell() + .store_coins(int(0)) + .store_coins(int(0)) + .store_coins(int(request.json['price'])) + .end_cell() + ) + .store_maybe_ref(royalties_dict.end_dict()) + .store_uint(0, 1) + .end_cell() + ) + .store_ref( + begin_cell() + .store_ref( + begin_cell() + .store_bytes(f"{PROJECT_HOST}/api/v1.5/storage/{metadata_content.cid.serialize_v2(include_accept_type=True)}".encode()) + .end_cell() + ) + .store_ref( + begin_cell() + .store_ref(begin_cell().store_bytes(f"{encrypted_content_cid.serialize_v2()}".encode()).end_cell()) + .store_ref(begin_cell().store_bytes(f"{image_content_cid.serialize_v2() if image_content_cid else ''}".encode()).end_cell()) + .store_ref(begin_cell().store_bytes(f"{metadata_content.cid.serialize_v2()}".encode()).end_cell()) + .end_cell() + ) + .end_cell() + ) + .end_cell() + ) + return response.json({ 'address': platform.address.to_string(1, 1, 1), 'amount': str(int(0.03 * 10 ** 9)), - 'payload': b64encode( - begin_cell() - .store_uint(0x5491d08c, 32) - 
@@ -356,14 +289,15 @@ async def s_api_v1_blockchain_send_purchase_content_message(request):
         license_exist = (await request.ctx.db_session.execute(select(UserContent).where(
             UserContent.onchain_address == request.json['content_address']
         ))).scalars().first()
-        if license_exist:
-            from app.core.content.content_id import ContentId
-            _cid = ContentId.deserialize(license_exist.content.cid.serialize_v2())
-            r_content = (await request.ctx.db_session.execute(select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58))).scalars().first()
+        from app.core.content.content_id import ContentId
+
+        if license_exist and license_exist.content_id:
+            r_content = (await request.ctx.db_session.execute(select(StoredContent).where(
+                StoredContent.id == license_exist.content_id
+            ))).scalars().first()
         else:
-            from app.core.content.content_id import ContentId
-            _cid = ContentId.deserialize(request.json['content_address'])
-            r_content = (await request.ctx.db_session.execute(select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58))).scalars().first()
+            requested_cid = ContentId.deserialize(request.json['content_address'])
+            r_content = (await request.ctx.db_session.execute(select(StoredContent).where(StoredContent.hash == requested_cid.content_hash_b58))).scalars().first()
 
         async def open_content_async(session, sc: StoredContent):
             if not sc.encrypted:
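
Note: the purchase path now prefers the content_id foreign key and only falls back to deserializing the address as a CID. Factored out with injected fetchers (the names below are illustrative, not part of the codebase), the lookup order reads:

    from typing import Awaitable, Callable, Optional

    async def resolve_stored_content(
        content_address: str,
        license_lookup: Callable[[str], Awaitable[Optional[object]]],
        by_id: Callable[[int], Awaitable[Optional[object]]],
        by_hash: Callable[[str], Awaitable[Optional[object]]],
        deserialize: Callable[[str], object],
    ):
        license_row = await license_lookup(content_address)
        if license_row is not None and getattr(license_row, "content_id", None):
            return await by_id(license_row.content_id)   # direct FK, no CID round-trip
        cid = deserialize(content_address)               # may raise on malformed input
        return await by_hash(cid.content_hash_b58)
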
diff --git a/app/api/routes/admin.py b/app/api/routes/admin.py
index f4e44df..400e83c 100644
--- a/app/api/routes/admin.py
+++ b/app/api/routes/admin.py
@@ -19,6 +19,7 @@ from app.core._config import (
     BACKEND_DATA_DIR_HOST,
     BACKEND_LOGS_DIR_HOST,
     LOG_DIR,
+    CLIENT_TELEGRAM_BOT_USERNAME,
     PROJECT_HOST,
     UPLOADS_DIR,
 )
@@ -35,6 +36,15 @@ from app.core.models.content_v3 import (
 )
 from app.core.models.my_network import KnownNode
 from app.core.models.tasks import BlockchainTask
+from app.core.models.node_storage import StoredContent
+from app.core.models.user import User
+from app.core.models.content.user_content import UserContent
+from app.core._utils.share_links import build_content_links
+from app.core.content.content_id import ContentId
+
+MIN_ONCHAIN_INDEX = int(os.getenv("MIN_ONCHAIN_INDEX", "8"))
+
+ALLOWED_UPLOAD_FILTERS = {"all", "issues", "processing", "ready", "unindexed"}
 
 ADMIN_COOKIE_NAME = os.getenv('ADMIN_COOKIE_NAME', 'admin_session')
 ADMIN_COOKIE_MAX_AGE = int(os.getenv('ADMIN_COOKIE_MAX_AGE', '172800'))  # 48h default
@@ -148,6 +158,42 @@ def _service_states(request) -> List[Dict[str, Any]]:
     return items
 
 
+def _format_dt(value: Optional[datetime]) -> Optional[str]:
+    return value.isoformat() + 'Z' if isinstance(value, datetime) else None
+
+
+def _extract_file_hash(local_path: Optional[str]) -> Optional[str]:
+    if not local_path:
+        return None
+    name = Path(local_path).name
+    return name or None
+
+
+def _storage_download_url(file_hash: Optional[str]) -> Optional[str]:
+    if not file_hash:
+        return None
+    return f"{PROJECT_HOST}/api/v1.5/storage/{file_hash}"
+
+
+def _pick_primary_download(candidates: List[tuple[str, Optional[str], Optional[int]]]) -> Optional[str]:
+    priority = (
+        'decrypted_high',
+        'decrypted_low',
+        'decrypted_preview',
+        'high',
+        'low',
+        'preview',
+    )
+    for target in priority:
+        for kind, url, _ in candidates:
+            if kind == target and url:
+                return url
+    for _, url, _ in candidates:
+        if url:
+            return url
+    return None
+
+
 async def s_api_v1_admin_login(request):
     token = os.getenv('ADMIN_API_TOKEN')
     if not token:
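
Note: _pick_primary_download is a pure function, so the priority order is easy to pin down in isolation (URLs below are made up):

    candidates = [
        ('preview', 'https://node/api/v1.5/storage/aaa', 1024),
        ('decrypted_low', 'https://node/api/v1.5/storage/bbb', 2048),
        ('high', None, None),  # entries without a URL are skipped
    ]
    # 'decrypted_low' outranks 'preview' in the priority tuple
    assert _pick_primary_download(candidates) == 'https://node/api/v1.5/storage/bbb'
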
@@ -309,6 +355,31 @@ async def s_api_v1_admin_uploads(request):
     session = request.ctx.db_session
 
+    raw_filter = (request.args.get('filter') or '').lower()
+    raw_filters: List[str] = []
+    if raw_filter:
+        raw_filters = [item.strip() for item in raw_filter.split(',') if item.strip()]
+    effective_filters = [item for item in raw_filters if item in ALLOWED_UPLOAD_FILTERS and item != 'all']
+
+    search_query = (request.args.get('search') or '').strip()
+    search_lower = search_query.lower()
+
+    try:
+        limit = int(request.args.get('limit') or 50)
+    except Exception:
+        limit = 50
+    limit = max(1, min(limit, 200))
+
+    try:
+        scan_limit = int(request.args.get('scan') or 0)
+    except Exception:
+        scan_limit = 0
+    if scan_limit <= 0:
+        scan_default = max(limit, 100 if (effective_filters or search_lower) else limit)
+        scan_limit = min(max(scan_default, limit), 500)
+    else:
+        scan_limit = max(limit, min(scan_limit, 500))
+
     counts_rows = (await session.execute(
         select(UploadSession.state, func.count()).group_by(UploadSession.state)
     )).all()
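
Note: the limit clamping above reduces to a small pure helper whose invariants can be checked directly (a sketch mirroring the handler's logic):

    def clamp_limit(raw) -> int:
        try:
            limit = int(raw or 50)
        except Exception:
            limit = 50
        return max(1, min(limit, 200))

    assert clamp_limit(None) == 50      # default
    assert clamp_limit('1000') == 200   # upper bound
    assert clamp_limit('-5') == 1       # lower bound
    assert clamp_limit('abc') == 50     # junk falls back to the default
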
@@ -332,10 +403,337 @@ async def s_api_v1_admin_uploads(request):
         for row in recent_rows
     ]
 
+    content_rows = (await session.execute(
+        select(EncryptedContent).order_by(EncryptedContent.created_at.desc()).limit(scan_limit)
+    )).scalars().all()
+
+    content_ids = [row.id for row in content_rows]
+    encrypted_cids = [row.encrypted_cid for row in content_rows]
+
+    derivatives_map: Dict[int, list[ContentDerivative]] = {cid: [] for cid in content_ids}
+    if content_ids:
+        derivative_rows = (await session.execute(
+            select(ContentDerivative).where(ContentDerivative.content_id.in_(content_ids))
+        )).scalars().all()
+        for derivative in derivative_rows:
+            derivatives_map.setdefault(derivative.content_id, []).append(derivative)
+
+    ipfs_map: Dict[int, Optional[IpfsSync]] = {}
+    if content_ids:
+        ipfs_rows = (await session.execute(
+            select(IpfsSync).where(IpfsSync.content_id.in_(content_ids))
+        )).scalars().all()
+        for sync in ipfs_rows:
+            ipfs_map[sync.content_id] = sync
+
+    uploads_map: Dict[str, List[UploadSession]] = {cid: [] for cid in encrypted_cids}
+    if encrypted_cids:
+        uploads_for_content = (await session.execute(
+            select(UploadSession).where(UploadSession.encrypted_cid.in_(encrypted_cids))
+        )).scalars().all()
+        for upload in uploads_for_content:
+            uploads_map.setdefault(upload.encrypted_cid, []).append(upload)
+        for chain in uploads_map.values():
+            chain.sort(key=lambda u: (u.updated_at or u.created_at or datetime.min))
+
+    stored_map: Dict[str, StoredContent] = {}
+    stored_by_id: Dict[int, StoredContent] = {}
+    if encrypted_cids:
+        stored_rows = (await session.execute(
+            select(StoredContent).where(StoredContent.content_id.in_(encrypted_cids))
+        )).scalars().all()
+        for stored in stored_rows:
+            stored_map[stored.content_id] = stored
+            stored_by_id[stored.id] = stored
+
+    user_map: Dict[int, User] = {}
+    user_ids = {stored.user_id for stored in stored_map.values() if stored.user_id}
+    if user_ids:
+        user_rows = (await session.execute(select(User).where(User.id.in_(user_ids)))).scalars().all()
+        for user in user_rows:
+            user_map[user.id] = user
+
+    license_counts: Dict[int, int] = {}
+    stored_ids = list(stored_by_id.keys())
+    if stored_ids:
+        license_rows = (await session.execute(
+            select(UserContent.content_id, func.count())
+            .where(UserContent.content_id.in_(stored_ids))
+            .group_by(UserContent.content_id)
+        )).all()
+        for content_id, count in license_rows:
+            license_counts[int(content_id)] = int(count)
+
+    contents_payload: List[Dict[str, Any]] = []
+    category_totals: Dict[str, int] = {key: 0 for key in ALLOWED_UPLOAD_FILTERS if key != 'all'}
+    matched_total = 0
+    for content in content_rows:
+        derivatives = derivatives_map.get(content.id, [])
+        attempts: Dict[str, int] = defaultdict(int)
+        derivative_entries: List[Dict[str, Any]] = []
+        summary: Dict[str, int] = defaultdict(int)
+        download_candidates: List[tuple[str, Optional[str], Optional[int]]] = []
+
+        for derivative in sorted(derivatives, key=lambda item: item.created_at or datetime.min):
+            summary[derivative.status] += 1
+            attempts[derivative.kind] += 1
+            file_hash = _extract_file_hash(derivative.local_path)
+            download_url = _storage_download_url(file_hash)
+            derivative_entries.append({
+                'kind': derivative.kind,
+                'status': derivative.status,
+                'size_bytes': derivative.size_bytes,
+                'error': derivative.error,
+                'created_at': _format_dt(derivative.created_at),
+                'updated_at': _format_dt(derivative.last_access_at or derivative.created_at),
+                'attempts': attempts[derivative.kind],
+                'download_url': download_url,
+            })
+            download_candidates.append((derivative.kind, download_url, derivative.size_bytes))
+
+        conversion_state = None
+        if summary.get('ready'):
+            conversion_state = 'ready'
+        elif summary.get('processing'):
+            conversion_state = 'processing'
+        elif summary.get('pending'):
+            conversion_state = 'pending'
+        elif summary.get('failed'):
+            conversion_state = 'failed'
+
+        upload_chain = uploads_map.get(content.encrypted_cid, [])
+        latest_upload = upload_chain[-1] if upload_chain else None
+        upload_history = [
+            {
+                'state': entry.state,
+                'at': _format_dt(entry.updated_at or entry.created_at),
+                'error': entry.error,
+                'filename': entry.filename,
+            }
+            for entry in upload_chain
+        ]
+
+        ipfs_sync = ipfs_map.get(content.id)
+
+        stored = stored_map.get(content.encrypted_cid)
+        metadata_cid = None
+        content_hash = None
+        stored_payload: Optional[Dict[str, Any]] = None
+        blockchain_payload: Optional[Dict[str, Any]] = None
+        if stored:
+            metadata_cid = (stored.meta or {}).get('metadata_cid')
+            content_hash = stored.hash
+            download_candidates.append(('stored', stored.web_url, None))
+            stored_payload = {
+                'stored_id': stored.id,
+                'type': stored.type,
+                'owner_address': stored.owner_address,
+                'user_id': stored.user_id,
+                'status': stored.status,
+                'content_url': stored.web_url,
+                'download_url': stored.web_url,
+                'created': _format_dt(stored.created),
+                'updated': _format_dt(stored.updated),
+            }
+            if stored.user_id and stored.user_id in user_map:
+                user = user_map[stored.user_id]
+                stored_payload['user'] = {
+                    'id': user.id,
+                    'telegram_id': user.telegram_id,
+                    'username': user.username,
+                    'first_name': (user.meta or {}).get('first_name') if user.meta else None,
+                    'last_name': (user.meta or {}).get('last_name') if user.meta else None,
+                }
+            blockchain_payload = {
+                'onchain_index': stored.onchain_index,
+                'item_address': (stored.meta or {}).get('item_address'),
+                'indexed': stored.onchain_index is not None and stored.onchain_index >= MIN_ONCHAIN_INDEX,
+                'license_count': license_counts.get(stored.id, 0),
+            }
+
+        else:
+            try:
+                cid_obj = ContentId.deserialize(content.encrypted_cid)
+                content_hash = cid_obj.content_hash_b58
+            except Exception:
+                content_hash = None
+
+        share_target = None
+        if stored:
+            try:
+                share_target = stored.cid.serialize_v2()
+            except Exception:
+                share_target = content.encrypted_cid
+        else:
+            share_target = content.encrypted_cid
+
+        _, startapp_url, web_view_url = build_content_links(
+            share_target,
+            None,
+            project_host=PROJECT_HOST,
+            bot_username=CLIENT_TELEGRAM_BOT_USERNAME,
+        )
+
+        primary_download = _pick_primary_download(download_candidates)
+        derivative_downloads = [
+            {
+                'kind': kind,
+                'url': url,
+                'size_bytes': size_bytes,
+            }
+            for kind, url, size_bytes in download_candidates
+            if url
+        ]
+
+        upload_state_norm = (latest_upload.state or '').lower() if latest_upload else ''
+        conversion_state_norm = (conversion_state or '').lower() if conversion_state else ''
+        ipfs_state_norm = (ipfs_sync.pin_state or '').lower() if (ipfs_sync and ipfs_sync.pin_state) else ''
+        derivative_states_norm = [(derivative.status or '').lower() for derivative in derivatives]
+        status_values = [upload_state_norm, conversion_state_norm, ipfs_state_norm] + derivative_states_norm
+
+        has_issue = any(
+            value and ("fail" in value or "error" in value or "timeout" in value)
+            for value in status_values
+        )
+        if not has_issue and any(event.get('error') for event in upload_history):
+            has_issue = True
+        if not has_issue and any(derivative.error for derivative in derivatives):
+            has_issue = True
+        if not has_issue and ipfs_sync and ipfs_sync.pin_error:
+            has_issue = True
+
+        is_onchain_indexed = bool(blockchain_payload and blockchain_payload.get('indexed'))
+        is_unindexed = not is_onchain_indexed
+
+        conversion_done = (
+            summary.get('ready', 0) > 0
+            or conversion_state_norm in ('ready', 'converted')
+            or any(state in ('ready', 'converted', 'complete') for state in derivative_states_norm)
+        )
+        ipfs_done = ipfs_state_norm in ('pinned', 'ready')
+
+        is_ready = not has_issue and conversion_done and (ipfs_done or not ipfs_sync) and is_onchain_indexed
+
+        processing_tokens = ('process', 'pending', 'queue', 'upload', 'pin', 'sync')
+        has_processing_keywords = any(
+            value and any(token in value for token in processing_tokens)
+            for value in status_values
+        )
+
+        categories = set()
+        if has_issue:
+            categories.add('issues')
+        if is_ready:
+            categories.add('ready')
+        if is_unindexed:
+            categories.add('unindexed')
+
+        is_processing = not is_ready and not has_issue and has_processing_keywords
+        if is_processing:
+            categories.add('processing')
+        if not is_ready and not has_issue and 'processing' not in categories:
+            categories.add('processing')
+
+        flags = {
+            'issues': 'issues' in categories,
+            'processing': 'processing' in categories,
+            'ready': 'ready' in categories,
+            'unindexed': 'unindexed' in categories,
+        }
+
+        search_parts: List[Any] = [
+            content.title,
+            content.description,
+            content.encrypted_cid,
+            metadata_cid,
+            content_hash,
+        ]
+        if blockchain_payload:
+            search_parts.append(blockchain_payload.get('item_address') or '')
+        if stored_payload:
+            search_parts.append(stored_payload.get('owner_address') or '')
+            user_info = stored_payload.get('user') or {}
+            search_parts.extend(
+                [
+                    str(user_info.get('id') or ''),
+                    str(user_info.get('telegram_id') or ''),
+                    user_info.get('username') or '',
+                    user_info.get('first_name') or '',
+                    user_info.get('last_name') or '',
+                ]
+            )
+        search_blob = ' '.join(str(part) for part in search_parts if part).lower()
+
+        matches_filter = (not effective_filters) or any(cat in categories for cat in effective_filters)
+        matches_search = (not search_lower) or (search_lower in search_blob)
+
+        if not matches_filter or not matches_search:
+            continue
+
+        matched_total += 1
+        for cat in categories:
+            if cat in category_totals:
+                category_totals[cat] += 1
+
+        if len(contents_payload) >= limit:
+            continue
+
+        contents_payload.append({
+            'encrypted_cid': content.encrypted_cid,
+            'metadata_cid': metadata_cid,
+            'content_hash': content_hash,
+            'title': content.title,
+            'description': content.description,
+            'content_type': content.content_type,
+            'size': {
+                'encrypted': content.enc_size_bytes,
+                'plain': content.plain_size_bytes,
+            },
+            'created_at': _format_dt(content.created_at),
+            'updated_at': _format_dt(content.updated_at),
+            'status': {
+                'upload_state': latest_upload.state if latest_upload else None,
+                'conversion_state': conversion_state,
+                'ipfs_state': ipfs_sync.pin_state if ipfs_sync else None,
+                'onchain': blockchain_payload,
+            },
+            'upload_history': upload_history,
+            'derivative_summary': dict(summary),
+            'derivatives': derivative_entries,
+            'ipfs': (
+                {
+                    'pin_state': ipfs_sync.pin_state,
+                    'pin_error': ipfs_sync.pin_error,
+                    'bytes_total': ipfs_sync.bytes_total,
+                    'bytes_fetched': ipfs_sync.bytes_fetched,
+                    'pinned_at': _format_dt(ipfs_sync.pinned_at),
+                    'updated_at': _format_dt(ipfs_sync.updated_at),
+                }
+                if ipfs_sync else None
+            ),
+            'stored': stored_payload,
+            'links': {
+                'web_view': web_view_url,
+                'start_app': startapp_url,
+                'api_view': f"{PROJECT_HOST}/api/v1/content.view/{share_target}",
+                'download_primary': primary_download,
+                'download_derivatives': derivative_downloads,
+            },
+            'flags': flags,
+        })
+
     payload = {
         'total': total,
         'states': counts,
         'recent': recent,
+        'contents': contents_payload,
+        'matching_total': matched_total,
+        'filter': effective_filters or ['all'],
+        'search': search_query or None,
+        'limit': limit,
+        'scan': scan_limit,
+        'scanned': len(content_rows),
+        'category_totals': category_totals,
     }
     return response.json(payload)
@@ -347,10 +745,15 @@ async def s_api_v1_admin_system(request):
     session = request.ctx.db_session
 
     config_rows = (await session.execute(select(ServiceConfigValue).order_by(ServiceConfigValue.key))).scalars().all()
-    config_payload = [
-        {'key': row.key, 'value': row.value, 'raw': row.packed_value}
-        for row in config_rows
-    ]
+    config_payload = []
+    for row in config_rows:
+        key_lower = (row.key or '').lower()
+        masked = ('private' in key_lower and 'key' in key_lower) or ('seed' in key_lower)
+        config_payload.append({
+            'key': row.key,
+            'value': '*** hidden ***' if masked else row.value,
+            'raw': None if masked else row.packed_value,
+        })
 
     env_summary = {
         'PROJECT_NAME': os.getenv('PROJECT_NAME'),
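
Note: the masking predicate only hides keys containing both 'private' and 'key', or 'seed'. A quick check against representative key names (examples hypothetical) shows what slips through:

    def is_masked(key: str) -> bool:
        key_lower = (key or '').lower()
        return ('private' in key_lower and 'key' in key_lower) or ('seed' in key_lower)

    assert is_masked('TON_PRIVATE_KEY')
    assert is_masked('wallet_seed_phrase')
    assert not is_masked('PUBLIC_KEY')   # public keys stay visible
    assert not is_masked('API_SECRET')   # plain 'secret' is not caught by this predicate
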
diff --git a/app/api/routes/auth.py b/app/api/routes/auth.py
index a27b33b..68c792c 100644
--- a/app/api/routes/auth.py
+++ b/app/api/routes/auth.py
@@ -61,6 +61,21 @@ async def s_api_v1_auth_twa(request):
     )).scalars().first()
     assert known_user, "User not created"
 
+    meta_updated = False
+    if not (known_user.meta or {}).get('ref_id'):
+        known_user.ensure_ref_id()
+        meta_updated = True
+
+    incoming_ref_id = auth_data.get('ref_id')
+    stored_ref_id = (known_user.meta or {}).get('ref_id')
+    if incoming_ref_id and incoming_ref_id != stored_ref_id:
+        if (known_user.meta or {}).get('referrer_id') != incoming_ref_id:
+            known_user.meta = {
+                **(known_user.meta or {}),
+                'referrer_id': incoming_ref_id
+            }
+            meta_updated = True
+
     new_user_key = await known_user.create_api_token_v1(request.ctx.db_session, "USER_API_V1")
     if auth_data['ton_proof']:
         try:
@@ -116,6 +131,8 @@ async def s_api_v1_auth_twa(request):
         )
     ).order_by(WalletConnection.created.desc()))).scalars().first()
     known_user.last_use = datetime.now()
+    if meta_updated:
+        known_user.updated = datetime.now()
     await request.ctx.db_session.commit()
 
     return response.json({
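
Note: the referral update rebuilds meta rather than mutating it in place, so SQLAlchemy's change tracking sees a new dict. Isolated as a pure function (field names as in the handler), the merge is:

    from typing import Optional, Tuple

    def merge_referrer(meta: Optional[dict], incoming_ref_id: Optional[str]) -> Tuple[dict, bool]:
        meta = dict(meta or {})
        own_ref = meta.get('ref_id')
        if incoming_ref_id and incoming_ref_id != own_ref and meta.get('referrer_id') != incoming_ref_id:
            return {**meta, 'referrer_id': incoming_ref_id}, True
        return meta, False

    assert merge_referrer({'ref_id': 'abc'}, 'xyz') == ({'ref_id': 'abc', 'referrer_id': 'xyz'}, True)
    assert merge_referrer({'ref_id': 'abc'}, 'abc')[1] is False   # self-referral is ignored
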
diff --git a/app/api/routes/content.py b/app/api/routes/content.py
index 5562cc5..eeedc12 100644
--- a/app/api/routes/content.py
+++ b/app/api/routes/content.py
@@ -10,7 +10,8 @@
 from app.core.models.keys import KnownKey
 from app.core.models import StarsInvoice
 from app.core.models.content.user_content import UserContent
 from app.core._config import CLIENT_TELEGRAM_API_KEY, PROJECT_HOST
-from app.core.models.content_v3 import EncryptedContent as ECv3, ContentDerivative as CDv3
+from app.core.models.content_v3 import EncryptedContent as ECv3, ContentDerivative as CDv3, UploadSession
+from app.core.content.content_id import ContentId
 
 import json
 import uuid
@@ -50,8 +51,15 @@ async def s_api_v1_content_view(request, content_address: str):
     license_exist = (await request.ctx.db_session.execute(
         select(UserContent).where(UserContent.onchain_address == content_address)
     )).scalars().first()
+    license_address = None
     if license_exist:
-        content_address = license_exist.content.cid.serialize_v2()
+        license_address = license_exist.onchain_address
+        if license_exist.content_id:
+            linked_content = (await request.ctx.db_session.execute(
+                select(StoredContent).where(StoredContent.id == license_exist.content_id)
+            )).scalars().first()
+            if linked_content:
+                content_address = linked_content.cid.serialize_v2()
 
     from app.core.content.content_id import ContentId
     cid = ContentId.deserialize(content_address)
@@ -74,9 +82,12 @@ async def s_api_v1_content_view(request, content_address: str):
         return {'encrypted_content': encrypted, 'decrypted_content': decrypted, 'content_type': content_type}
 
     content = await open_content_async(request.ctx.db_session, r_content)
+    master_address = content['encrypted_content'].meta.get('item_address', '')
     opts = {
         'content_type': content['content_type'],  # possibly inaccurate; should be reworked to use ffprobe
-        'content_address': content['encrypted_content'].meta.get('item_address', '')
+        'content_address': license_address or master_address,
+        'license_address': license_address,
+        'master_address': master_address,
     }
     if content['encrypted_content'].key_id:
         known_key = (await request.ctx.db_session.execute(
@@ -157,59 +168,168 @@ async def s_api_v1_content_view(request, content_address: str):
             'amount': stars_cost,
         }
 
-    display_options = {
-        'content_url': None,
-    }
+    display_options = {'content_url': None}
 
     if have_access:
         opts['have_licenses'].append('listen')
 
-    converted_content = content['encrypted_content'].meta.get('converted_content')
-    if converted_content:
-        user_content_option = 'low_preview'
-        if have_access:
-            user_content_option = 'low'
+    enc_cid = content['encrypted_content'].meta.get('content_cid') or content['encrypted_content'].meta.get('encrypted_cid')
+    ec_v3 = None
+    derivative_rows = []
+    if enc_cid:
+        ec_v3 = (await request.ctx.db_session.execute(select(ECv3).where(ECv3.encrypted_cid == enc_cid))).scalars().first()
+        if ec_v3:
+            derivative_rows = (await request.ctx.db_session.execute(select(CDv3).where(CDv3.content_id == ec_v3.id))).scalars().all()
 
-        converted_content = (await request.ctx.db_session.execute(select(StoredContent).where(
-            StoredContent.hash == converted_content[user_content_option]
-        ))).scalars().first()
-        if converted_content:
-            display_options['content_url'] = converted_content.web_url
-            opts['content_ext'] = converted_content.filename.split('.')[-1]
+    upload_row = None
+    if enc_cid:
+        upload_row = (await request.ctx.db_session.execute(select(UploadSession).where(UploadSession.encrypted_cid == enc_cid))).scalars().first()
+
+    converted_meta_map = dict(content['encrypted_content'].meta.get('converted_content') or {})
+
+    derivative_latest = {}
+    if derivative_rows:
+        derivative_sorted = sorted(derivative_rows, key=lambda row: row.created_at or datetime.min)
+        for row in derivative_sorted:
+            derivative_latest[row.kind] = row
+
+    def _row_to_hash_and_url(row):
+        if not row or not row.local_path:
+            return None, None
+        file_hash = row.local_path.split('/')[-1]
+        return file_hash, f"{PROJECT_HOST}/api/v1.5/storage/{file_hash}"
+
+    chosen_row = None
+    if have_access:
+        for key in ('decrypted_low', 'decrypted_high'):
+            if key in derivative_latest and derivative_latest[key].status == 'ready':
+                chosen_row = derivative_latest[key]
+                break
     else:
-        # v3 fallback: use derivatives table linked via encrypted_cid from onchain meta
-        enc_cid = content['encrypted_content'].meta.get('content_cid') or content['encrypted_content'].meta.get('encrypted_cid')
-        if enc_cid:
-            ec = (await request.ctx.db_session.execute(select(ECv3).where(ECv3.encrypted_cid == enc_cid))).scalars().first()
-            if ec:
-                # choose preview for non-access; low for access
-                desired = ['decrypted_preview'] if not have_access else ['decrypted_low', 'decrypted_high']
-                rows = (await request.ctx.db_session.execute(select(CDv3).where(CDv3.content_id == ec.id, CDv3.status == 'ready'))).scalars().all()
-                chosen = None
-                for kind in desired:
-                    chosen = next((r for r in rows if r.kind == kind), None)
-                    if chosen:
-                        break
-                if chosen and chosen.local_path:
-                    h = chosen.local_path.split('/')[-1]
-                    display_options['content_url'] = f"{PROJECT_HOST}/api/v1.5/storage/{h}"
-                    opts['content_ext'] = (chosen.content_type or '').split('/')[-1] if chosen.content_type else None
+        for key in ('decrypted_preview', 'decrypted_low'):
+            if key in derivative_latest and derivative_latest[key].status == 'ready':
+                chosen_row = derivative_latest[key]
+                break
+
+    if chosen_row:
+        file_hash, url = _row_to_hash_and_url(chosen_row)
+        if url:
+            display_options['content_url'] = url
+            opts['content_ext'] = (chosen_row.content_type or '').split('/')[-1] if chosen_row.content_type else None
+            converted_meta_map.setdefault('low' if have_access else 'low_preview', file_hash)
+
+    if not display_options['content_url'] and converted_meta_map:
+        preference = ['low', 'high', 'low_preview'] if have_access else ['low_preview', 'low', 'high']
+        for key in preference:
+            hash_value = converted_meta_map.get(key)
+            if not hash_value:
+                continue
+            stored = (await request.ctx.db_session.execute(select(StoredContent).where(StoredContent.hash == hash_value))).scalars().first()
+            if stored:
+                display_options['content_url'] = stored.web_url
+                opts['content_ext'] = stored.filename.split('.')[-1]
+                break
+
+    # Metadata fallback
     content_meta = content['encrypted_content'].json_format()
-    from app.core.content.content_id import ContentId
+    content_metadata_json = None
     _mcid = content_meta.get('metadata_cid') or None
-    content_metadata = None
     if _mcid:
         _cid = ContentId.deserialize(_mcid)
         content_metadata = (await request.ctx.db_session.execute(select(StoredContent).where(StoredContent.hash == _cid.content_hash_b58))).scalars().first()
-        with open(content_metadata.filepath, 'r') as f:
-            content_metadata_json = json.loads(f.read())
+        if content_metadata:
+            try:
+                with open(content_metadata.filepath, 'r') as f:
+                    content_metadata_json = json.loads(f.read())
+            except Exception as exc:
+                make_log("Content", f"Can't read metadata file: {exc}", level='warning')
+
+    if not content_metadata_json:
+        fallback_name = (ec_v3.title if ec_v3 else None) or content_meta.get('title') or content_meta.get('cid')
+        fallback_description = (ec_v3.description if ec_v3 else '') or ''
+        content_metadata_json = {
+            'name': fallback_name or 'Без названия',
+            'description': fallback_description,
+            'downloadable': False,
+        }
+        cover_cid = content_meta.get('cover_cid')
+        if cover_cid:
+            content_metadata_json.setdefault('image', f"{PROJECT_HOST}/api/v1.5/storage/{cover_cid}")
 
     display_options['metadata'] = content_metadata_json
+
     opts['downloadable'] = content_metadata_json.get('downloadable', False)
-    if opts['downloadable']:
-        if not ('listen' in opts['have_licenses']):
-            opts['downloadable'] = False
+    if opts['downloadable'] and 'listen' not in opts['have_licenses']:
+        opts['downloadable'] = False
+
+    # Conversion status summary
+    conversion_summary = {}
+    conversion_details = []
+    derivative_summary_map = {}
+    for row in derivative_latest.values():
+        conversion_summary[row.status] = conversion_summary.get(row.status, 0) + 1
+        derivative_summary_map[row.kind] = row
+        conversion_details.append({
+            'kind': row.kind,
+            'status': row.status,
+            'size_bytes': row.size_bytes,
+            'content_type': row.content_type,
+            'error': row.error,
+            'updated_at': (row.last_access_at or row.created_at).isoformat() + 'Z' if (row.last_access_at or row.created_at) else None,
+        })
+
+    required_kinds = {'decrypted_low', 'decrypted_high'}
+    if ec_v3 and (ec_v3.content_type or '').startswith('video/'):
+        required_kinds.add('decrypted_preview')
+
+    statuses_by_kind = {kind: row.status for kind, row in derivative_summary_map.items() if kind in required_kinds}
+    conversion_state = 'pending'
+    if required_kinds and all(statuses_by_kind.get(kind) == 'ready' for kind in required_kinds):
+        conversion_state = 'ready'
+    elif any(statuses_by_kind.get(kind) == 'failed' for kind in required_kinds):
+        conversion_state = 'failed'
+    elif any(statuses_by_kind.get(kind) in ('processing', 'pending') for kind in required_kinds):
+        conversion_state = 'processing'
+    elif statuses_by_kind:
+        conversion_state = 'partial'
+
+    if display_options['content_url']:
+        conversion_state = 'ready'
+
+    upload_info = None
+    if upload_row:
+        upload_info = {
+            'id': upload_row.id,
+            'state': upload_row.state,
+            'error': upload_row.error,
+            'created_at': upload_row.created_at.isoformat() + 'Z' if upload_row.created_at else None,
+            'updated_at': upload_row.updated_at.isoformat() + 'Z' if upload_row.updated_at else None,
+        }
+
+    final_state = 'ready' if display_options['content_url'] else None
+    if final_state != 'ready':
+        upload_state = upload_row.state if upload_row else None
+        if conversion_state == 'failed' or upload_state in ('failed', 'conversion_failed'):
+            final_state = 'failed'
+        elif conversion_state in ('processing', 'partial') or upload_state in ('processing', 'pinned'):
+            final_state = 'processing'
+        else:
+            final_state = 'uploaded'
+
+    conversion_info = {
+        'state': conversion_state,
+        'summary': conversion_summary,
+        'details': conversion_details,
+        'required_kinds': list(required_kinds),
+    }
+
+    opts['conversion'] = conversion_info
+    opts['upload'] = upload_info
+    opts['status'] = {
+        'state': final_state,
+        'conversion_state': conversion_state,
+        'upload_state': upload_info['state'] if upload_info else None,
+    }
 
     return response.json({
         **opts,
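
Note: this ready/failed/processing/partial rollup now appears three times (here, in upload_status.py, and in the bot router) with the same precedence. Over the latest status per required kind it reduces to one pure function:

    def rollup(statuses: dict, required: set) -> str:
        by_kind = {k: statuses.get(k) for k in required}
        if required and all(v == 'ready' for v in by_kind.values()):
            return 'ready'
        if any(v == 'failed' for v in by_kind.values()):
            return 'failed'
        if any(v in ('processing', 'pending') for v in by_kind.values()):
            return 'processing'
        return 'partial' if any(v for v in by_kind.values()) else 'pending'

    assert rollup({'decrypted_low': 'ready', 'decrypted_high': 'ready'},
                  {'decrypted_low', 'decrypted_high'}) == 'ready'
    assert rollup({'decrypted_low': 'failed'},
                  {'decrypted_low', 'decrypted_high'}) == 'failed'
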
diff --git a/app/api/routes/upload_status.py b/app/api/routes/upload_status.py
index c495004..e8cc0a1 100644
--- a/app/api/routes/upload_status.py
+++ b/app/api/routes/upload_status.py
@@ -1,5 +1,8 @@
 from sanic import response
-from app.core.models.content_v3 import UploadSession
+from sqlalchemy import select
+
+from app.core.models.content_v3 import UploadSession, EncryptedContent, ContentDerivative
+from app.core._utils.resolve_content import resolve_content
 
 
 async def s_api_v1_upload_status(request, upload_id: str):
@@ -7,11 +10,45 @@ async def s_api_v1_upload_status(request, upload_id: str):
     row = await session.get(UploadSession, upload_id)
     if not row:
         return response.json({"error": "NOT_FOUND"}, status=404)
+
+    encrypted_hash = None
+    conversion = {"state": "not_started", "details": []}
+
+    if row.encrypted_cid:
+        cid_obj, err = resolve_content(row.encrypted_cid)
+        if not err:
+            encrypted_hash = cid_obj.content_hash_b58
+        ec = (await session.execute(select(EncryptedContent).where(EncryptedContent.encrypted_cid == row.encrypted_cid))).scalars().first()
+        if ec:
+            derivative_rows = (await session.execute(
+                select(ContentDerivative.kind, ContentDerivative.status).where(ContentDerivative.content_id == ec.id)
+            )).all()
+            details = [
+                {"kind": kind, "status": status}
+                for kind, status in derivative_rows
+            ]
+            required = {"decrypted_high", "decrypted_low"}
+            if ec.preview_enabled and (ec.content_type or "").startswith("video/"):
+                required.add("decrypted_preview")
+            statuses = {kind: status for kind, status in derivative_rows}
+            if required and all(statuses.get(k) == "ready" for k in required):
+                conv_state = "ready"
+            elif any(statuses.get(k) == "failed" for k in required):
+                conv_state = "failed"
+            elif any(statuses.get(k) in ("processing", "pending") for k in required):
+                conv_state = "processing"
+            elif required:
+                conv_state = "pending"
+            else:
+                conv_state = "not_started"
+            conversion = {"state": conv_state, "details": details}
+
     return response.json({
         "id": row.id,
         "state": row.state,
         "encrypted_cid": row.encrypted_cid,
+        "encrypted_hash": encrypted_hash,
         "size_bytes": row.size_bytes,
         "error": row.error,
+        "conversion": conversion,
     })
-
diff --git a/app/api/routes/upload_tus.py b/app/api/routes/upload_tus.py
index 9fcbcbb..27a4e77 100644
--- a/app/api/routes/upload_tus.py
+++ b/app/api/routes/upload_tus.py
@@ -6,16 +6,21 @@
 import os
 from datetime import datetime
 from typing import Dict, Any
 
+import aiofiles
 from base58 import b58encode
 from sanic import response
 
+from app.core._config import UPLOADS_DIR
 from app.core._secrets import hot_pubkey
 from app.core.crypto.aes_gcm_stream import encrypt_file_to_encf, CHUNK_BYTES
 from app.core.crypto.keywrap import wrap_dek, KeyWrapError
 from app.core.ipfs_client import add_streamed_file
 from app.core.logger import make_log
 from app.core.models.content_v3 import EncryptedContent, ContentKey, IpfsSync, ContentIndexItem, UploadSession
+from app.core.models.node_storage import StoredContent
 from app.core.storage import db_session
+from app.core._utils.resolve_content import resolve_content
+from sqlalchemy import select
 
 
 def _b64(s: bytes) -> str:
@@ -139,6 +144,12 @@ async def s_api_v1_upload_tus_hook(request):
     except Exception:
         enc_size = None
 
+    encrypted_cid_obj, cid_err = resolve_content(encrypted_cid)
+    if cid_err:
+        make_log("tus-hook", f"Encrypted CID resolve failed: {cid_err}", level="error")
+        return response.json({"ok": False, "error": "INVALID_ENCRYPTED_CID"}, status=500)
+    encrypted_hash_b58 = encrypted_cid_obj.content_hash_b58
+
     # Persist records
     async with db_session() as session:
         ec = EncryptedContent(
@@ -165,6 +176,7 @@ async def s_api_v1_upload_tus_hook(request):
             allow_auto_grant=True,
         )
         session.add(ck)
+        await session.flush()
 
         sync = IpfsSync(
             content_id=ec.id,
@@ -175,6 +187,32 @@ async def s_api_v1_upload_tus_hook(request):
         )
         session.add(sync)
 
+        existing_encrypted_content = (await session.execute(
+            select(StoredContent).where(StoredContent.hash == encrypted_hash_b58)
+        )).scalars().first()
+        if not existing_encrypted_content:
+            placeholder_meta = {
+                'content_type': content_type,
+                'storage': 'ipfs',
+                'encrypted_cid': encrypted_cid,
+                'upload_id': upload_id,
+                'source': 'tusd'
+            }
+            encrypted_stored_content = StoredContent(
+                type="local/encrypted_ipfs",
+                hash=encrypted_hash_b58,
+                content_id=encrypted_cid,
+                filename=os.path.basename(file_path),
+                meta=placeholder_meta,
+                user_id=request.ctx.user.id if request.ctx.user else None,
+                owner_address=None,
+                encrypted=True,
+                decrypted_content_id=None,
+                key_id=None,
+                created=datetime.utcnow(),
+            )
+            session.add(encrypted_stored_content)
+
         # Publish signed index item
         item = {
             "encrypted_cid": encrypted_cid,
@@ -206,6 +244,9 @@ async def s_api_v1_upload_tus_hook(request):
         if us:
             us.state = 'pinned'
             us.encrypted_cid = encrypted_cid
+            us.error = None
+            if size:
+                us.size_bytes = size
         # prefer using IPFS for downstream conversion; remove staging
         try:
             if file_path and os.path.exists(file_path):
@@ -216,4 +257,15 @@ async def s_api_v1_upload_tus_hook(request):
         await session.commit()
 
     make_log("tus-hook", f"Uploaded+encrypted {file_path} -> {encrypted_cid}")
+    placeholder_path = os.path.join(UPLOADS_DIR, encrypted_hash_b58)
+    if not os.path.exists(placeholder_path):
+        try:
+            async with aiofiles.open(placeholder_path, "wb") as ph:
+                await ph.write(json.dumps({
+                    "ipfs_cid": encrypted_cid,
+                    "note": "Encrypted payload stored in IPFS"
+                }).encode())
+        except Exception as e:
+            make_log("tus-hook", f"Failed to create placeholder for {encrypted_hash_b58}: {e}", level="warning")
+
     return response.json({"ok": True, "encrypted_cid": encrypted_cid, "upload_id": upload_id})
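
Note: the placeholder file is written after the commit, so a crash mid-write can leave a truncated JSON stub at the content-hash path. A write-then-rename variant avoids that (a sketch under the same UPLOADS_DIR layout; os.replace is atomic within one filesystem):

    import json, os, tempfile

    def write_placeholder_atomic(uploads_dir: str, file_hash: str, ipfs_cid: str) -> None:
        payload = json.dumps({"ipfs_cid": ipfs_cid, "note": "Encrypted payload stored in IPFS"}).encode()
        fd, tmp_path = tempfile.mkstemp(dir=uploads_dir, prefix=f".{file_hash}.")
        try:
            with os.fdopen(fd, "wb") as tmp:
                tmp.write(payload)
            os.replace(tmp_path, os.path.join(uploads_dir, file_hash))  # atomic swap into place
        except BaseException:
            os.unlink(tmp_path)
            raise
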
diff --git a/app/bot/routers/content.py b/app/bot/routers/content.py
index 6bacb75..bc69cc6 100644
--- a/app/bot/routers/content.py
+++ b/app/bot/routers/content.py
@@ -1,12 +1,16 @@
 import base58
 from aiogram import types, Router, F
+from collections import defaultdict
+from datetime import datetime
+from typing import Optional
 
 from app.core._config import WEB_APP_URLS
 from app.core._keyboards import get_inline_keyboard
 from app.core._utils.tg_process_template import tg_process_template
 from app.core.logger import make_log
 from app.core.models.node_storage import StoredContent
-from sqlalchemy import select, and_
+from app.core.models.content_v3 import UploadSession, EncryptedContent, ContentDerivative
+from sqlalchemy import select, and_, or_
 import json
 
 router = Router()
@@ -18,26 +22,144 @@ def chunks(lst, n):
         yield lst[i:i + n]
 
 
+async def _compute_content_status(db_session, encrypted_cid: Optional[str], fallback_content_type: Optional[str] = None):
+    if not encrypted_cid:
+        return {
+            'final_state': 'uploaded',
+            'conversion_state': 'pending',
+            'upload_state': None,
+            'summary': {},
+            'details': [],
+            'title': None,
+            'content_type': fallback_content_type,
+        }
+
+    ec = (await db_session.execute(select(EncryptedContent).where(EncryptedContent.encrypted_cid == encrypted_cid))).scalars().first()
+    content_type = fallback_content_type or (ec.content_type if ec else None) or 'application/octet-stream'
+
+    derivative_rows = []
+    if ec:
+        derivative_rows = (await db_session.execute(select(ContentDerivative).where(ContentDerivative.content_id == ec.id))).scalars().all()
+    upload_row = (await db_session.execute(select(UploadSession).where(UploadSession.encrypted_cid == encrypted_cid))).scalars().first()
+
+    derivative_sorted = sorted(derivative_rows, key=lambda row: row.created_at or datetime.min)
+    derivative_latest = {}
+    summary = defaultdict(int)
+    details = []
+    for row in derivative_sorted:
+        derivative_latest[row.kind] = row
+    for kind, row in derivative_latest.items():
+        summary[row.status] += 1
+        details.append({
+            'kind': kind,
+            'status': row.status,
+            'size_bytes': row.size_bytes,
+            'error': row.error,
+            'updated_at': (row.last_access_at or row.created_at).isoformat() + 'Z' if (row.last_access_at or row.created_at) else None,
+        })
+
+    required = {'decrypted_low', 'decrypted_high'}
+    if content_type.startswith('video/'):
+        required.add('decrypted_preview')
+
+    statuses_by_kind = {kind: derivative_latest[kind].status for kind in required if kind in derivative_latest}
+    conversion_state = 'pending'
+    if required and all(statuses_by_kind.get(kind) == 'ready' for kind in required):
+        conversion_state = 'ready'
+    elif any(statuses_by_kind.get(kind) == 'failed' for kind in required):
+        conversion_state = 'failed'
+    elif any(statuses_by_kind.get(kind) in ('processing', 'pending') for kind in required):
+        conversion_state = 'processing'
+    elif statuses_by_kind:
+        conversion_state = 'partial'
+
+    upload_state = upload_row.state if upload_row else None
+    final_state = 'ready' if conversion_state == 'ready' else None
+    if not final_state:
+        if conversion_state == 'failed' or upload_state in ('failed', 'conversion_failed'):
+            final_state = 'failed'
+        elif conversion_state in ('processing', 'partial') or upload_state in ('processing', 'pinned'):
+            final_state = 'processing'
+        else:
+            final_state = 'uploaded'
+
+    return {
+        'final_state': final_state,
+        'conversion_state': conversion_state,
+        'upload_state': upload_state,
+        'summary': dict(summary),
+        'details': details,
+        'title': ec.title if ec else None,
+        'content_type': content_type,
+    }
+
+
 async def t_callback_owned_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
     message_text = user.translated("ownedContent_menu")
 
     content_list = []
     user_addr = await user.wallet_address_async(db_session)
-    result = await db_session.execute(select(StoredContent).where(
-        and_(StoredContent.owner_address == user_addr, StoredContent.type == 'onchain/content')
-    ))
-    for content in result.scalars().all():
-        try:
-            metadata_content = await StoredContent.from_cid_async(db_session, content.json_format()['metadata_cid'])
-            with open(metadata_content.filepath, 'r') as f:
-                metadata_content_json = json.loads(f.read())
-        except BaseException as e:
-            make_log("OwnedContent", f"Can't get metadata content: {e}", level='warning')
-            continue
+    conditions = []
+    if user_addr:
+        conditions.append(and_(StoredContent.owner_address == user_addr, StoredContent.type.like('onchain%')))
+        conditions.append(and_(StoredContent.user_id == user.id, StoredContent.type.like('local/%')))
+    if not conditions:
+        conditions = [StoredContent.user_id == user.id]
+
+    stmt = select(StoredContent).where(
+        StoredContent.disabled.is_(None),
+        or_(*conditions) if len(conditions) > 1 else conditions[0]
+    ).order_by(StoredContent.created.desc())
+
+    rows = (await db_session.execute(stmt)).scalars().all()
+
+    onchain_hashes = set()
+    local_items = []
+
+    icon_map = {
+        'ready': '✅',
+        'processing': '⏳',
+        'failed': '⚠️',
+        'uploaded': '📦',
+    }
+
+    for content in rows:
+        meta = content.meta or {}
+        encrypted_cid = meta.get('content_cid') or meta.get('encrypted_cid') or content.content_id
+        status_info = await _compute_content_status(db_session, encrypted_cid, meta.get('content_type'))
+        icon = icon_map.get(status_info['final_state'], '📦')
+
+        if content.type.startswith('onchain'):
+            try:
+                metadata_content = await StoredContent.from_cid_async(db_session, content.json_format()['metadata_cid'])
+                with open(metadata_content.filepath, 'r') as f:
+                    metadata_content_json = json.loads(f.read())
+            except BaseException as e:
+                make_log("OwnedContent", f"Can't get metadata content: {e}", level='warning')
+                continue
+
+            onchain_hashes.add(content.hash)
+            display_name = metadata_content_json.get('name') or content.cid.serialize_v2()
+            content_list.append([
+                {
+                    'text': f"{icon} {display_name}"[:64],
+                    'callback_data': f'NC_{content.id}'
+                }
+            ])
+        else:
+            local_items.append((content, status_info, icon))
+
+    for content, status_info, icon in local_items:
+        if content.hash in onchain_hashes:
+            continue
+        meta = content.meta or {}
+        encrypted_cid = meta.get('encrypted_cid') or content.content_id
+        display_name = status_info['title'] or content.filename or content.cid.serialize_v2()
+        button_text = f"{icon} {display_name}"
         content_list.append([
             {
-                'text': metadata_content_json['name'],
-                'callback_data': f'NC_{content.id}'
+                'text': button_text[:64],
+                'callback_data': f'LC_{content.id}'
             }
         ])
 
@@ -77,3 +199,51 @@ async def t_callback_node_content(query: types.CallbackQuery, memory=None, user=
 
 router.callback_query.register(t_callback_owned_content, F.data == 'ownedContent')
 router.callback_query.register(t_callback_node_content, F.data.startswith('NC_'))
+
+
+async def t_callback_local_content(query: types.CallbackQuery, memory=None, user=None, db_session=None, chat_wrap=None, **extra):
+    content_oid = int(query.data.split('_')[1])
+    content = (await db_session.execute(select(StoredContent).where(StoredContent.id == content_oid))).scalars().first()
+    if not content:
+        return await query.answer(user.translated('error_contentNotFound'), show_alert=True)
+
+    upload_id = (content.meta or {}).get('upload_id')
+    upload_session = await db_session.get(UploadSession, upload_id) if upload_id else None
+
+    encrypted_cid = (content.meta or {}).get('encrypted_cid') or content.content_id
+    status_info = await _compute_content_status(db_session, encrypted_cid, (content.meta or {}).get('content_type'))
+    display_name = status_info['title'] or content.filename or content.cid.serialize_v2()
+    state_label = {
+        'ready': 'Готов',
+        'processing': 'Обработка',
+        'failed': 'Ошибка',
+        'uploaded': 'Загружено',
+    }.get(status_info['final_state'], 'Статус неизвестен')
+
+    lines = [
+        f"{display_name}",
+        f"Состояние: {state_label}"
+    ]
+    if upload_session:
+        lines.append(f"Статус загрузки: {upload_session.state}")
+        if upload_session.error:
+            lines.append(f"Ошибка: {upload_session.error}")
+    if status_info['summary']:
+        lines.append("Конвертация:")
+        for status, count in status_info['summary'].items():
+            lines.append(f"• {status}: {count}")
+
+    await chat_wrap.send_message(
+        '\n'.join(lines),
+        message_type='notification',
+        message_meta={'content_id': content.id},
+        reply_markup=get_inline_keyboard([
+            [{
+                'text': user.translated('back_button'),
+                'callback_data': 'ownedContent'
+            }]
+        ])
+    )
+
+
+router.callback_query.register(t_callback_local_content, F.data.startswith('LC_'))
diff --git a/app/client_bot/routers/home.py b/app/client_bot/routers/home.py
index 5125f2f..6422578 100644
--- a/app/client_bot/routers/home.py
+++ b/app/client_bot/routers/home.py
@@ -86,7 +86,10 @@ async def t_home_menu(__msg, **extra):
     make_log("Home", f"Home menu args: {args}", level='debug')
     if args:
         if args[0].startswith('C'):
-            content = StoredContent.from_cid(db_session, args[0][1:])
+            payload = args[0][1:]
+            if '!' in payload:
+                payload = payload.split('!', 1)[0]
+            content = StoredContent.from_cid(db_session, payload)
             return await chat_wrap.send_content(db_session, content, message_id=message_id)
 
     return await send_home_menu(chat_wrap, user, wallet_connection, message_id=message_id)
diff --git a/app/core/_utils/resolve_content.py b/app/core/_utils/resolve_content.py
index be48459..a866d4d 100644
--- a/app/core/_utils/resolve_content.py
+++ b/app/core/_utils/resolve_content.py
@@ -2,8 +2,9 @@
 from app.core.content.content_id import ContentId
 
 
 def resolve_content(content_id) -> ContentId:  # -> [content, error]
+    if isinstance(content_id, ContentId):
+        return content_id, None
     try:
         return ContentId.deserialize(content_id), None
     except BaseException as e:
         return None, f"{e}"
-
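
Note: both halves of the '!' convention are now in the tree — build_content_links appends '!<ref>' when the result stays within the 64-character startapp limit, and t_home_menu strips it (discarding the ref for now). A helper that keeps both parts would look like:

    from typing import Optional, Tuple

    def split_startapp_payload(arg: str) -> Tuple[str, Optional[str]]:
        """'Cabc123!ref' -> ('Cabc123', 'ref'); no separator -> ('Cabc123', None)."""
        token, sep, ref = arg.partition('!')
        return token, (ref if sep else None)

    assert split_startapp_payload('Cabc123!xyz') == ('Cabc123', 'xyz')
    assert split_startapp_payload('Cabc123') == ('Cabc123', None)
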
diff --git a/app/core/_utils/share_links.py b/app/core/_utils/share_links.py
new file mode 100644
index 0000000..fb3a279
--- /dev/null
+++ b/app/core/_utils/share_links.py
@@ -0,0 +1,24 @@
+from typing import Optional
+from urllib.parse import urlencode
+
+STARTAPP_SEPARATOR = '!'
+STARTAPP_LIMIT = 64
+
+
+def build_content_links(content_token: str, ref_id: Optional[str], *, project_host: str, bot_username: str):
+    """Return tuple of (startapp_payload, telegram_url, web_url)."""
+    payload = content_token
+    short_ref = (ref_id or '').strip()[:3]
+    if short_ref:
+        candidate = f"{content_token}{STARTAPP_SEPARATOR}{short_ref}"
+        if len(candidate) <= STARTAPP_LIMIT:
+            payload = candidate
+
+    telegram_url = f"https://t.me/{bot_username}/content?startapp={payload}"
+
+    query = [('content', content_token)]
+    if ref_id:
+        query.append(('ref', ref_id))
+    web_url = f"{project_host}/viewContent?{urlencode(query)}"
+
+    return payload, telegram_url, web_url
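
Note: expected behaviour of the new helper with illustrative values — the ref is cut to three characters for the startapp payload but passed whole to the web link (assumes the app package is importable):

    from app.core._utils.share_links import build_content_links

    payload, tg_url, web_url = build_content_links(
        'Cabc123', 'ref7xyz',
        project_host='https://example.org',   # illustrative values
        bot_username='example_bot',
    )
    assert payload == 'Cabc123!ref'           # 3-char ref suffix fits under 64 chars
    assert tg_url == 'https://t.me/example_bot/content?startapp=Cabc123!ref'
    assert web_url == 'https://example.org/viewContent?content=Cabc123&ref=ref7xyz'
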
diff --git a/app/core/background/convert_service.py b/app/core/background/convert_service.py
index 4dd660c..678879f 100644
--- a/app/core/background/convert_service.py
+++ b/app/core/background/convert_service.py
@@ -136,8 +136,7 @@ async def convert_loop(memory):
         ]
         if trim_value:
             cmd.extend(["--trim", trim_value])
-        if content_kind == "audio":
-            cmd.append("--audio-only")  # audio-only flag
+        # converter auto-detects audio/video, no explicit flag required
 
         process = await asyncio.create_subprocess_exec(
             *cmd,
diff --git a/app/core/background/convert_v3_service.py b/app/core/background/convert_v3_service.py
index 0ac26e6..f950ba7 100644
--- a/app/core/background/convert_v3_service.py
+++ b/app/core/background/convert_v3_service.py
@@ -2,8 +2,11 @@
 import asyncio
 import os
 import json
 import shutil
+import tempfile
+from dataclasses import dataclass
 from datetime import datetime
-from typing import List, Tuple, Optional
+from pathlib import Path
+from typing import List, Optional, Tuple
 
 from sqlalchemy import select
 
@@ -22,9 +25,41 @@
 from app.core.crypto.encf_stream import decrypt_encf_auto
 from app.core.crypto.keywrap import unwrap_dek, wrap_dek, KeyWrapError
 from app.core.network.key_client import request_key_from_peer
 from app.core.models.my_network import KnownNode
+from app.core._utils.resolve_content import resolve_content
+from app.core.content.content_id import ContentId
 
 CONCURRENCY = int(os.getenv("CONVERT_V3_MAX_CONCURRENCY", "3"))
+STAGING_SUBDIR = os.getenv("CONVERT_V3_STAGING_SUBDIR", "convert-staging")
+UPLOADS_PATH = Path(UPLOADS_DIR).resolve()
+_host_uploads_env = os.getenv("BACKEND_DATA_DIR_HOST")
+HOST_UPLOADS_PATH = Path(_host_uploads_env).resolve() if _host_uploads_env else None
+
+
+@dataclass
+class PlainStaging:
+    container_path: str
+    host_path: str
+
+
+def _container_to_host(path: str) -> str:
+    """Map a container path under UPLOADS_DIR to the host path for docker -v."""
+    if not HOST_UPLOADS_PATH:
+        raise RuntimeError("BACKEND_DATA_DIR_HOST is not configured for convert_v3")
+    real_path = Path(path).resolve()
+    try:
+        real_path.relative_to(UPLOADS_PATH)
+    except ValueError:
+        # Not under uploads; best effort fallback to original string
+        return str(real_path)
+    rel = real_path.relative_to(UPLOADS_PATH)
+    return str(HOST_UPLOADS_PATH / rel)
+
+
+MEDIA_CONVERTER_CPU_LIMIT = os.getenv("MEDIA_CONVERTER_CPU_LIMIT")
+MEDIA_CONVERTER_MEM_LIMIT = os.getenv("MEDIA_CONVERTER_MEM_LIMIT")
+MEDIA_CONVERTER_CPUSET = os.getenv("MEDIA_CONVERTER_CPUSET") or os.getenv("CONVERT_CPUSET")
+ERROR_TRUNCATE_LIMIT = 512
 
 
 def _ensure_dir(path: str):
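
Note: _container_to_host rebases a path under UPLOADS_DIR onto the host-side mount so docker run -v can see it, and passes anything else through resolved. With a hypothetical mount pair:

    from pathlib import Path

    def container_to_host(path: str, uploads: Path, host_uploads: Path) -> str:
        real = Path(path).resolve()
        try:
            rel = real.relative_to(uploads)
        except ValueError:
            return str(real)            # not under uploads: best-effort passthrough
        return str(host_uploads / rel)

    # hypothetical mount pair: container /data/uploads <-> host /srv/backend/uploads
    print(container_to_host('/data/uploads/convert-staging/x.bin',
                            Path('/data/uploads'), Path('/srv/backend/uploads')))
    # -> /srv/backend/uploads/convert-staging/x.bin
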
session: + upload_row = (await session.execute( + select(UploadSession).where(UploadSession.encrypted_cid == ec.encrypted_cid) + )).scalars().first() + if upload_row: + if all_success: + upload_row.state = 'converted' + upload_row.error = None + elif upload_row.state != 'converted': + upload_row.state = 'conversion_failed' + if errors: + upload_row.error = _short_error(errors[0]) + await session.commit() + +async def _convert_content(ec: EncryptedContent, staging: PlainStaging): + content_kind = 'audio' if ec.content_type.startswith('audio/') else ('video' if ec.content_type.startswith('video/') else 'other') input_ext = (ec.content_type.split('/')[-1] or 'bin') is_audio = content_kind == 'audio' - # Required outputs - required = ['high', 'low', 'low_preview'] + encrypted_hash_b58 = ContentId.deserialize(ec.encrypted_cid).content_hash_b58 - # Preview interval + if content_kind == 'other': + errors: List[str] = [] + all_success = True + try: + file_hash, size_bytes = await _save_derivative(staging.container_path, staging.container_path) + plain_path = os.path.join(UPLOADS_DIR, file_hash) + plain_filename = f"{ec.encrypted_cid}.{input_ext}" if input_ext else ec.encrypted_cid + async with db_session() as session: + existing = (await session.execute(select(StoredContent).where(StoredContent.hash == file_hash))).scalars().first() + if not existing: + sc = StoredContent( + type="local/content_bin", + hash=file_hash, + user_id=None, + filename=plain_filename, + meta={'encrypted_cid': ec.encrypted_cid, 'kind': 'original'}, + created=datetime.utcnow(), + ) + session.add(sc) + await session.flush() + derivative = ContentDerivative( + content_id=ec.id, + kind='decrypted_original', + local_path=plain_path, + content_type=ec.content_type, + size_bytes=size_bytes, + status='ready', + ) + session.add(derivative) + await session.commit() + make_log('convert_v3', f"Stored original derivative for {ec.encrypted_cid}") + except Exception as e: + all_success = False + errors.append(str(e)) + make_log('convert_v3', f"Convert error {ec.encrypted_cid} opt=original: {e}", level='error') + await _update_upload_session(ec, all_success, errors) + return + + # audio/video path + required = ['high', 'low', 'low_preview'] conf = ec.preview_conf or {} intervals = conf.get('intervals') or [[0, int(conf.get('duration_ms', 30000))]] main_interval = intervals[0] - trim_value = None start_s = max(0, int(main_interval[0]) // 1000) dur_s = max(1, int((main_interval[1] - main_interval[0]) // 1000) or 30) - trim_value = f"{start_s},{dur_s}" + trim_value = f"{start_s}-{start_s + dur_s}" qualities = { 'high': 'high', @@ -133,81 +240,128 @@ async def _convert_content(ec: EncryptedContent, input_host_path: str): 'low_preview': 'low', } + all_success = True + errors: List[str] = [] + for opt in required: + derivative_kind = f"decrypted_{opt if opt != 'low_preview' else 'preview'}" + derivative_id: Optional[int] = None try: - # Mark derivative processing async with db_session() as session: cd = ContentDerivative( content_id=ec.id, - kind=f"decrypted_{opt if opt != 'low_preview' else 'preview'}", + kind=derivative_kind, interval_start_ms=main_interval[0] if opt == 'low_preview' else None, interval_end_ms=main_interval[1] if opt == 'low_preview' else None, local_path="", status='processing', ) session.add(cd) + await session.flush() + derivative_id = cd.id await session.commit() out_path, ffprobe = await _run_media_converter( - input_host_path=input_host_path, + staging=staging, input_ext=input_ext, quality=qualities[opt], 
trim_value=trim_value if opt == 'low_preview' else None, is_audio=is_audio, ) - # Save into store and StoredContent file_hash, size_bytes = await _save_derivative(out_path, os.path.basename(out_path)) async with db_session() as session: - sc = StoredContent( - type="local/content_bin", - hash=file_hash, - user_id=None, - filename=os.path.basename(out_path), - meta={'encrypted_cid': ec.encrypted_cid, 'kind': opt, 'ffprobe_meta': ffprobe}, - created=datetime.utcnow(), - ) - session.add(sc) - await session.flush() + sc = (await session.execute(select(StoredContent).where(StoredContent.hash == file_hash))).scalars().first() + meta_payload = {'encrypted_cid': ec.encrypted_cid, 'kind': opt, 'ffprobe_meta': ffprobe} + if sc: + sc.type = sc.type or "local/content_bin" + sc.filename = os.path.basename(out_path) + sc.meta = meta_payload + sc.updated = datetime.utcnow() + else: + sc = StoredContent( + type="local/content_bin", + hash=file_hash, + user_id=None, + filename=os.path.basename(out_path), + meta=meta_payload, + created=datetime.utcnow(), + ) + session.add(sc) + await session.flush() - # Update derivative record - cd = (await session.execute(select(ContentDerivative).where( - ContentDerivative.content_id == ec.id, - ContentDerivative.kind == (f"decrypted_{opt if opt != 'low_preview' else 'preview'}"), - ContentDerivative.status == 'processing' - ))).scalars().first() + encrypted_sc = (await session.execute(select(StoredContent).where(StoredContent.hash == encrypted_hash_b58))).scalars().first() + if encrypted_sc: + meta = dict(encrypted_sc.meta or {}) + converted = dict(meta.get('converted_content') or {}) + converted[opt] = file_hash + meta['converted_content'] = converted + encrypted_sc.meta = meta + if opt == 'high': + encrypted_sc.decrypted_content_id = sc.id + encrypted_sc.updated = datetime.utcnow() + + cd = await session.get(ContentDerivative, derivative_id) if derivative_id else None if cd: cd.local_path = os.path.join(UPLOADS_DIR, file_hash) cd.size_bytes = size_bytes - cd.content_type = ('audio/mpeg' if is_audio else 'video/mp4') if opt != 'high' else ec.content_type + if is_audio: + cd.content_type = 'audio/flac' if opt == 'high' else 'audio/mpeg' + else: + cd.content_type = ec.content_type if opt == 'high' else 'video/mp4' cd.status = 'ready' + cd.error = None await session.commit() + output_parent = Path(out_path).parent + shutil.rmtree(output_parent, ignore_errors=True) make_log('convert_v3', f"Converted {ec.encrypted_cid} opt={opt} -> {file_hash}") except Exception as e: make_log('convert_v3', f"Convert error {ec.encrypted_cid} opt={opt}: {e}", level='error') + all_success = False + errors.append(_short_error(e)) async with db_session() as session: - cd = ContentDerivative( - content_id=ec.id, - kind=f"decrypted_{opt if opt != 'low_preview' else 'preview'}", - status='failed', - error=str(e), - local_path="", - ) - session.add(cd) + cd = await session.get(ContentDerivative, derivative_id) if derivative_id else None + if cd: + cd.status = 'failed' + cd.error = _short_error(e) + else: + session.add(ContentDerivative( + content_id=ec.id, + kind=derivative_kind, + status='failed', + error=_short_error(e), + local_path="", + )) await session.commit() + await _update_upload_session(ec, all_success, errors) -async def _pick_pending(limit: int) -> List[Tuple[EncryptedContent, str]]: + +async def _pick_pending(limit: int) -> List[Tuple[EncryptedContent, PlainStaging]]: async with db_session() as session: # Find A/V contents with preview_enabled and no ready low/low_preview derivatives 
yet ecs = (await session.execute(select(EncryptedContent).where( EncryptedContent.preview_enabled == True ).order_by(EncryptedContent.created_at.desc()))).scalars().all() - picked: List[Tuple[EncryptedContent, str]] = [] + picked: List[Tuple[EncryptedContent, PlainStaging]] = [] for ec in ecs: + try: + cid_obj, cid_err = resolve_content(ec.encrypted_cid) + if cid_err: + make_log('convert_v3', f"Skip {ec.encrypted_cid}: resolve error {cid_err}", level='debug') + continue + encrypted_hash_b58 = cid_obj.content_hash_b58 + except Exception as exc: + make_log('convert_v3', f"Skip {ec.encrypted_cid}: resolve exception {exc}", level='warning') + continue + + sc = (await session.execute(select(StoredContent).where(StoredContent.hash == encrypted_hash_b58))).scalars().first() + if not sc or sc.onchain_index is None: + continue + # Check if derivatives already ready rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.content_id == ec.id))).scalars().all() kinds_ready = {r.kind for r in rows if r.status == 'ready'} @@ -215,11 +369,11 @@ async def _pick_pending(limit: int) -> List[Tuple[EncryptedContent, str]]: if required.issubset(kinds_ready): continue # Always decrypt from IPFS using local or remote key - storage_path: Optional[str] = None + staging: Optional[PlainStaging] = None ck = (await session.execute(select(ContentKey).where(ContentKey.content_id == ec.id))).scalars().first() if ck: - storage_path = await stage_plain_from_ipfs(ec, ck.key_ciphertext_b64) - if not storage_path: + staging = await stage_plain_from_ipfs(ec, ck.key_ciphertext_b64) + if not staging: peers = (await session.execute(select(KnownNode))).scalars().all() for peer in peers: base_url = f"http://{peer.ip}:{peer.port}" @@ -240,12 +394,12 @@ async def _pick_pending(limit: int) -> List[Tuple[EncryptedContent, str]]: ) session.add(session_ck) await session.commit() - storage_path = await stage_plain_from_ipfs(ec, dek_b64) - if storage_path: + staging = await stage_plain_from_ipfs(ec, dek_b64) + if staging: break - if not storage_path or not os.path.exists(storage_path): + if not staging or not os.path.exists(staging.container_path): continue - picked.append((ec, storage_path)) + picked.append((ec, staging)) if len(picked) >= limit: break return picked @@ -254,14 +408,14 @@ async def _pick_pending(limit: int) -> List[Tuple[EncryptedContent, str]]: async def worker_loop(): sem = asyncio.Semaphore(CONCURRENCY) - async def _run_one(ec: EncryptedContent, input_path: str): + async def _run_one(ec: EncryptedContent, staging: PlainStaging): async with sem: try: - await _convert_content(ec, input_path) + await _convert_content(ec, staging) # After successful conversion, attempt to remove staging file to avoid duplicates try: - if input_path and input_path.startswith("/data/") and os.path.exists(input_path): - os.remove(input_path) + if staging and staging.container_path and os.path.exists(staging.container_path): + os.remove(staging.container_path) except Exception: pass except Exception as e: @@ -273,7 +427,7 @@ async def worker_loop(): if not batch: await asyncio.sleep(3) continue - tasks = [asyncio.create_task(_run_one(ec, path)) for (ec, path) in batch] + tasks = [asyncio.create_task(_run_one(ec, staging)) for (ec, staging) in batch] await asyncio.gather(*tasks) except Exception as e: make_log('convert_v3', f"loop error: {e}", level='error') @@ -285,15 +439,20 @@ async def main_fn(memory): await worker_loop() -async def stage_plain_from_ipfs(ec: EncryptedContent, dek_wrapped: str) -> Optional[str]: - 
"""Download encrypted ENCF stream from IPFS and decrypt on the fly into a temp file.""" - import tempfile +async def stage_plain_from_ipfs(ec: EncryptedContent, dek_wrapped: str) -> Optional[PlainStaging]: + """Download encrypted ENCF stream from IPFS and decrypt on the fly into shared staging.""" + os.makedirs(UPLOADS_PATH / STAGING_SUBDIR, exist_ok=True) try: dek = unwrap_dek(dek_wrapped) except KeyWrapError as exc: make_log('convert_v3', f"unwrap failed for {ec.encrypted_cid}: {exc}", level='error') return None - tmp = tempfile.NamedTemporaryFile(prefix=f"dec_{ec.encrypted_cid[:8]}_", delete=False) + + tmp = tempfile.NamedTemporaryFile( + prefix=f"dec_{ec.encrypted_cid[:8]}_", + dir=UPLOADS_PATH / STAGING_SUBDIR, + delete=False, + ) tmp_path = tmp.name tmp.close() try: @@ -301,7 +460,8 @@ async def stage_plain_from_ipfs(ec: EncryptedContent, dek_wrapped: str) -> Optio async for ch in cat_stream(ec.encrypted_cid): yield ch await decrypt_encf_auto(_aiter(), dek, tmp_path) - return tmp_path + host_path = _container_to_host(tmp_path) + return PlainStaging(container_path=tmp_path, host_path=host_path) except Exception as e: make_log('convert_v3', f"decrypt from ipfs failed: {e}", level='error') try: @@ -312,3 +472,8 @@ async def stage_plain_from_ipfs(ec: EncryptedContent, dek_wrapped: str) -> Optio +def _short_error(message: str, limit: int = ERROR_TRUNCATE_LIMIT) -> str: + if not message: + return message + message = str(message) + return message if len(message) <= limit else message[: limit - 3] + '...' diff --git a/app/core/background/derivative_cache_janitor.py b/app/core/background/derivative_cache_janitor.py index 1898252..1ffddd4 100644 --- a/app/core/background/derivative_cache_janitor.py +++ b/app/core/background/derivative_cache_janitor.py @@ -25,14 +25,14 @@ async def _evict_over_ttl(now: datetime) -> int: removed = 0 # Pull TTL from ServiceConfig each time async with db_session() as session: - ttl_days = await ServiceConfig(session).get('DERIVATIVE_CACHE_TTL_DAYS', ENV_TTL_DAYS) - if int(ttl_days) <= 0: + ttl_days = int(await ServiceConfig(session).get('DERIVATIVE_CACHE_TTL_DAYS', ENV_TTL_DAYS)) + if ttl_days <= 0: return 0 async with db_session() as session: rows = (await session.execute(select(ContentDerivative).where(ContentDerivative.status == 'ready'))).scalars().all() for r in rows: la = r.last_access_at or r.created_at - if la and (now - la) > timedelta(days=TTL_DAYS): + if la and (now - la) > timedelta(days=ttl_days): try: if r.local_path and os.path.exists(r.local_path): os.remove(r.local_path) @@ -80,7 +80,11 @@ async def _evict_to_fit(): async def main_fn(memory): - make_log('derivative_janitor', f"Started (MAX_GB={MAX_GB}, TTL_DAYS={TTL_DAYS})", level='info') + async with db_session() as session: + cfg = ServiceConfig(session) + runtime_max_gb = float(await cfg.get('DERIVATIVE_CACHE_MAX_GB', ENV_MAX_GB)) + runtime_ttl_days = int(await cfg.get('DERIVATIVE_CACHE_TTL_DAYS', ENV_TTL_DAYS)) + make_log('derivative_janitor', f"Started (MAX_GB={runtime_max_gb}, TTL_DAYS={runtime_ttl_days})", level='info') while True: try: now = datetime.utcnow() diff --git a/app/core/background/index_scout_v3.py b/app/core/background/index_scout_v3.py index 0c16026..3c514f7 100644 --- a/app/core/background/index_scout_v3.py +++ b/app/core/background/index_scout_v3.py @@ -1,5 +1,6 @@ import asyncio -from typing import List +import os +from typing import List, Optional import httpx import random @@ -18,7 +19,7 @@ ENV_PIN_CONCURRENCY = int(os.getenv('SYNC_MAX_CONCURRENT_PINS', '4')) 
 ENV_DISK_WATERMARK_PCT = int(os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90'))
 
-async def fetch_index(base_url: str, etag: str | None, since: str | None) -> tuple[List[dict], str | None]:
+async def fetch_index(base_url: str, etag: Optional[str], since: Optional[str]) -> tuple[List[dict], Optional[str]]:
     try:
         headers = {}
         params = {}
diff --git a/app/core/background/indexer_service.py b/app/core/background/indexer_service.py
index af37852..3178c7b 100644
--- a/app/core/background/indexer_service.py
+++ b/app/core/background/indexer_service.py
@@ -1,4 +1,5 @@
 import asyncio
+import os
 from base64 import b64decode
 from datetime import datetime
 
@@ -6,7 +7,7 @@ from base58 import b58encode
 from sqlalchemy import String, and_, desc, cast
 from tonsdk.boc import Cell
 from tonsdk.utils import Address
-from app.core._config import CLIENT_TELEGRAM_BOT_USERNAME
+from app.core._config import CLIENT_TELEGRAM_BOT_USERNAME, PROJECT_HOST
 from app.core._blockchain.ton.platform import platform
 from app.core._blockchain.ton.toncenter import toncenter
 from app.core._utils.send_status import send_status
@@ -18,6 +19,10 @@ from app.core._utils.resolve_content import resolve_content
 from app.core.models.wallet_connection import WalletConnection
 from app.core._keyboards import get_inline_keyboard
 from app.core.models._telegram import Wrapped_CBotChat
+from app.core._utils.share_links import build_content_links
+
+
+MIN_ONCHAIN_INDEX = int(os.getenv("MIN_ONCHAIN_INDEX", "8"))
 
 from sqlalchemy import select, and_, desc
 from app.core.storage import db_session
 import os
@@ -110,11 +115,15 @@ async def indexer_loop(memory, platform_found: bool, seqno: int) -> [bool, int]:
         )).scalars().first()
         last_known_index = last_known_index_.onchain_index if last_known_index_ else 0
         last_known_index = max(last_known_index, 0)
+        if last_known_index < (MIN_ONCHAIN_INDEX - 1):
+            make_log(
+                "Indexer",
+                f"Adjusting last_known_index from {last_known_index} to {MIN_ONCHAIN_INDEX - 1} (MIN_ONCHAIN_INDEX)",
+                level="debug"
+            )
+            last_known_index = MIN_ONCHAIN_INDEX - 1
         make_log("Indexer", f"Last known index: {last_known_index}", level="debug")
-        if last_known_index_:
-            next_item_index = last_known_index + 1
-        else:
-            next_item_index = 0
+        next_item_index = last_known_index + 1
 
         resolve_item_result = await toncenter.run_get_method(platform.address.to_string(1, 1, 1), 'get_nft_address_by_index', [['num', next_item_index]])
         make_log("Indexer", f"Resolve item result: {resolve_item_result}", level="debug")
@@ -141,6 +150,13 @@ async def indexer_loop(memory, platform_found: bool, seqno: int) -> [bool, int]:
         assert item_get_data_result['stack'][2][0] == 'num', "Item index is not a number"
         item_index = int(item_get_data_result['stack'][2][1], 16)
 
+        if item_index < MIN_ONCHAIN_INDEX:
+            make_log(
+                "Indexer",
+                f"Skip on-chain item {item_index}: below MIN_ONCHAIN_INDEX={MIN_ONCHAIN_INDEX}",
+                level="info"
+            )
+            return platform_found, seqno
         assert item_index == next_item_index, "Item index mismatch"
 
         item_platform_address = Cell.one_from_boc(b64decode(item_get_data_result['stack'][3][1]['bytes'])).begin_parse().read_msg_addr()
@@ -222,17 +238,32 @@ async def indexer_loop(memory, platform_found: bool, seqno: int) -> [bool, int]:
 
             if user:
                 user_uploader_wrapper = Wrapped_CBotChat(memory._telegram_bot, chat_id=user.telegram_id, user=user, db_session=session)
+                ref_id = (user.meta or {}).get('ref_id')
+                if not ref_id:
+                    ref_id = user.ensure_ref_id()
+                    await session.commit()
+
+                _, startapp_url, web_url = build_content_links(
+                    encrypted_stored_content.cid.serialize_v2(),
+                    ref_id,
+                    project_host=PROJECT_HOST,
+                    bot_username=CLIENT_TELEGRAM_BOT_USERNAME
+                )
+
+                message_text = user.translated('p_contentWasIndexed').format(
+                    item_address=item_address.to_string(1, 1, 1),
+                    item_index=item_index,
+                )
+                message_text += f"\n\n🔗 Открыть контент"
+
                 await user_uploader_wrapper.send_message(
-                    user.translated('p_contentWasIndexed').format(
-                        item_address=item_address.to_string(1, 1, 1),
-                        item_index=item_index,
-                    ),
+                    message_text,
                     message_type='notification',
                     reply_markup=get_inline_keyboard([
                         [{
                             'text': user.translated('viewTrackAsClient_button'),
-                            'url': f"https://t.me/{CLIENT_TELEGRAM_BOT_USERNAME}?start=C{encrypted_stored_content.cid.serialize_v2()}"
-                        }],
+                            'url': startapp_url
+                        }]
                     ])
                 )
 
@@ -263,6 +294,7 @@ async def indexer_loop(memory, platform_found: bool, seqno: int) -> [bool, int]:
                 **encrypted_stored_content.meta,
                 **item_metadata_packed
             }
+            encrypted_stored_content.content_id = item_content_cid_str
             await session.commit()
 
         return platform_found, seqno
@@ -283,6 +315,7 @@ async def indexer_loop(memory, platform_found: bool, seqno: int) -> [bool, int]:
                 encrypted=True,
                 decrypted_content_id=None,
                 key_id=None,
+                content_id=item_content_cid_str,
                 updated=datetime.now()
             )
             session.add(onchain_stored_content)
diff --git a/app/core/background/license_service.py b/app/core/background/license_service.py
index 67cc9a0..c824ae0 100644
--- a/app/core/background/license_service.py
+++ b/app/core/background/license_service.py
@@ -100,6 +100,7 @@ async def license_index_loop(memory, platform_found: bool, seqno: int) -> [bool, int]:
                 process_content = (await session.execute(select(UserContent).where(
                     and_(
                         UserContent.type.startswith('nft/'),
+                        UserContent.type != 'nft/ignored',
                         UserContent.updated < (datetime.now() - timedelta(minutes=60)),
                     )
                 ).order_by(UserContent.updated.asc()))).scalars().first()
diff --git a/app/core/content/content_id.py b/app/core/content/content_id.py
index cb53baa..d9d79d0 100644
--- a/app/core/content/content_id.py
+++ b/app/core/content/content_id.py
@@ -1,3 +1,6 @@
+from base64 import b32decode
+from typing import Optional, Tuple
+
 from base58 import b58encode, b58decode
 from tonsdk.boc import begin_cell
 
@@ -12,25 +15,50 @@ from app.core._utils.string_binary import string_to_bytes_fixed_size, bytes_to_s
 
 # cid_v2#_ cid_version:uint8 content_sha256:uint256 *[Param]s = CIDv2;
 class ContentId:
+    """Unified abstraction for legacy ContentID and ENCF/IPFS CID strings."""
+
     def __init__(
         self,
-        version: int = None,
-        content_hash: bytes = None,  # only SHA256
-        onchain_index: int = None,
-        accept_type: str = None,
-        encryption_key_sha256: bytes = None,
+        version: Optional[int] = None,
+        content_hash: Optional[bytes] = None,  # only SHA256
+        onchain_index: Optional[int] = None,
+        accept_type: Optional[str] = None,
+        encryption_key_sha256: Optional[bytes] = None,
+        *,
+        raw_value: Optional[str] = None,
+        cid_format: Optional[str] = None,
+        multibase_prefix: Optional[str] = None,
+        multicodec: Optional[int] = None,
+        multihash_code: Optional[int] = 0x12,
+        multihash_length: Optional[int] = 32,
     ):
         self.version = version
         self.content_hash = content_hash
-        self.onchain_index = onchain_index or -1
+        self.onchain_index = onchain_index if onchain_index is not None else -1
         self.accept_type = accept_type
         self.encryption_key_sha256 = encryption_key_sha256
         if self.encryption_key_sha256:
             assert len(self.encryption_key_sha256) == 32, "Invalid encryption key length"
 
+        self._raw_value = raw_value
+        if cid_format:
+            self.cid_format = cid_format
+        elif self.version == 1:
+            self.cid_format = 'content_id_v1'
+        else:
+            self.cid_format = 'content_id_v2'
+        self.multibase_prefix = multibase_prefix
+        self.multicodec = multicodec
+        self.multihash_code = multihash_code
+        self.multihash_length = multihash_length
+
     @property
     def content_hash_b58(self) -> str:
+        assert self.content_hash, "Content hash is not set"
         return b58encode(self.content_hash).decode()
 
     @property
@@ -38,6 +66,11 @@ class ContentId:
         return self.onchain_index if (not (self.onchain_index is None) and self.onchain_index >= 0) else None
 
     def serialize_v2(self, include_accept_type=False) -> str:
+        if self.cid_format == 'ipfs':
+            if self._raw_value:
+                return self._raw_value
+            return self._serialize_ipfs()
+
         cid_bin = (
             (2).to_bytes(1, 'big')  # cid version
             + self.content_hash
@@ -60,6 +93,8 @@ class ContentId:
         return b58encode(cid_bin).decode()
 
     def serialize_v1(self) -> str:
+        if self.cid_format == 'ipfs':
+            raise ValueError("Cannot serialize IPFS CID as ContentId v1")
         at_bin = string_to_bytes_fixed_size(self.accept_type, 15)
         assert len(self.content_hash) == 32, "Invalid hash length"
         if self.onchain_index < 0:
@@ -133,13 +168,31 @@ class ContentId:
     @classmethod
     def deserialize(cls, cid: str):
-        cid_version = int.from_bytes(b58decode(cid)[0:1], 'big')
+        if not cid:
+            raise ValueError("Empty content id provided")
+
+        first_char = cid[0]
+        if first_char in ('b', 'B', 'z', 'Z'):
+            return cls.from_ipfs(cid)
+
+        try:
+            cid_version = int.from_bytes(b58decode(cid)[0:1], 'big')
+        except Exception:
+            cid_version = None
+
         if cid_version == 1:
-            return cls.from_v1(cid)
-        elif cid_version == 2:
-            return cls.from_v2(cid)
-        else:
-            raise ValueError("Invalid cid version")
+            obj = cls.from_v1(cid)
+            obj._raw_value = cid
+            return obj
+        if cid_version == 2:
+            obj = cls.from_v2(cid)
+            obj._raw_value = cid
+            return obj
+
+        try:
+            return cls.from_ipfs(cid)
+        except Exception as exc:
+            raise ValueError(f"Invalid cid format: {exc}") from exc
 
     def json_format(self):
         return {
@@ -147,7 +200,130 @@ class ContentId:
             "content_hash": self.content_hash_b58,
             "onchain_index": self.safe_onchain_index,
             "accept_type": self.accept_type,
-            "encryption_key_sha256": b58encode(self.encryption_key_sha256).decode() if self.encryption_key_sha256 else None
+            "encryption_key_sha256": b58encode(self.encryption_key_sha256).decode() if self.encryption_key_sha256 else None,
+            "format": self.cid_format,
+            "raw": self.serialize_v2() if self.cid_format == 'ipfs' else None,
         }
 
+    # --- helpers for IPFS/ENCF CID handling ---------------------------------
+    @staticmethod
+    def _decode_multibase(cid_str: str) -> Tuple[bytes, Optional[str]]:
+        prefix = cid_str[0]
+        if prefix in ('b', 'B'):
+            payload = cid_str[1:]
+            padding = (-len(payload)) % 8
+            decoded = b32decode(payload.upper() + ('=' * padding), casefold=True)
+            return decoded, prefix.lower()
+        if prefix in ('z', 'Z'):
+            return b58decode(cid_str[1:]), prefix.lower()
+        # CIDv0 without explicit prefix
+        return b58decode(cid_str), None
+
+    @staticmethod
+    def _read_varint(data: bytes, offset: int) -> Tuple[int, int]:
+        result = 0
+        shift = 0
+        while True:
+            if offset >= len(data):
+                raise ValueError("truncated varint")
+            byte = data[offset]
+            offset += 1
+            result |= (byte & 0x7F) << shift
+            if not (byte & 0x80):
+                break
+            shift += 7
+            if shift > 63:
+                raise ValueError("varint overflow")
+        return result, offset
+
+    @classmethod
+    def from_ipfs(cls, cid: str):
+        cid = cid.strip()
+        payload, multibase_prefix = cls._decode_multibase(cid)
+
+        idx = 0
+        version: Optional[int] = None
+        codec: Optional[int] = None
+
+        if multibase_prefix is not None:
+            version, idx = cls._read_varint(payload, idx)
+            if version not in (0, 1):
+                raise ValueError(f"unsupported CID version: {version}")
+            if version == 1:
+                codec, idx = cls._read_varint(payload, idx)
+            else:
+                codec = 0x70  # dag-pb default for CIDv0
+        else:
+            # CIDv0 without explicit version/codec
+            version = 0
+            codec = 0x70
+
+        multihash_code, idx = cls._read_varint(payload, idx)
+        multihash_length, idx = cls._read_varint(payload, idx)
+        digest = payload[idx:idx + multihash_length]
+        if len(digest) != multihash_length:
+            raise ValueError("truncated multihash digest")
+        if multihash_length != 32:
+            raise ValueError("unsupported multihash length (expected 32 bytes)")
+        if multihash_code != 0x12:
+            raise ValueError(f"unsupported multihash code: {hex(multihash_code)}")
+
+        return cls(
+            version=version,
+            content_hash=digest,
+            onchain_index=None,
+            accept_type=None,
+            encryption_key_sha256=None,
+            raw_value=cid,
+            cid_format='ipfs',
+            multibase_prefix=multibase_prefix,
+            multicodec=codec,
+            multihash_code=multihash_code,
+            multihash_length=multihash_length,
+        )
+
+    def _serialize_ipfs(self) -> str:
+        if not self.content_hash:
+            raise ValueError("Cannot serialize IPFS CID without content hash")
+        if self.multibase_prefix is None:
+            # default to CIDv0 (base58btc) dag-pb
+            multihash = self._encode_varint(self.multihash_code or 0x12) + self._encode_varint(self.multihash_length or len(self.content_hash)) + self.content_hash
+            return b58encode(multihash).decode()
+
+        version_bytes = self._encode_varint(self.version or 1)
+        codec_bytes = b''
+        if (self.version or 1) == 1:
+            codec_bytes = self._encode_varint(self.multicodec or 0x70)
+
+        multihash = (
+            version_bytes
+            + codec_bytes
+            + self._encode_varint(self.multihash_code or 0x12)
+            + self._encode_varint(self.multihash_length or len(self.content_hash))
+            + self.content_hash
+        )
+
+        if self.multibase_prefix == 'z':
+            return 'z' + b58encode(multihash).decode()
+        if self.multibase_prefix == 'b':
+            from base64 import b32encode
+            encoded = b32encode(multihash).decode().rstrip('=').lower()
+            return 'b' + encoded
+        # Fallback to base58btc without prefix
+        return b58encode(multihash).decode()
+
+    @staticmethod
+    def _encode_varint(value: int) -> bytes:
+        if value < 0:
+            raise ValueError("varint cannot encode negative values")
+        out = bytearray()
+        while True:
+            to_write = value & 0x7F
+            value >>= 7
+            if value:
+                out.append(to_write | 0x80)
+            else:
+                out.append(to_write)
+                break
+        return bytes(out)
diff --git a/app/core/models/_telegram/templates/player.py b/app/core/models/_telegram/templates/player.py
index 8f7d1bc..ca74726 100644
--- a/app/core/models/_telegram/templates/player.py
+++ b/app/core/models/_telegram/templates/player.py
@@ -11,6 +11,7 @@ import json
 import urllib
 
 from app.core.models.transaction import StarsInvoice
+from app.core._utils.share_links import build_content_links
 
 
 class PlayerTemplates:
@@ -59,12 +60,26 @@ class PlayerTemplates:
                 cd_log += f"Can't get cover content: {e}. "
                 cover_content = None
 
+        share_target = user_existing_license.onchain_address if user_existing_license else content.cid.serialize_v2()
+        ref_id = (self.user.meta or {}).get('ref_id')
+        if not ref_id:
+            ref_id = self.user.ensure_ref_id()
+            if self.db_session:
+                await self.db_session.commit()
+
+        _, startapp_url, web_app_url = build_content_links(
+            share_target,
+            ref_id,
+            project_host=PROJECT_HOST,
+            bot_username=CLIENT_TELEGRAM_BOT_USERNAME
+        )
+
         content_share_link = {
             'text': self.user.translated('p_shareLinkContext').format(title=content_metadata_json.get('name', "")),
-            'url': f"https://t.me/{CLIENT_TELEGRAM_BOT_USERNAME}/content?startapp={content.cid.serialize_v2()}"
+            'url': startapp_url,
+            'web_url': web_app_url,
+            'ref_id': ref_id
         }
-        if user_existing_license:
-            content_share_link['url'] = f"https://t.me/{CLIENT_TELEGRAM_BOT_USERNAME}/content?startapp={user_existing_license.onchain_address}"
 
         if cover_content:
             template_kwargs['photo'] = URLInputFile(cover_content.web_url)
@@ -85,7 +100,8 @@ class PlayerTemplates:
 {content_hashtags}
 
 Этот контент был загружен в MY \t/ p2p content market /
-
-🔴 «открыть в MY»
-"""
+
+🔴 «открыть в MY»
+
+🌐 «открыть в браузере»
+"""
 
         make_log("TG-Player", f"Send content {content_type} ({content_encoding}) to chat {self._chat_id}. {cd_log}")
 
         kmsgs = (await self.db_session.execute(select(KnownTelegramMessage).where(
diff --git a/app/core/models/content/indexation_mixins.py b/app/core/models/content/indexation_mixins.py
index 61ee5d5..072f027 100644
--- a/app/core/models/content/indexation_mixins.py
+++ b/app/core/models/content/indexation_mixins.py
@@ -1,7 +1,9 @@
+import os
 import traceback
 
 import base58
-from sqlalchemy import and_, select
+from sqlalchemy import select
+from datetime import datetime
 
 from app.core.logger import make_log
 from app.core.models import StoredContent
@@ -42,6 +44,9 @@ class NodeStorageIndexationMixin:
         pass  # async def fetch_onchain_metadata(self):
 
 
+MIN_ONCHAIN_INDEX = int(os.getenv("MIN_ONCHAIN_INDEX", "8"))
+
+
 class UserContentIndexationMixin:
     async def sync_with_chain(self, db_session):
         errored = False
@@ -54,12 +59,29 @@ class UserContentIndexationMixin:
             cc_indexator_data = unpack_item_indexator_data(cc_indexator_result)
             assert cc_indexator_data['type'] == 1, "Type is not a content"
             assert cc_indexator_data['address'] == self.onchain_address, "Address is not equal"
+            if cc_indexator_data['index'] < MIN_ONCHAIN_INDEX:
+                make_log(
+                    "UserContent",
+                    f"Skip license {self.onchain_address} with index {cc_indexator_data['index']} < MIN_ONCHAIN_INDEX={MIN_ONCHAIN_INDEX}",
+                    level="info"
+                )
+                self.type = 'nft/ignored'
+                self.content_id = None
+                self.updated = datetime.utcnow()
+                await db_session.commit()
+                return
             values_slice = cc_indexator_data['values'].begin_parse()
             content_hash_b58 = base58.b58encode(bytes.fromhex(hex(values_slice.read_uint(256))[2:])).decode()
             make_log("UserContent", f"License ({self.onchain_address}) content hash: {content_hash_b58}", level="info")
             stored_content = (await db_session.execute(select(StoredContent).where(
-                and_(StoredContent.type == 'onchain/content', StoredContent.hash == content_hash_b58)
+                StoredContent.hash == content_hash_b58
             ))).scalars().first()
+            if not stored_content:
+                raise AssertionError(f"Stored content not found for hash={content_hash_b58}")
+            if not (stored_content.type or '').startswith('onchain/content'):
+                stored_content.type = 'onchain/content' if stored_content.key_id else 'onchain/content_unknown'
+            stored_content.onchain_index = stored_content.onchain_index or cc_indexator_data['index']
+            stored_content.owner_address = stored_content.owner_address or cc_indexator_data['owner_address']
             trusted_cop_address_result = await toncenter.run_get_method(stored_content.meta['item_address'], 'get_nft_address_by_index', [['num', cc_indexator_data['index']]])
             assert trusted_cop_address_result.get('exit_code', -1) == 0, "Trusted cop address error"
             trusted_cop_address = Cell.one_from_boc(b64decode(trusted_cop_address_result['stack'][0][1]['bytes'])).begin_parse().read_msg_addr().to_string(1, 1, 1)
@@ -77,5 +99,3 @@ class UserContentIndexationMixin:
                 self.type = 'nft/unknown'
                 self.content_id = None
                 await db_session.commit()
-
-
diff --git a/app/core/models/node_storage.py b/app/core/models/node_storage.py
index cec6a67..bf56c56 100644
--- a/app/core/models/node_storage.py
+++ b/app/core/models/node_storage.py
@@ -53,6 +53,11 @@ class StoredContent(AlchemyBase, AudioContentMixin):
 
     @property
     def cid(self) -> ContentId:
+        if self.content_id:
+            try:
+                return ContentId.deserialize(self.content_id)
+            except Exception as exc:
+                make_log("StoredContent", f"Failed to deserialize stored content_id '{self.content_id}': {exc}", level='warning')
         return ContentId(
             content_hash=b58decode(self.hash),
             onchain_index=self.onchain_index,
diff --git a/app/core/models/user/__init__.py b/app/core/models/user/__init__.py
index 447352c..750918d 100644
--- a/app/core/models/user/__init__.py
+++ b/app/core/models/user/__init__.py
@@ -9,6 +9,10 @@ from app.core.translation import TranslationCore
 from ..base import AlchemyBase
 
 
+_BASE62_ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+_BASE62 = len(_BASE62_ALPHABET)
+
+
 class User(AlchemyBase, DisplayMixin, TranslationCore, AuthenticationMixin_V1, WalletMixin):
     LOCALE_DOMAIN = 'sanic_telegram_bot'
 
@@ -32,3 +36,26 @@ class User(AlchemyBase, DisplayMixin, TranslationCore, AuthenticationMixin_V1, WalletMixin):
 
     def __str__(self):
         return f"User, {self.id}_{self.telegram_id} | Username: {self.username} " + '\\'
+
+    def ensure_ref_id(self):
+        """Return a short referral identifier, generating it if missing."""
+        meta = self.meta or {}
+        ref_id = meta.get('ref_id')
+        if isinstance(ref_id, str) and ref_id:
+            return ref_id
+
+        ref_id = self._generate_ref_id()
+        self.meta = {**meta, 'ref_id': ref_id}
+        return ref_id
+
+    def _generate_ref_id(self):
+        user_id = int(self.id or 0)
+        if user_id <= 0:
+            return '000'
+
+        value = user_id % (_BASE62 ** 3)
+        chars = []
+        for _ in range(3):
+            chars.append(_BASE62_ALPHABET[value % _BASE62])
+            value //= _BASE62
+
+        return ''.join(reversed(chars)) or '000'
diff --git a/app/core/models/user/wallet_mixin.py b/app/core/models/user/wallet_mixin.py
index d7e18ba..b577587 100644
--- a/app/core/models/user/wallet_mixin.py
+++ b/app/core/models/user/wallet_mixin.py
@@ -7,6 +7,11 @@ from tonsdk.utils import Address
 from datetime import datetime, timedelta
 from app.core.logger import make_log
 from httpx import AsyncClient
+from app.core.models.content.indexation_mixins import unpack_item_indexator_data, MIN_ONCHAIN_INDEX
+
+def _platform_address_str() -> str:
+    from app.core._blockchain.ton.platform import platform
+    return platform.address.to_string(1, 1, 1)
 
 
 class WalletMixin:
@@ -43,6 +48,43 @@ class WalletMixin:
                 item_address = Address(nft_item['address']).to_string(1, 1, 1)
                 owner_address = Address(nft_item['owner']['address']).to_string(1, 1, 1)
 
+                platform_address = _platform_address_str()
+                collection_address = None
+                if isinstance(nft_item, dict):
+                    collection_data = nft_item.get('collection')
+                    if isinstance(collection_data, dict):
+                        collection_address = collection_data.get('address')
+                    collection_address = collection_address or nft_item.get('collection_address')
+                if collection_address:
+                    try:
+                        normalized_collection = Address(collection_address).to_string(1, 1, 1)
+                    except Exception:
+                        normalized_collection = collection_address
+                    if normalized_collection != platform_address:
+                        make_log(self, f"Skip foreign NFT {item_address} from collection {normalized_collection}", level='debug')
+                        continue
+
+                item_index = None
+                # Prefer index from tonapi payload if available
+                raw_index = nft_item.get('index') if isinstance(nft_item, dict) else None
+                if isinstance(raw_index, int):
+                    item_index = raw_index
+                if item_index is None:
+                    try:
+                        indexator_raw = await toncenter.run_get_method(item_address, 'indexator_data')
+                        if indexator_raw.get('exit_code', -1) == 0:
+                            item_index = unpack_item_indexator_data(indexator_raw)['index']
+                    except BaseException as err:
+                        make_log(self, f"Failed to fetch indexator data for {item_address}: {err}", level='warning')
+
+                if item_index is None:
+                    make_log(self, f"Skip NFT {item_address}: unable to resolve on-chain index", level='warning')
+                    continue
+
+                if item_index < MIN_ONCHAIN_INDEX:
+                    make_log(self, f"Ignore NFT {item_address} with index {item_index} < MIN_ONCHAIN_INDEX={MIN_ONCHAIN_INDEX}", level='debug')
+                    continue
+
                 from sqlalchemy import select
                 user_content = (await db_session.execute(select(UserContent).where(UserContent.onchain_address == item_address))).scalars().first()
                 if user_content:
@@ -83,6 +125,33 @@ class WalletMixin:
                 item_address = Address(nft_item['address']).to_string(1, 1, 1)
                 owner_address = Address(nft_item['owner_address']).to_string(1, 1, 1)
 
+                platform_address = _platform_address_str()
+                collection_address = nft_item.get('collection_address') if isinstance(nft_item, dict) else None
+                if collection_address:
+                    try:
+                        normalized_collection = Address(collection_address).to_string(1, 1, 1)
+                    except Exception:
+                        normalized_collection = collection_address
+                    if normalized_collection != platform_address:
+                        make_log(self, f"Skip foreign NFT {item_address} from collection {normalized_collection}", level='debug')
+                        continue
+
+                item_index = None
+                try:
+                    indexator_raw = await toncenter.run_get_method(item_address, 'indexator_data')
+                    if indexator_raw.get('exit_code', -1) == 0:
+                        item_index = unpack_item_indexator_data(indexator_raw)['index']
+                except BaseException as err:
+                    make_log(self, f"Failed to fetch indexator data for {item_address}: {err}", level='warning')
+
+                if item_index is None:
+                    make_log(self, f"Skip NFT {item_address}: unable to resolve on-chain index", level='warning')
+                    continue
+
+                if item_index < MIN_ONCHAIN_INDEX:
+                    make_log(self, f"Ignore NFT {item_address} with index {item_index} < MIN_ONCHAIN_INDEX={MIN_ONCHAIN_INDEX}", level='debug')
+                    continue
+
                 from sqlalchemy import select
                 user_content = (await db_session.execute(select(UserContent).where(UserContent.onchain_address == item_address))).scalars().first()
                 if user_content:
diff --git a/app/core/projscale_logger.py b/app/core/projscale_logger.py
index c5ad668..3f3260d 100644
--- a/app/core/projscale_logger.py
+++ b/app/core/projscale_logger.py
@@ -1,13 +1,15 @@
-from datetime import datetime
 import logging
 import time
 import httpx
 import threading
 import os
+from logging.handlers import TimedRotatingFileHandler
 
 PROJSCALE_APP_NAME = os.getenv('APP_PROJSCALE_NAME', 'my-uploader')
 LOGS_DIRECTORY = os.getenv('APP_LOGS_DIRECTORY', 'logs')
 os.makedirs(LOGS_DIRECTORY, exist_ok=True)
+LOG_FILE_BASENAME = os.getenv('APP_LOG_FILE_BASENAME', 'app.log')
+LOG_ROTATION_KEEP_HOURS = max(int(os.getenv('APP_LOG_ROTATION_KEEP_HOURS', '168')), 1)
 
 FORMAT_STRING = '%(asctime)s - %(levelname)s – %(pathname)s – %(funcName)s – %(lineno)d - %(message)s'
 
@@ -62,8 +64,14 @@ projscale_handler = ProjscaleLoggingHandler()
 projscale_handler.setLevel(logging.DEBUG)
 logger.addHandler(projscale_handler)
 
-log_filepath = f"{LOGS_DIRECTORY}/{datetime.now().strftime('%Y-%m-%d_%H')}.log"
-file_handler = logging.FileHandler(log_filepath)
+log_filepath = os.path.join(LOGS_DIRECTORY, LOG_FILE_BASENAME)
+file_handler = TimedRotatingFileHandler(
+    log_filepath,
+    when='H',
+    interval=1,
+    backupCount=LOG_ROTATION_KEEP_HOURS,
+    utc=False
+)
 file_handler.setLevel(logging.DEBUG)
 file_handler.setFormatter(logging.Formatter(FORMAT_STRING))
 logger.addHandler(file_handler)
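
Note on the CID handling above: the varint/multibase parsing that ContentId.from_ipfs
introduces can be exercised standalone. Below is a minimal sketch of the same logic,
assuming only the stdlib plus the `base58` package; the function names are illustrative
and are not part of the patch.

from base64 import b32decode
from typing import Optional, Tuple

from base58 import b58decode


def read_varint(data: bytes, offset: int) -> Tuple[int, int]:
    """Decode an unsigned LEB128 varint, returning (value, next_offset)."""
    result, shift = 0, 0
    while True:
        if offset >= len(data):
            raise ValueError("truncated varint")
        byte = data[offset]
        offset += 1
        result |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            return result, offset
        shift += 7
        if shift > 63:
            raise ValueError("varint overflow")


def extract_sha256_digest(cid: str) -> bytes:
    """Return the 32-byte sha2-256 digest from a CIDv0 or CIDv1 string."""
    prefix: Optional[str] = None
    if cid[0] in ('b', 'B'):      # multibase base32 (typical CIDv1)
        payload = cid[1:]
        data = b32decode(payload.upper() + '=' * ((-len(payload)) % 8), casefold=True)
        prefix = 'b'
    elif cid[0] in ('z', 'Z'):    # multibase base58btc
        data, prefix = b58decode(cid[1:]), 'z'
    else:                         # CIDv0: bare base58btc-encoded multihash
        data = b58decode(cid)

    offset = 0
    if prefix is not None:
        version, offset = read_varint(data, offset)
        if version == 1:
            _, offset = read_varint(data, offset)  # skip multicodec (e.g. 0x70 dag-pb)
        elif version != 0:
            raise ValueError(f"unsupported CID version: {version}")

    code, offset = read_varint(data, offset)
    length, offset = read_varint(data, offset)
    digest = data[offset:offset + length]
    if code != 0x12 or length != 32 or len(digest) != 32:
        raise ValueError("expected a 32-byte sha2-256 multihash")
    return digest


# A CIDv0 string decodes to 0x12 0x20 followed by the digest:
print(extract_sha256_digest("QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG").hex())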
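
Similarly, the referral plumbing (User._generate_ref_id plus the payload rule in
build_content_links) composes as sketched below. STARTAPP_SEPARATOR is defined outside
this excerpt, so the '_' here is a placeholder assumption, as are the example bot and
host names.

from urllib.parse import urlencode

ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
SEPARATOR = "_"        # placeholder for STARTAPP_SEPARATOR
STARTAPP_LIMIT = 64    # Telegram startapp payload cap used by build_content_links


def ref_id_for(user_id: int) -> str:
    """Mirror User._generate_ref_id: user_id mod 62**3, rendered big-endian in base62."""
    if user_id <= 0:
        return '000'
    value = user_id % (62 ** 3)
    chars = []
    for _ in range(3):
        chars.append(ALPHABET[value % 62])
        value //= 62
    return ''.join(reversed(chars)) or '000'


def startapp_payload(content_token: str, ref_id: str) -> str:
    """Append the short ref only while the combined payload stays within the limit."""
    candidate = f"{content_token}{SEPARATOR}{ref_id[:3]}"
    return candidate if len(candidate) <= STARTAPP_LIMIT else content_token


ref = ref_id_for(125)                            # 125 = 2*62 + 1 -> '021'
payload = startapp_payload("z" + "Q" * 40, ref)  # fits: 45 chars <= 64
print(f"https://t.me/example_bot/content?startapp={payload}")
print(f"https://example.org/viewContent?{urlencode([('content', 'z' + 'Q' * 40), ('ref', ref)])}")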