diff --git a/app/api/__init__.py b/app/api/__init__.py index 2a41ebb..fac6588 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -36,16 +36,19 @@ from app.api.routes.admin import ( s_api_v1_admin_blockchain, s_api_v1_admin_cache_cleanup, s_api_v1_admin_cache_setlimits, + s_api_v1_admin_licenses, s_api_v1_admin_login, s_api_v1_admin_logout, s_api_v1_admin_node_setrole, s_api_v1_admin_nodes, s_api_v1_admin_overview, + s_api_v1_admin_stars, s_api_v1_admin_status, s_api_v1_admin_storage, s_api_v1_admin_sync_setlimits, s_api_v1_admin_system, s_api_v1_admin_uploads, + s_api_v1_admin_users, ) from app.api.routes.tonconnect import s_api_v1_tonconnect_new, s_api_v1_tonconnect_logout from app.api.routes.keys import s_api_v1_keys_request @@ -98,6 +101,9 @@ app.add_route(s_api_v1_admin_logout, "/api/v1/admin.logout", methods=["POST", "O app.add_route(s_api_v1_admin_overview, "/api/v1/admin.overview", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_storage, "/api/v1/admin.storage", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_uploads, "/api/v1/admin.uploads", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_users, "/api/v1/admin.users", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_licenses, "/api/v1/admin.licenses", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_stars, "/api/v1/admin.stars", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_system, "/api/v1/admin.system", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_blockchain, "/api/v1/admin.blockchain", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_node_setrole, "/api/v1/admin.node.setRole", methods=["POST", "OPTIONS"]) diff --git a/app/api/routes/admin.py b/app/api/routes/admin.py index 400e83c..4940782 100644 --- a/app/api/routes/admin.py +++ b/app/api/routes/admin.py @@ -11,7 +11,7 @@ from typing import Any, Dict, List, Optional from base58 import b58encode from sanic import response -from sqlalchemy import func, select +from sqlalchemy import Integer, String, and_, case, cast, func, or_, select from app.api.routes._system import get_git_info from app.core._blockchain.ton.platform import platform @@ -39,6 +39,9 @@ from app.core.models.tasks import BlockchainTask from app.core.models.node_storage import StoredContent from app.core.models.user import User from app.core.models.content.user_content import UserContent +from app.core.models.transaction import StarsInvoice +from app.core.models.wallet_connection import WalletConnection +from app.core.models.user_activity import UserActivity from app.core._utils.share_links import build_content_links from app.core.content.content_id import ContentId @@ -194,6 +197,17 @@ def _pick_primary_download(candidates: List[tuple[str, Optional[str], Optional[i return None +def _parse_bool_arg(value: Optional[str]) -> Optional[bool]: + if value is None: + return None + normalized = value.strip().lower() + if normalized in {'1', 'true', 'yes', 'y', 'on'}: + return True + if normalized in {'0', 'false', 'no', 'n', 'off'}: + return False + return None + + async def s_api_v1_admin_login(request): token = os.getenv('ADMIN_API_TOKEN') if not token: @@ -738,6 +752,868 @@ async def s_api_v1_admin_uploads(request): return response.json(payload) +async def s_api_v1_admin_users(request): + if (unauth := _ensure_admin(request)): + return unauth + + session = request.ctx.db_session + + try: + limit = int(request.args.get('limit') or 50) + except (TypeError, ValueError): + limit = 50 + limit = max(1, min(limit, 200)) + + try: + offset = int(request.args.get('offset') or 0) + except (TypeError, ValueError): + offset = 0 + offset = max(0, offset) + + search = (request.args.get('search') or '').strip() + + filters = [] + if search: + search_like = f"%{search}%" + clauses = [ + cast(User.id, String).ilike(search_like), + cast(User.telegram_id, String).ilike(search_like), + User.username.ilike(search_like), + User.meta['first_name'].astext.ilike(search_like), + User.meta['last_name'].astext.ilike(search_like), + ] + numeric_candidate = search.lstrip('-') + if numeric_candidate.isdigit(): + try: + search_int = int(search) + clauses.append(User.id == search_int) + clauses.append(User.telegram_id == search_int) + except ValueError: + pass + filters.append(or_(*clauses)) + + total_stmt = select(func.count()).select_from(User) + if filters: + total_stmt = total_stmt.where(and_(*filters)) + total = (await session.execute(total_stmt)).scalar_one() + + query_stmt = ( + select(User) + .order_by(User.last_use.desc()) + .offset(offset) + .limit(limit) + ) + if filters: + query_stmt = query_stmt.where(and_(*filters)) + + user_rows = (await session.execute(query_stmt)).scalars().all() + + base_payload = { + 'total': int(total or 0), + 'limit': limit, + 'offset': offset, + 'search': search or None, + 'has_more': (offset + limit) < int(total or 0), + } + + if not user_rows: + base_payload.update({ + 'items': [], + 'summary': { + 'users_returned': 0, + 'wallets_total': 0, + 'wallets_active': 0, + 'licenses_total': 0, + 'licenses_active': 0, + 'stars_total': 0, + 'stars_paid': 0, + 'stars_unpaid': 0, + 'stars_amount_paid': 0, + 'stars_amount_unpaid': 0, + 'unique_ips_total': 0, + }, + }) + return response.json(base_payload) + + user_ids = [user.id for user in user_rows if user.id is not None] + + wallet_map: Dict[int, List[WalletConnection]] = defaultdict(list) + if user_ids: + wallet_rows = (await session.execute( + select(WalletConnection) + .where(WalletConnection.user_id.in_(user_ids)) + .order_by(WalletConnection.created.desc()) + )).scalars().all() + for wallet in wallet_rows: + if wallet.user_id is not None: + wallet_map[wallet.user_id].append(wallet) + + content_stats_map: Dict[int, Dict[str, int]] = {} + if user_ids: + content_rows = (await session.execute( + select( + StoredContent.user_id, + func.count().label('total'), + func.sum(case((StoredContent.type.like('onchain%'), 1), else_=0)).label('onchain'), + func.sum(case((StoredContent.disabled.isnot(None), 1), else_=0)).label('disabled'), + ) + .where(StoredContent.user_id.in_(user_ids)) + .group_by(StoredContent.user_id) + )).all() + for user_id, total_cnt, onchain_cnt, disabled_cnt in content_rows: + if user_id is None: + continue + total_value = int(total_cnt or 0) + onchain_value = int(onchain_cnt or 0) + disabled_value = int(disabled_cnt or 0) + content_stats_map[user_id] = { + 'total': total_value, + 'onchain': onchain_value, + 'local': max(total_value - onchain_value, 0), + 'disabled': disabled_value, + } + + license_type_map: Dict[int, Dict[str, int]] = defaultdict(lambda: defaultdict(int)) + license_status_map: Dict[int, Dict[str, int]] = defaultdict(lambda: defaultdict(int)) + if user_ids: + license_type_rows = (await session.execute( + select(UserContent.user_id, UserContent.type, func.count()) + .where(UserContent.user_id.in_(user_ids)) + .group_by(UserContent.user_id, UserContent.type) + )).all() + for user_id, ltype, count_value in license_type_rows: + if user_id is None: + continue + license_type_map[user_id][ltype or 'unknown'] += int(count_value or 0) + + license_status_rows = (await session.execute( + select(UserContent.user_id, UserContent.status, func.count()) + .where(UserContent.user_id.in_(user_ids)) + .group_by(UserContent.user_id, UserContent.status) + )).all() + for user_id, status_value, count_value in license_status_rows: + if user_id is None: + continue + license_status_map[user_id][status_value or 'unknown'] += int(count_value or 0) + + stars_stats_map: Dict[int, Dict[str, int]] = {} + if user_ids: + stars_rows = (await session.execute( + select( + StarsInvoice.user_id, + func.count().label('total'), + func.sum(case((StarsInvoice.paid.is_(True), 1), else_=0)).label('paid'), + func.sum(case((or_(StarsInvoice.paid.is_(False), StarsInvoice.paid.is_(None)), 1), else_=0)).label('unpaid'), + func.sum(StarsInvoice.amount).label('amount_total'), + func.sum(case((StarsInvoice.paid.is_(True), StarsInvoice.amount), else_=0)).label('amount_paid'), + ) + .where(StarsInvoice.user_id.in_(user_ids)) + .group_by(StarsInvoice.user_id) + )).all() + for user_id, total_cnt, paid_cnt, unpaid_cnt, amt_total, amt_paid in stars_rows: + if user_id is None: + continue + total_value = int(total_cnt or 0) + paid_value = int(paid_cnt or 0) + unpaid_value = int(unpaid_cnt or max(total_value - paid_value, 0)) + amount_total_value = int(amt_total or 0) + amount_paid_value = int(amt_paid or 0) + amount_unpaid_value = max(amount_total_value - amount_paid_value, 0) + stars_stats_map[user_id] = { + 'total': total_value, + 'paid': paid_value, + 'unpaid': unpaid_value, + 'amount_total': amount_total_value, + 'amount_paid': amount_paid_value, + 'amount_unpaid': amount_unpaid_value, + } + + ip_counts_map: Dict[int, int] = {} + if user_ids: + ip_count_rows = (await session.execute( + select( + UserActivity.user_id, + func.count(func.distinct(UserActivity.user_ip)), + ) + .where(UserActivity.user_id.in_(user_ids), UserActivity.user_ip.isnot(None)) + .group_by(UserActivity.user_id) + )).all() + for user_id, ip_count in ip_count_rows: + if user_id is None: + continue + ip_counts_map[user_id] = int(ip_count or 0) + + latest_ip_map: Dict[int, Dict[str, Optional[str]]] = {} + if user_ids: + latest_ip_rows = (await session.execute( + select( + UserActivity.user_id, + UserActivity.user_ip, + UserActivity.type, + UserActivity.created, + ) + .where(UserActivity.user_id.in_(user_ids), UserActivity.user_ip.isnot(None)) + .order_by(UserActivity.user_id, UserActivity.created.desc()) + .distinct(UserActivity.user_id) + )).all() + for user_id, ip_value, activity_type, created_at in latest_ip_rows: + if user_id is None: + continue + latest_ip_map[user_id] = { + 'ip': ip_value, + 'type': activity_type, + 'seen_at': _format_dt(created_at), + } + + recent_ip_map: Dict[int, List[Dict[str, Optional[str]]]] = defaultdict(list) + if user_ids: + activity_limit = max(200, len(user_ids) * 10) + recent_ip_rows = (await session.execute( + select( + UserActivity.user_id, + UserActivity.user_ip, + UserActivity.type, + UserActivity.created, + ) + .where(UserActivity.user_id.in_(user_ids), UserActivity.user_ip.isnot(None)) + .order_by(UserActivity.created.desc()) + .limit(activity_limit) + )).all() + for user_id, ip_value, activity_type, created_at in recent_ip_rows: + if user_id is None or not ip_value: + continue + bucket = recent_ip_map[user_id] + if len(bucket) >= 5: + continue + bucket.append({ + 'ip': ip_value, + 'type': activity_type, + 'seen_at': _format_dt(created_at), + }) + + items: List[Dict[str, Any]] = [] + summary = { + 'users_returned': 0, + 'wallets_total': 0, + 'wallets_active': 0, + 'licenses_total': 0, + 'licenses_active': 0, + 'stars_total': 0, + 'stars_paid': 0, + 'stars_unpaid': 0, + 'stars_amount_total': 0, + 'stars_amount_paid': 0, + 'stars_amount_unpaid': 0, + 'unique_ips_total': 0, + } + + for user in user_rows: + summary['users_returned'] += 1 + meta = user.meta or {} + + wallet_list = wallet_map.get(user.id, []) + active_wallets = [wallet for wallet in wallet_list if not wallet.invalidated] + primary_wallet = active_wallets[0] if active_wallets else None + last_connected_wallet = active_wallets[0] if active_wallets else (wallet_list[0] if wallet_list else None) + wallet_payload = { + 'primary_address': primary_wallet.wallet_address if primary_wallet else None, + 'active_count': len(active_wallets), + 'total_count': len(wallet_list), + 'last_connected_at': _format_dt(last_connected_wallet.created) if last_connected_wallet else None, + 'connections': [ + { + 'id': wallet.id, + 'address': wallet.wallet_address, + 'network': wallet.network, + 'invalidated': wallet.invalidated, + 'created_at': _format_dt(wallet.created), + 'updated_at': _format_dt(wallet.updated), + } + for wallet in wallet_list[:3] + ], + } + summary['wallets_total'] += wallet_payload['total_count'] + summary['wallets_active'] += wallet_payload['active_count'] + + content_stats = content_stats_map.get(user.id, {'total': 0, 'onchain': 0, 'local': 0, 'disabled': 0}) + + license_types = dict(license_type_map.get(user.id, {})) + license_statuses = dict(license_status_map.get(user.id, {})) + licenses_total = sum(license_types.values()) if license_types else sum(license_statuses.values()) + licenses_active = license_statuses.get('active', 0) + summary['licenses_total'] += licenses_total + summary['licenses_active'] += licenses_active + + stars_stats = stars_stats_map.get( + user.id, + { + 'total': 0, + 'paid': 0, + 'unpaid': 0, + 'amount_total': 0, + 'amount_paid': 0, + 'amount_unpaid': 0, + }, + ) + summary['stars_total'] += stars_stats['total'] + summary['stars_paid'] += stars_stats['paid'] + summary['stars_unpaid'] += stars_stats['unpaid'] + summary['stars_amount_total'] += stars_stats['amount_total'] + summary['stars_amount_paid'] += stars_stats['amount_paid'] + summary['stars_amount_unpaid'] += stars_stats['amount_unpaid'] + + unique_ips = ip_counts_map.get(user.id, 0) + summary['unique_ips_total'] += unique_ips + recent_ips = recent_ip_map.get(user.id, []) + latest_ip = latest_ip_map.get(user.id) + + items.append({ + 'id': user.id, + 'telegram_id': user.telegram_id, + 'username': user.username, + 'first_name': meta.get('first_name'), + 'last_name': meta.get('last_name'), + 'lang_code': user.lang_code, + 'created_at': _format_dt(user.created), + 'updated_at': _format_dt(user.updated), + 'last_use': _format_dt(user.last_use), + 'meta': { + 'ref_id': meta.get('ref_id'), + 'referrer_id': meta.get('referrer_id'), + }, + 'wallets': wallet_payload, + 'content': content_stats, + 'licenses': { + 'total': licenses_total, + 'active': licenses_active, + 'by_type': license_types, + 'by_status': license_statuses, + }, + 'stars': { + 'total': stars_stats['total'], + 'paid': stars_stats['paid'], + 'unpaid': stars_stats['unpaid'], + 'amount_total': stars_stats['amount_total'], + 'amount_paid': stars_stats['amount_paid'], + 'amount_unpaid': stars_stats['amount_unpaid'], + }, + 'ip_activity': { + 'last': latest_ip or None, + 'unique_ips': unique_ips, + 'recent': recent_ips, + }, + }) + + base_payload.update({ + 'items': items, + 'summary': summary, + }) + return response.json(base_payload) + + +async def s_api_v1_admin_licenses(request): + if (unauth := _ensure_admin(request)): + return unauth + + session = request.ctx.db_session + + try: + limit = int(request.args.get('limit') or 50) + except (TypeError, ValueError): + limit = 50 + limit = max(1, min(limit, 200)) + + try: + offset = int(request.args.get('offset') or 0) + except (TypeError, ValueError): + offset = 0 + offset = max(0, offset) + + search = (request.args.get('search') or '').strip() + type_param = (request.args.get('type') or '').strip() + status_param = (request.args.get('status') or '').strip() + license_type_param = (request.args.get('license_type') or '').strip() + user_id_param = (request.args.get('user_id') or '').strip() + owner_address_param = (request.args.get('owner') or '').strip() + onchain_address_param = (request.args.get('address') or '').strip() + content_hash_param = (request.args.get('content_hash') or '').strip() + + filters = [] + applied_filters: Dict[str, Any] = {} + + if type_param: + type_values = [value.strip() for value in type_param.split(',') if value.strip()] + if type_values: + filters.append(UserContent.type.in_(type_values)) + applied_filters['type'] = type_values + + if status_param: + status_values = [value.strip() for value in status_param.split(',') if value.strip()] + if status_values: + filters.append(UserContent.status.in_(status_values)) + applied_filters['status'] = status_values + + if license_type_param: + lt_values: List[int] = [] + for part in license_type_param.split(','): + part = part.strip() + if not part: + continue + try: + lt_values.append(int(part)) + except ValueError: + continue + if lt_values: + license_type_expr = cast(UserContent.meta['license_type'].astext, Integer) + filters.append(license_type_expr.in_(lt_values)) + applied_filters['license_type'] = lt_values + + if user_id_param: + try: + filters.append(UserContent.user_id == int(user_id_param)) + applied_filters['user_id'] = int(user_id_param) + except ValueError: + pass + + if owner_address_param: + filters.append(UserContent.owner_address.ilike(f"%{owner_address_param}%")) + applied_filters['owner'] = owner_address_param + + if onchain_address_param: + filters.append(UserContent.onchain_address.ilike(f"%{onchain_address_param}%")) + applied_filters['address'] = onchain_address_param + + if content_hash_param: + stored_target = (await session.execute( + select(StoredContent.id).where(StoredContent.hash == content_hash_param) + )).scalars().first() + if stored_target is None: + payload = { + 'total': 0, + 'limit': limit, + 'offset': offset, + 'search': search or None, + 'filters': {**applied_filters, 'content_hash': content_hash_param}, + 'items': [], + 'counts': { + 'status': {}, + 'type': {}, + 'license_type': {}, + }, + 'has_more': False, + } + return response.json(payload) + filters.append(UserContent.content_id == int(stored_target)) + applied_filters['content_hash'] = content_hash_param + + if search: + search_like = f"%{search}%" + clauses = [ + cast(UserContent.id, String).ilike(search_like), + UserContent.onchain_address.ilike(search_like), + UserContent.owner_address.ilike(search_like), + ] + filters.append(or_(*clauses)) + + total_stmt = select(func.count()).select_from(UserContent) + if filters: + total_stmt = total_stmt.where(and_(*filters)) + total = (await session.execute(total_stmt)).scalar_one() + + base_payload = { + 'total': int(total or 0), + 'limit': limit, + 'offset': offset, + 'search': search or None, + 'filters': applied_filters, + 'has_more': (offset + limit) < int(total or 0), + } + + if not total: + base_payload.update({ + 'items': [], + 'counts': { + 'status': {}, + 'type': {}, + 'license_type': {}, + }, + }) + return response.json(base_payload) + + query_stmt = ( + select(UserContent) + .order_by(UserContent.created.desc()) + .offset(offset) + .limit(limit) + ) + if filters: + query_stmt = query_stmt.where(and_(*filters)) + + license_rows = (await session.execute(query_stmt)).scalars().all() + + user_ids = [row.user_id for row in license_rows if row.user_id is not None] + content_ids = [row.content_id for row in license_rows if row.content_id is not None] + wallet_ids = [row.wallet_connection_id for row in license_rows if row.wallet_connection_id is not None] + + users_map: Dict[int, User] = {} + if user_ids: + user_records = (await session.execute( + select(User).where(User.id.in_(user_ids)) + )).scalars().all() + for user in user_records: + users_map[user.id] = user + + wallet_map: Dict[int, WalletConnection] = {} + if wallet_ids: + wallet_records = (await session.execute( + select(WalletConnection).where(WalletConnection.id.in_(wallet_ids)) + )).scalars().all() + for wallet in wallet_records: + wallet_map[wallet.id] = wallet + + content_map: Dict[int, StoredContent] = {} + if content_ids: + content_records = (await session.execute( + select(StoredContent).where(StoredContent.id.in_(content_ids)) + )).scalars().all() + for content in content_records: + content_map[content.id] = content + + status_stmt = select(UserContent.status, func.count()).group_by(UserContent.status) + if filters: + status_stmt = status_stmt.where(and_(*filters)) + status_counts_rows = (await session.execute(status_stmt)).all() + status_counts = {status or 'unknown': int(count or 0) for status, count in status_counts_rows} + + type_stmt = select(UserContent.type, func.count()).group_by(UserContent.type) + if filters: + type_stmt = type_stmt.where(and_(*filters)) + type_counts_rows = (await session.execute(type_stmt)).all() + type_counts = {ctype or 'unknown': int(count or 0) for ctype, count in type_counts_rows} + + license_type_expr = func.coalesce(cast(UserContent.meta['license_type'].astext, Integer), -1) + license_type_stmt = select(license_type_expr.label('license_type'), func.count()).group_by('license_type') + if filters: + license_type_stmt = license_type_stmt.where(and_(*filters)) + license_type_counts_rows = (await session.execute(license_type_stmt)).all() + license_type_counts: Dict[str, int] = {} + for lt_value, count in license_type_counts_rows: + key = 'unknown' if lt_value in (None, -1) else str(int(lt_value)) + license_type_counts[key] = int(count or 0) + + items: List[Dict[str, Any]] = [] + for license_row in license_rows: + meta = license_row.meta or {} + license_type_value = meta.get('license_type') + + owner_user = users_map.get(license_row.user_id) + wallet_connection = wallet_map.get(license_row.wallet_connection_id) + stored_content = content_map.get(license_row.content_id) + + owner_payload = None + if owner_user: + owner_meta = owner_user.meta or {} + owner_payload = { + 'id': owner_user.id, + 'telegram_id': owner_user.telegram_id, + 'username': owner_user.username, + 'first_name': owner_meta.get('first_name'), + 'last_name': owner_meta.get('last_name'), + } + + wallet_payload = None + if wallet_connection: + wallet_payload = { + 'id': wallet_connection.id, + 'address': wallet_connection.wallet_address, + 'network': wallet_connection.network, + 'invalidated': wallet_connection.invalidated, + 'created_at': _format_dt(wallet_connection.created), + 'updated_at': _format_dt(wallet_connection.updated), + } + + content_payload = None + if stored_content: + stored_meta = stored_content.meta or {} + metadata_candidate = stored_meta.get('metadata') if isinstance(stored_meta.get('metadata'), dict) else {} + title_candidates = [ + stored_meta.get('title'), + metadata_candidate.get('name') if isinstance(metadata_candidate, dict) else None, + stored_meta.get('license', {}).get('title') if isinstance(stored_meta.get('license'), dict) else None, + ] + title_value = next((value for value in title_candidates if isinstance(value, str) and value.strip()), None) + try: + cid_value = stored_content.cid.serialize_v2() + except Exception: + cid_value = None + content_payload = { + 'id': stored_content.id, + 'hash': stored_content.hash, + 'cid': cid_value, + 'title': title_value or stored_content.hash, + 'type': stored_content.type, + 'owner_address': stored_content.owner_address, + 'onchain_index': stored_content.onchain_index, + 'user_id': stored_content.user_id, + 'download_url': stored_content.web_url, + } + + items.append({ + 'id': license_row.id, + 'type': license_row.type, + 'status': license_row.status, + 'license_type': license_type_value, + 'onchain_address': license_row.onchain_address, + 'owner_address': license_row.owner_address, + 'user': owner_payload, + 'wallet_connection': wallet_payload, + 'content': content_payload, + 'created_at': _format_dt(license_row.created), + 'updated_at': _format_dt(license_row.updated), + 'meta': meta, + 'links': { + 'tonviewer': f"https://tonviewer.com/{license_row.onchain_address}" if license_row.onchain_address else None, + 'content_view': f"{PROJECT_HOST}/api/v1/content.view/{license_row.onchain_address}" if license_row.onchain_address else None, + }, + }) + + base_payload.update({ + 'items': items, + 'counts': { + 'status': status_counts, + 'type': type_counts, + 'license_type': license_type_counts, + }, + }) + return response.json(base_payload) + + +async def s_api_v1_admin_stars(request): + if (unauth := _ensure_admin(request)): + return unauth + + session = request.ctx.db_session + + try: + limit = int(request.args.get('limit') or 50) + except (TypeError, ValueError): + limit = 50 + limit = max(1, min(limit, 200)) + + try: + offset = int(request.args.get('offset') or 0) + except (TypeError, ValueError): + offset = 0 + offset = max(0, offset) + + search = (request.args.get('search') or '').strip() + type_param = (request.args.get('type') or '').strip() + paid_param = _parse_bool_arg(request.args.get('paid')) + user_id_param = (request.args.get('user_id') or '').strip() + content_hash_param = (request.args.get('content_hash') or '').strip() + + filters = [] + applied_filters: Dict[str, Any] = {} + + if paid_param is True: + filters.append(StarsInvoice.paid.is_(True)) + applied_filters['paid'] = True + elif paid_param is False: + filters.append(or_(StarsInvoice.paid.is_(False), StarsInvoice.paid.is_(None))) + applied_filters['paid'] = False + + if type_param: + type_values = [value.strip() for value in type_param.split(',') if value.strip()] + if type_values: + filters.append(StarsInvoice.type.in_(type_values)) + applied_filters['type'] = type_values + + if user_id_param: + try: + filters.append(StarsInvoice.user_id == int(user_id_param)) + applied_filters['user_id'] = int(user_id_param) + except ValueError: + pass + + if content_hash_param: + filters.append(StarsInvoice.content_hash == content_hash_param) + applied_filters['content_hash'] = content_hash_param + + if search: + search_like = f"%{search}%" + clauses = [ + StarsInvoice.external_id.ilike(search_like), + StarsInvoice.invoice_url.ilike(search_like), + cast(StarsInvoice.id, String).ilike(search_like), + ] + filters.append(or_(*clauses)) + + total_stmt = select(func.count()).select_from(StarsInvoice) + if filters: + total_stmt = total_stmt.where(and_(*filters)) + total = (await session.execute(total_stmt)).scalar_one() + + base_payload = { + 'total': int(total or 0), + 'limit': limit, + 'offset': offset, + 'search': search or None, + 'filters': applied_filters, + 'has_more': (offset + limit) < int(total or 0), + } + + if not total: + base_payload.update({ + 'items': [], + 'stats': { + 'total': 0, + 'paid': 0, + 'unpaid': 0, + 'amount_total': 0, + 'amount_paid': 0, + 'amount_unpaid': 0, + 'by_type': {}, + }, + }) + return response.json(base_payload) + + query_stmt = ( + select(StarsInvoice) + .order_by(StarsInvoice.created.desc()) + .offset(offset) + .limit(limit) + ) + if filters: + query_stmt = query_stmt.where(and_(*filters)) + + invoice_rows = (await session.execute(query_stmt)).scalars().all() + + user_ids = [row.user_id for row in invoice_rows if row.user_id is not None] + content_hashes = [row.content_hash for row in invoice_rows if row.content_hash] + + users_map: Dict[int, User] = {} + if user_ids: + user_records = (await session.execute(select(User).where(User.id.in_(user_ids)))).scalars().all() + for user in user_records: + users_map[user.id] = user + + content_map: Dict[str, StoredContent] = {} + if content_hashes: + content_records = (await session.execute( + select(StoredContent).where(StoredContent.hash.in_(content_hashes)) + )).scalars().all() + for content in content_records: + content_map[content.hash] = content + + stats_stmt = select( + func.count().label('total'), + func.sum(case((StarsInvoice.paid.is_(True), 1), else_=0)).label('paid'), + func.sum(StarsInvoice.amount).label('amount_total'), + func.sum(case((StarsInvoice.paid.is_(True), StarsInvoice.amount), else_=0)).label('amount_paid'), + ) + if filters: + stats_stmt = stats_stmt.where(and_(*filters)) + stats_row = (await session.execute(stats_stmt)).first() + total_filtered = int((stats_row.total if stats_row else 0) or 0) + paid_count = int((stats_row.paid if stats_row else 0) or 0) + amount_total_value = int((stats_row.amount_total if stats_row else 0) or 0) + amount_paid_value = int((stats_row.amount_paid if stats_row else 0) or 0) + unpaid_count = max(total_filtered - paid_count, 0) + amount_unpaid_value = max(amount_total_value - amount_paid_value, 0) + + type_stats_stmt = select( + StarsInvoice.type, + func.count().label('total'), + func.sum(case((StarsInvoice.paid.is_(True), 1), else_=0)).label('paid'), + func.sum(StarsInvoice.amount).label('amount_total'), + func.sum(case((StarsInvoice.paid.is_(True), StarsInvoice.amount), else_=0)).label('amount_paid'), + ).group_by(StarsInvoice.type) + if filters: + type_stats_stmt = type_stats_stmt.where(and_(*filters)) + type_stats_rows = (await session.execute(type_stats_stmt)).all() + type_stats: Dict[str, Dict[str, int]] = {} + for invoice_type, total_cnt, paid_cnt, amt_total, amt_paid in type_stats_rows: + total_value = int(total_cnt or 0) + paid_value = int(paid_cnt or 0) + amt_total_value = int(amt_total or 0) + amt_paid_value = int(amt_paid or 0) + type_stats[invoice_type or 'unknown'] = { + 'total': total_value, + 'paid': paid_value, + 'unpaid': max(total_value - paid_value, 0), + 'amount_total': amt_total_value, + 'amount_paid': amt_paid_value, + 'amount_unpaid': max(amt_total_value - amt_paid_value, 0), + } + + items: List[Dict[str, Any]] = [] + for invoice in invoice_rows: + user_payload = None + if invoice.user_id and invoice.user_id in users_map: + user = users_map[invoice.user_id] + user_meta = user.meta or {} + user_payload = { + 'id': user.id, + 'telegram_id': user.telegram_id, + 'username': user.username, + 'first_name': user_meta.get('first_name'), + 'last_name': user_meta.get('last_name'), + } + + content_payload = None + if invoice.content_hash and invoice.content_hash in content_map: + stored_content = content_map[invoice.content_hash] + stored_meta = stored_content.meta or {} + metadata_candidate = stored_meta.get('metadata') if isinstance(stored_meta.get('metadata'), dict) else {} + title_candidates = [ + stored_meta.get('title'), + metadata_candidate.get('name') if isinstance(metadata_candidate, dict) else None, + stored_meta.get('license', {}).get('title') if isinstance(stored_meta.get('license'), dict) else None, + ] + title_value = next((value for value in title_candidates if isinstance(value, str) and value.strip()), None) + try: + cid_value = stored_content.cid.serialize_v2() + except Exception: + cid_value = None + content_payload = { + 'id': stored_content.id, + 'hash': stored_content.hash, + 'cid': cid_value, + 'title': title_value or stored_content.hash, + 'onchain_index': stored_content.onchain_index, + 'owner_address': stored_content.owner_address, + 'user_id': stored_content.user_id, + 'download_url': stored_content.web_url, + } + + items.append({ + 'id': invoice.id, + 'external_id': invoice.external_id, + 'type': invoice.type, + 'amount': invoice.amount, + 'paid': bool(invoice.paid), + 'invoice_url': invoice.invoice_url, + 'created_at': _format_dt(invoice.created), + 'user': user_payload, + 'content': content_payload, + 'status': 'paid' if invoice.paid else 'pending', + }) + + base_payload.update({ + 'items': items, + 'stats': { + 'total': total_filtered, + 'paid': paid_count, + 'unpaid': unpaid_count, + 'amount_total': amount_total_value, + 'amount_paid': amount_paid_value, + 'amount_unpaid': amount_unpaid_value, + 'by_type': type_stats, + }, + }) + return response.json(base_payload) + + async def s_api_v1_admin_system(request): if (unauth := _ensure_admin(request)): return unauth